summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/timers/timer.rs76
1 files changed, 38 insertions, 38 deletions
diff --git a/src/timers/timer.rs b/src/timers/timer.rs
index 5426c79..a0774ac 100644
--- a/src/timers/timer.rs
+++ b/src/timers/timer.rs
@@ -12,7 +12,7 @@ extern crate test;
type TimerID = u64;
type TimerKey = (u64, usize);
-type Callback = (Arc<AtomicBool>, Box<dyn Fn() -> () + Send + 'static>);
+type Callback = (Weak<TimerInner>, Box<dyn Fn() -> () + Send + 'static>);
const ACCURACY: Duration = Duration::from_millis(100);
const OFFSET: Duration = Duration::from_millis(1000);
@@ -24,13 +24,16 @@ struct RunnerInner {
callback: spin::Mutex<HashMap<TimerID, Callback>>,
}
-pub struct Timer {
- pending: Arc<AtomicBool>,
- runner: Weak<RunnerInner>,
+struct TimerInner {
id: u64,
+ pending: AtomicBool,
+ runner: Weak<RunnerInner>,
cnt: AtomicUsize,
}
+#[derive(Clone)]
+pub struct Timer(Arc<TimerInner>);
+
pub struct Runner(Arc<RunnerInner>, Option<thread::JoinHandle<()>>);
impl Runner {
@@ -64,13 +67,13 @@ impl Runner {
// handle expired events
for key in &expired {
- if let Some((pending, callback)) = inner.callback.lock().get(&key.0) {
- if pending.swap(false, Ordering::SeqCst) {
+ let callbacks = inner.callback.lock();
+ let (timer, callback) = callbacks.get(&key.0).unwrap();
+ if let Some(timer) = timer.upgrade() {
+ if timer.pending.swap(false, Ordering::SeqCst) {
callback();
}
- } else {
- unreachable!()
- };
+ }
}
}
})
@@ -81,56 +84,59 @@ impl Runner {
pub fn timer(&self, callback: Box<dyn Fn() -> () + Send + 'static>) -> Timer {
let id = self.0.keys.fetch_add(1, Ordering::Relaxed);
- let pending = Arc::new(AtomicBool::new(false));
+ let inner = Arc::new(TimerInner {
+ id,
+ pending: AtomicBool::new(false),
+ runner: Arc::downgrade(&self.0.clone()),
+ cnt: AtomicUsize::new(0),
+ });
assert!(id < u64::MAX, "wrapping of ids");
self.0
.callback
.lock()
- .insert(id, (pending.clone(), callback));
+ .insert(id, (Arc::downgrade(&inner), callback));
- Timer {
- id,
- pending: pending,
- runner: Arc::downgrade(&self.0.clone()),
- cnt: AtomicUsize::new(0),
- }
+ Timer(inner)
}
}
impl Timer {
pub fn reset(&self, duration: Duration) {
- if let Some(runner) = self.runner.upgrade() {
+ let timer = &self.0;
+ if let Some(runner) = timer.runner.upgrade() {
let mut wheel = runner.wheel.lock();
- let cnt = self.cnt.fetch_add(1, Ordering::SeqCst);
- self.pending.store(true, Ordering::SeqCst);
- wheel.stop((self.id, cnt));
- wheel.start((self.id, cnt + 1), duration - OFFSET);
+ let cnt = timer.cnt.fetch_add(1, Ordering::SeqCst);
+ timer.pending.store(true, Ordering::SeqCst);
+ wheel.stop((timer.id, cnt));
+ wheel.start((timer.id, cnt + 1), duration - OFFSET);
}
}
pub fn start(&self, duration: Duration) {
- if self.pending.load(Ordering::Acquire) {
+ let timer = &self.0;
+ if timer.pending.load(Ordering::Acquire) {
return;
}
- if let Some(runner) = self.runner.upgrade() {
+ if let Some(runner) = timer.runner.upgrade() {
let mut wheel = runner.wheel.lock();
- if !self.pending.swap(true, Ordering::SeqCst) {
- let cnt = self.cnt.fetch_add(1, Ordering::SeqCst);
- wheel.start((self.id, cnt + 1), duration - OFFSET);
+ if !timer.pending.swap(true, Ordering::SeqCst) {
+ let cnt = timer.cnt.fetch_add(1, Ordering::SeqCst);
+ wheel.start((timer.id, cnt + 1), duration - OFFSET);
}
}
}
pub fn stop(&self) {
- if self.pending.load(Ordering::Acquire) {
- if let Some(runner) = self.runner.upgrade() {
+ let timer = &self.0;
+ if timer.pending.load(Ordering::Acquire) {
+ if let Some(runner) = timer.runner.upgrade() {
let mut wheel = runner.wheel.lock();
- if self.pending.swap(false, Ordering::SeqCst) {
- let cnt = self.cnt.load(Ordering::SeqCst);
- wheel.stop((self.id, cnt));
+ if timer.pending.swap(false, Ordering::SeqCst) {
+ let cnt = timer.cnt.load(Ordering::SeqCst);
+ wheel.stop((timer.id, cnt));
}
}
}
@@ -146,12 +152,6 @@ impl Drop for Runner {
}
}
-impl Drop for Timer {
- fn drop(&mut self) {
- self.stop();
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;