aboutsummaryrefslogtreecommitdiffstats
path: root/src/interface/grim_reaper.rs
blob: 1c5029dddbd7e5061b1ea9394386f165699437c1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use failure::Error;
use futures::{self, Future, Async, Poll, Stream};
use notify::{self, Watcher, RecursiveMode, RawEvent, raw_watcher};
use std::{self, thread, time::Duration, path::Path};
use tokio_core::reactor::Handle;
use tokio_signal::{self, unix::{Signal, SIGTERM}};

/// Future that watches for shutdown conditions: a message from the
/// filesystem watcher thread started in `spawn`, or a process signal
/// (SIGINT / SIGTERM). See the `Future` impl for how each is reported.
pub struct GrimReaper {
    /// Receiving half of the oneshot channel whose sender is owned by the
    /// "grim reaper" watcher thread.
    rx: futures::sync::oneshot::Receiver<()>,
    /// Merged stream of SIGINT (ctrl-c) and SIGTERM notifications.
    signal: Box<Stream<Item = (), Error = std::io::Error>>,
}

impl GrimReaper {
    pub fn spawn(handle: &Handle, socket_path: &Path) -> Result<Self, Error> {
        let (std_tx, std_rx) = std::sync::mpsc::channel::<RawEvent>();
        let (tx,     rx    ) = futures::sync::oneshot::channel::<()>();

        let path = socket_path.to_owned();
        debug!("grim reaper spawning for {}.", socket_path.to_string_lossy());

        thread::Builder::new()
            .name("grim reaper".into())
            .spawn(move || {
                thread::sleep(Duration::from_millis(500)); // TODO we shouldn't need this.
                let mut watcher = raw_watcher(std_tx).unwrap();
                watcher.watch(path, RecursiveMode::Recursive).unwrap();

                loop {
                    debug!("listening");
                    let event = std_rx.recv().unwrap();
                    debug!("FS EVENT: {:?}", event);
                    if event.op.unwrap() == notify::op::REMOVE {
                        tx.send(()).unwrap();
                        panic!("configuration socket removed, sounding death cry.")
                    }
                }
            })?;

        let sigint  = tokio_signal::ctrl_c(handle).flatten_stream();
        let sigterm = Signal::new(SIGTERM, handle).flatten_stream().map(|_| ());
        let signal  = Box::new(sigint.select(sigterm));

        Ok(Self { rx, signal })
    }
}

/// Resolves with `Err(())` as soon as a shutdown condition is observed and
/// stays `NotReady` otherwise; it never resolves successfully.
impl Future for GrimReaper {
    type Item = ();
    type Error = ();

    /// Polls the watcher channel first, then the signal stream.
    ///
    /// Every outcome other than `NotReady` on either source is reported as
    /// `Err(())`; the log line states which condition actually triggered.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.rx.poll() {
            Ok(Async::NotReady) => {},
            Ok(Async::Ready(())) => {
                info!("configuration socket removed, bubbling up to reactor core.");
                return Err(())
            },
            // The sender was dropped without sending, i.e. the watcher
            // thread went away before it reported a removal.
            Err(_) => {
                info!("socket watcher terminated unexpectedly, bubbling up to reactor core.");
                return Err(())
            },
        }

        match self.signal.poll() {
            Ok(Async::NotReady) => {},
            // The stream merges SIGINT and SIGTERM, so either may have fired.
            Ok(Async::Ready(Some(_))) => {
                info!("SIGINT or SIGTERM received, bubbling up to reactor core.");
                return Err(())
            },
            Ok(Async::Ready(None)) => {
                info!("signal stream ended, bubbling up to reactor core.");
                return Err(())
            },
            Err(err) => {
                info!("signal stream failed ({}), bubbling up to reactor core.", err);
                return Err(())
            },
        }

        Ok(Async::NotReady)
    }
}