diff options
author | Jake McGinty <me@jake.su> | 2018-04-24 21:59:43 -0700 |
---|---|---|
committer | Jake McGinty <me@jake.su> | 2018-04-24 21:59:45 -0700 |
commit | a6f923d220fa873311a732866c148afc6ea592f2 (patch) | |
tree | ac067c4def2f5e3ac7b65826a0e308fbe5ca685b /src | |
parent | tests: more informative error bubbling for netns failures (diff) | |
download | wireguard-rs-a6f923d220fa873311a732866c148afc6ea592f2.tar.xz wireguard-rs-a6f923d220fa873311a732866c148afc6ea592f2.zip |
timer: use tokio-timer 0.2 and make timers cancelable
Diffstat (limited to 'src')
-rw-r--r-- | src/interface/peer_server.rs | 1 | ||||
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/timer.rs | 44 |
3 files changed, 27 insertions, 20 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 954b3a0..1e082b0 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -398,7 +398,6 @@ impl PeerServer { } Ok(()) } - } impl Future for PeerServer { @@ -30,10 +30,10 @@ extern crate snow; extern crate socket2; extern crate subtle; extern crate test; +extern crate tokio; extern crate tokio_io; extern crate tokio_uds; extern crate tokio_utun; -extern crate tokio_timer; extern crate tokio_signal; extern crate treebitmap; extern crate x25519_dalek; diff --git a/src/timer.rs b/src/timer.rs index bbf80f3..ca00e29 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -1,8 +1,8 @@ use consts::TIMER_RESOLUTION; -use futures::{Future, Stream, Sink, Poll, unsync}; -use std::time::Duration; +use futures::{Async, Future, Stream, Sink, Poll, unsync}; +use std::time::{Instant, Duration}; +use tokio::timer::Delay; use tokio_core::reactor::Handle; -use tokio_timer; use interface::SharedPeer; #[derive(Debug)] @@ -13,9 +13,18 @@ pub enum TimerMessage { Wipe(SharedPeer), } +pub struct TimerHandle { + tx: unsync::oneshot::Sender<()> +} + +impl TimerHandle { + pub fn cancel(self) -> Result<(), ()> { + self.tx.send(()) + } +} + pub struct Timer { handle: Handle, - timer: tokio_timer::Timer, tx: unsync::mpsc::Sender<TimerMessage>, rx: unsync::mpsc::Receiver<TimerMessage>, } @@ -23,25 +32,24 @@ pub struct Timer { impl Timer { pub fn new(handle: Handle) -> Self { let (tx, rx) = unsync::mpsc::channel::<TimerMessage>(1024); - let timer = tokio_timer::wheel() - .tick_duration(*TIMER_RESOLUTION) - .num_slots(1 << 14) - .build(); - Self { handle, timer, tx, rx } + Self { handle, tx, rx } } - pub fn send_after(&mut self, delay: Duration, message: TimerMessage) { + pub fn send_after(&mut self, delay: Duration, message: TimerMessage) -> TimerHandle { trace!("queuing timer message {:?}", &message); - let timer = self.timer.sleep(delay + (*TIMER_RESOLUTION * 2)); - let future = timer.and_then({ - let tx = self.tx.clone(); - move |_| { - tx.clone().send(message).then(|_| Ok(())) - } - }).then(|_| Ok(())); + let (cancel_tx, mut cancel_rx) = unsync::oneshot::channel(); + let tx = self.tx.clone(); + let future = Delay::new(Instant::now() + delay + (*TIMER_RESOLUTION * 2)) + .map_err(|e| panic!("timer failed; err={:?}", e)) + .and_then(move |_| { + if let Ok(Async::Ready(())) = cancel_rx.poll() { + trace!("timer cancel signal sent, won't send message."); + } + tx.send(message).then(|_| Ok(())) + }); self.handle.spawn(future); + TimerHandle { tx: cancel_tx } } - } impl Stream for Timer { |