aboutsummaryrefslogtreecommitdiffstats
path: root/src/timer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/timer.rs')
-rw-r--r--src/timer.rs44
1 files changed, 26 insertions, 18 deletions
diff --git a/src/timer.rs b/src/timer.rs
index bbf80f3..ca00e29 100644
--- a/src/timer.rs
+++ b/src/timer.rs
@@ -1,8 +1,8 @@
use consts::TIMER_RESOLUTION;
-use futures::{Future, Stream, Sink, Poll, unsync};
-use std::time::Duration;
+use futures::{Async, Future, Stream, Sink, Poll, unsync};
+use std::time::{Instant, Duration};
+use tokio::timer::Delay;
use tokio_core::reactor::Handle;
-use tokio_timer;
use interface::SharedPeer;
#[derive(Debug)]
@@ -13,9 +13,18 @@ pub enum TimerMessage {
Wipe(SharedPeer),
}
+pub struct TimerHandle {
+ tx: unsync::oneshot::Sender<()>
+}
+
+impl TimerHandle {
+ pub fn cancel(self) -> Result<(), ()> {
+ self.tx.send(())
+ }
+}
+
pub struct Timer {
handle: Handle,
- timer: tokio_timer::Timer,
tx: unsync::mpsc::Sender<TimerMessage>,
rx: unsync::mpsc::Receiver<TimerMessage>,
}
@@ -23,25 +32,24 @@ pub struct Timer {
impl Timer {
pub fn new(handle: Handle) -> Self {
let (tx, rx) = unsync::mpsc::channel::<TimerMessage>(1024);
- let timer = tokio_timer::wheel()
- .tick_duration(*TIMER_RESOLUTION)
- .num_slots(1 << 14)
- .build();
- Self { handle, timer, tx, rx }
+ Self { handle, tx, rx }
}
- pub fn send_after(&mut self, delay: Duration, message: TimerMessage) {
+ pub fn send_after(&mut self, delay: Duration, message: TimerMessage) -> TimerHandle {
trace!("queuing timer message {:?}", &message);
- let timer = self.timer.sleep(delay + (*TIMER_RESOLUTION * 2));
- let future = timer.and_then({
- let tx = self.tx.clone();
- move |_| {
- tx.clone().send(message).then(|_| Ok(()))
- }
- }).then(|_| Ok(()));
+ let (cancel_tx, mut cancel_rx) = unsync::oneshot::channel();
+ let tx = self.tx.clone();
+ let future = Delay::new(Instant::now() + delay + (*TIMER_RESOLUTION * 2))
+ .map_err(|e| panic!("timer failed; err={:?}", e))
+ .and_then(move |_| {
+ if let Ok(Async::Ready(())) = cancel_rx.poll() {
+ trace!("timer cancel signal sent, won't send message.");
+ }
+ tx.send(message).then(|_| Ok(()))
+ });
self.handle.spawn(future);
+ TimerHandle { tx: cancel_tx }
}
-
}
impl Stream for Timer {