From c5aba7183dc7c893ad42d41d9cac19e782715533 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Thu, 22 Aug 2019 14:59:15 +0200 Subject: Initial version of timer framework --- src/timers/timer.rs | 146 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 src/timers/timer.rs (limited to 'src/timers/timer.rs') diff --git a/src/timers/timer.rs b/src/timers/timer.rs new file mode 100644 index 0000000..484a5c2 --- /dev/null +++ b/src/timers/timer.rs @@ -0,0 +1,146 @@ +use ferris::*; +use spin; +use std::collections::HashMap; +use std::mem; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, Weak}; +use std::thread; +use std::time::{Duration, Instant}; +use std::u64; + +type TimerID = u64; +type TimerKey = (u64, usize); +type Callback = (Arc, Box () + Send + 'static>); + +const ACCURACY: Duration = Duration::from_millis(100); +const OFFSET: Duration = Duration::from_millis(1000); + +struct RunnerInner { + keys: AtomicU64, + wheel: spin::Mutex>, + running: AtomicBool, + callback: spin::Mutex>, +} + +pub struct Timer { + pending: Arc, + runner: Weak, + id: u64, + cnt: AtomicUsize, +} + +struct Runner(Arc, Option>); + +impl Runner { + fn new() -> Self { + let inner = Arc::new(RunnerInner { + running: AtomicBool::new(true), + callback: spin::Mutex::new(HashMap::new()), + keys: AtomicU64::new(0), + wheel: spin::Mutex::new(CopyWheel::::new(vec![ + Resolution::HundredMs, + Resolution::Sec, + Resolution::Min, + ])), + }); + + // start runner thread + let handle = { + let inner = inner.clone(); + thread::spawn(move || { + let mut next = Instant::now() + ACCURACY; + while inner.running.load(Ordering::Acquire) { + // sleep + let now = Instant::now(); + if next > now { + thread::sleep(next - now); + } + next = next + ACCURACY; + + // extract expired events + let expired = inner.wheel.lock().expire(); + + // handle expired events + for key in &expired { + if let Some((pending, callback)) = inner.callback.lock().get(&key.0) { + if pending.swap(false, Ordering::SeqCst) { + callback(); + } + } else { + unreachable!() + }; + } + } + }) + }; + + Runner(inner, Some(handle)) + } + + pub fn timer(&self, callback: Box () + Send + 'static>) -> Timer { + let id = self.0.keys.fetch_add(1, Ordering::Relaxed); + let pending = Arc::new(AtomicBool::new(false)); + + assert!(id < u64::MAX, "wrapping of ids"); + + self.0 + .callback + .lock() + .insert(id, (pending.clone(), callback)); + + Timer { + id, + pending: pending, + runner: Arc::downgrade(&self.0.clone()), + cnt: AtomicUsize::new(0), + } + } +} + +impl Timer { + pub fn reset(&self, duration: Duration) { + if let Some(runner) = self.runner.upgrade() { + let mut wheel = runner.wheel.lock(); + let cnt = self.cnt.fetch_add(1, Ordering::SeqCst); + self.pending.store(true, Ordering::SeqCst); + wheel.stop((self.id, cnt)); + wheel.start((self.id, cnt + 1), duration - OFFSET); + } + } + + pub fn start(&self, duration: Duration) { + if self.pending.load(Ordering::Acquire) { + return; + } + + if let Some(runner) = self.runner.upgrade() { + let mut wheel = runner.wheel.lock(); + if !self.pending.swap(true, Ordering::SeqCst) { + let cnt = self.cnt.fetch_add(1, Ordering::SeqCst); + wheel.start((self.id, cnt + 1), duration - OFFSET); + } + } + } + + pub fn stop(&self) { + if self.pending.load(Ordering::Acquire) { + if let Some(runner) = self.runner.upgrade() { + let mut wheel = runner.wheel.lock(); + if self.pending.swap(false, Ordering::SeqCst) { + let cnt = self.cnt.load(Ordering::SeqCst); + wheel.stop((self.id, cnt)); + } + } + } + } +} + +impl Drop for Runner { + fn drop(&mut self) { + // stop the callback thread + self.0.running.store(false, Ordering::SeqCst); + if let Some(handle) = mem::replace(&mut self.1, None) { + handle.join().unwrap(); + } + } +} -- cgit v1.2.3-59-g8ed1b