diff options
Diffstat (limited to 'src/timer.rs')
-rw-r--r-- | src/timer.rs | 44 |
1 files changed, 26 insertions, 18 deletions
diff --git a/src/timer.rs b/src/timer.rs index bbf80f3..ca00e29 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -1,8 +1,8 @@ use consts::TIMER_RESOLUTION; -use futures::{Future, Stream, Sink, Poll, unsync}; -use std::time::Duration; +use futures::{Async, Future, Stream, Sink, Poll, unsync}; +use std::time::{Instant, Duration}; +use tokio::timer::Delay; use tokio_core::reactor::Handle; -use tokio_timer; use interface::SharedPeer; #[derive(Debug)] @@ -13,9 +13,18 @@ pub enum TimerMessage { Wipe(SharedPeer), } +pub struct TimerHandle { + tx: unsync::oneshot::Sender<()> +} + +impl TimerHandle { + pub fn cancel(self) -> Result<(), ()> { + self.tx.send(()) + } +} + pub struct Timer { handle: Handle, - timer: tokio_timer::Timer, tx: unsync::mpsc::Sender<TimerMessage>, rx: unsync::mpsc::Receiver<TimerMessage>, } @@ -23,25 +32,24 @@ pub struct Timer { impl Timer { pub fn new(handle: Handle) -> Self { let (tx, rx) = unsync::mpsc::channel::<TimerMessage>(1024); - let timer = tokio_timer::wheel() - .tick_duration(*TIMER_RESOLUTION) - .num_slots(1 << 14) - .build(); - Self { handle, timer, tx, rx } + Self { handle, tx, rx } } - pub fn send_after(&mut self, delay: Duration, message: TimerMessage) { + pub fn send_after(&mut self, delay: Duration, message: TimerMessage) -> TimerHandle { trace!("queuing timer message {:?}", &message); - let timer = self.timer.sleep(delay + (*TIMER_RESOLUTION * 2)); - let future = timer.and_then({ - let tx = self.tx.clone(); - move |_| { - tx.clone().send(message).then(|_| Ok(())) - } - }).then(|_| Ok(())); + let (cancel_tx, mut cancel_rx) = unsync::oneshot::channel(); + let tx = self.tx.clone(); + let future = Delay::new(Instant::now() + delay + (*TIMER_RESOLUTION * 2)) + .map_err(|e| panic!("timer failed; err={:?}", e)) + .and_then(move |_| { + if let Ok(Async::Ready(())) = cancel_rx.poll() { + trace!("timer cancel signal sent, won't send message."); + } + tx.send(message).then(|_| Ok(())) + }); self.handle.spawn(future); + TimerHandle { tx: cancel_tx } } - } impl Stream for Timer { |