aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-04-24 21:59:43 -0700
committerJake McGinty <me@jake.su>2018-04-24 21:59:45 -0700
commita6f923d220fa873311a732866c148afc6ea592f2 (patch)
treeac067c4def2f5e3ac7b65826a0e308fbe5ca685b /src
parenttests: more informative error bubbling for netns failures (diff)
downloadwireguard-rs-a6f923d220fa873311a732866c148afc6ea592f2.tar.xz
wireguard-rs-a6f923d220fa873311a732866c148afc6ea592f2.zip
timer: use tokio-timer 0.2 and make timers cancelable
Diffstat (limited to 'src')
-rw-r--r--src/interface/peer_server.rs1
-rw-r--r--src/lib.rs2
-rw-r--r--src/timer.rs44
3 files changed, 27 insertions, 20 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index 954b3a0..1e082b0 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -398,7 +398,6 @@ impl PeerServer {
}
Ok(())
}
-
}
impl Future for PeerServer {
diff --git a/src/lib.rs b/src/lib.rs
index d39ba78..5302f19 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -30,10 +30,10 @@ extern crate snow;
extern crate socket2;
extern crate subtle;
extern crate test;
+extern crate tokio;
extern crate tokio_io;
extern crate tokio_uds;
extern crate tokio_utun;
-extern crate tokio_timer;
extern crate tokio_signal;
extern crate treebitmap;
extern crate x25519_dalek;
diff --git a/src/timer.rs b/src/timer.rs
index bbf80f3..ca00e29 100644
--- a/src/timer.rs
+++ b/src/timer.rs
@@ -1,8 +1,8 @@
use consts::TIMER_RESOLUTION;
-use futures::{Future, Stream, Sink, Poll, unsync};
-use std::time::Duration;
+use futures::{Async, Future, Stream, Sink, Poll, unsync};
+use std::time::{Instant, Duration};
+use tokio::timer::Delay;
use tokio_core::reactor::Handle;
-use tokio_timer;
use interface::SharedPeer;
#[derive(Debug)]
@@ -13,9 +13,18 @@ pub enum TimerMessage {
Wipe(SharedPeer),
}
+pub struct TimerHandle {
+ tx: unsync::oneshot::Sender<()>
+}
+
+impl TimerHandle {
+ pub fn cancel(self) -> Result<(), ()> {
+ self.tx.send(())
+ }
+}
+
pub struct Timer {
handle: Handle,
- timer: tokio_timer::Timer,
tx: unsync::mpsc::Sender<TimerMessage>,
rx: unsync::mpsc::Receiver<TimerMessage>,
}
@@ -23,25 +32,24 @@ pub struct Timer {
impl Timer {
pub fn new(handle: Handle) -> Self {
let (tx, rx) = unsync::mpsc::channel::<TimerMessage>(1024);
- let timer = tokio_timer::wheel()
- .tick_duration(*TIMER_RESOLUTION)
- .num_slots(1 << 14)
- .build();
- Self { handle, timer, tx, rx }
+ Self { handle, tx, rx }
}
- pub fn send_after(&mut self, delay: Duration, message: TimerMessage) {
+ pub fn send_after(&mut self, delay: Duration, message: TimerMessage) -> TimerHandle {
trace!("queuing timer message {:?}", &message);
- let timer = self.timer.sleep(delay + (*TIMER_RESOLUTION * 2));
- let future = timer.and_then({
- let tx = self.tx.clone();
- move |_| {
- tx.clone().send(message).then(|_| Ok(()))
- }
- }).then(|_| Ok(()));
+ let (cancel_tx, mut cancel_rx) = unsync::oneshot::channel();
+ let tx = self.tx.clone();
+ let future = Delay::new(Instant::now() + delay + (*TIMER_RESOLUTION * 2))
+ .map_err(|e| panic!("timer failed; err={:?}", e))
+ .and_then(move |_| {
+ if let Ok(Async::Ready(())) = cancel_rx.poll() {
+ trace!("timer cancel signal sent, won't send message.");
+ }
+ tx.send(message).then(|_| Ok(()))
+ });
self.handle.spawn(future);
+ TimerHandle { tx: cancel_tx }
}
-
}
impl Stream for Timer {