1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
use failure::Error;
use futures::{self, Future, Async, Poll, Stream};
use notify::{self, Watcher, RecursiveMode, RawEvent, raw_watcher};
use std::{self, thread, time::Duration, path::Path};
use tokio_core::reactor::Handle;
use tokio_signal::{self, unix::{Signal, SIGTERM}};
/// Future that tells the reactor when the process should shut down.
///
/// It combines two shutdown sources, both wired up by `GrimReaper::spawn`:
/// a filesystem watcher on the configuration socket path and a stream of
/// termination signals.
pub struct GrimReaper {
// Receiving half of the oneshot whose sender lives in the "grim reaper"
// watcher thread; the thread sends `()` when it sees a REMOVE event.
rx: futures::sync::oneshot::Receiver<()>,
// Merged stream built from `tokio_signal::ctrl_c` and a `SIGTERM` listener;
// boxed because the concrete combinator type is unwieldy to name.
signal: Box<Stream<Item = (), Error = std::io::Error>>,
}
impl GrimReaper {
    /// Starts watching `socket_path` for removal and subscribes to Ctrl-C and
    /// `SIGTERM`, returning a future that fails once any of them fires.
    ///
    /// A dedicated thread named "grim reaper" installs a recursive raw
    /// `notify` watcher on `socket_path` and blocks on the watcher's event
    /// channel. As soon as an event whose operation set includes `REMOVE`
    /// arrives, the thread fires the oneshot held by the returned value and
    /// then deliberately panics to terminate itself.
    ///
    /// `handle` is the reactor handle used to register the signal listeners.
    ///
    /// # Errors
    ///
    /// Returns an error if the watcher thread cannot be spawned. Failures to
    /// register the signal handlers are not reported here; `flatten_stream`
    /// defers them until the resulting stream is polled.
    ///
    /// # Panics
    ///
    /// The caller never panics, but the background thread panics if the
    /// watcher cannot be created or attached, if the event channel closes, or
    /// if the watcher delivers an error event. Such a panic drops the oneshot
    /// sender, which the returned future also observes on its next poll.
    pub fn spawn(handle: &Handle, socket_path: &Path) -> Result<Self, Error> {
        let (std_tx, std_rx) = std::sync::mpsc::channel::<RawEvent>();
        let (tx, rx) = futures::sync::oneshot::channel::<()>();
        let path = socket_path.to_owned();
        debug!("grim reaper spawning for {}.", socket_path.to_string_lossy());
        thread::Builder::new()
            .name("grim reaper".into())
            .spawn(move || {
                thread::sleep(Duration::from_millis(500)); // TODO we shouldn't need this.
                let mut watcher = raw_watcher(std_tx)
                    .expect("grim reaper: failed to create filesystem watcher");
                watcher
                    .watch(path, RecursiveMode::Recursive)
                    .expect("grim reaper: failed to watch configuration socket path");
                loop {
                    debug!("listening");
                    let event = std_rx
                        .recv()
                        .expect("grim reaper: filesystem event channel closed");
                    debug!("FS EVENT: {:?}", event);
                    let op = event
                        .op
                        .expect("grim reaper: filesystem watcher reported an error");
                    // `Op` is a bitflag set and a single raw event may carry
                    // several operations at once, so test for membership
                    // instead of exact equality with REMOVE.
                    if op.contains(notify::op::REMOVE) {
                        tx.send(())
                            .expect("grim reaper: receiver dropped before removal was reported");
                        panic!("configuration socket removed, sounding death cry.")
                    }
                }
            })?;
        // Both listeners are futures resolving to streams; flatten them so they
        // can be merged into a single stream of `()` notifications.
        let sigint = tokio_signal::ctrl_c(handle).flatten_stream();
        let sigterm = Signal::new(SIGTERM, handle).flatten_stream().map(|_| ());
        let signal = Box::new(sigint.select(sigterm));
        Ok(Self { rx, signal })
    }
}
/// Drives both shutdown sources. The future never resolves successfully: it
/// stays pending while nothing has happened and fails with `Err(())` as soon
/// as the watcher thread or the signal stream produces anything at all.
impl Future for GrimReaper {
    type Item = ();
    type Error = ();

    /// Polls the socket watcher first, then the signal stream.
    ///
    /// Returns `Ok(Async::NotReady)` while both sources are pending. Any other
    /// outcome from either source is logged with its actual cause and turned
    /// into `Err(())` so the reactor core stops.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.rx.poll() {
            Ok(Async::NotReady) => {}
            Ok(Async::Ready(())) => {
                info!("configuration socket removed, bubbling up to reactor core.");
                return Err(());
            }
            // The sender was dropped without sending, which happens when the
            // watcher thread ends before reporting a removal.
            Err(_) => {
                info!("grim reaper watcher thread terminated unexpectedly, bubbling up to reactor core.");
                return Err(());
            }
        }
        match self.signal.poll() {
            Ok(Async::NotReady) => {}
            // The merged stream carries both Ctrl-C and SIGTERM notifications
            // as `()`, so the two cannot be told apart here.
            Ok(Async::Ready(Some(()))) => {
                info!("SIGINT or SIGTERM received, bubbling up to reactor core.");
                return Err(());
            }
            Ok(Async::Ready(None)) => {
                info!("signal stream ended, bubbling up to reactor core.");
                return Err(());
            }
            Err(err) => {
                info!("signal stream failed ({}), bubbling up to reactor core.", err);
                return Err(());
            }
        }
        Ok(Async::NotReady)
    }
}
|