diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-08-22 14:59:15 +0200 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-08-22 14:59:15 +0200 |
commit | c5aba7183dc7c893ad42d41d9cac19e782715533 (patch) | |
tree | 4448808063f73dec33b989546dae450fe9a2588b /src | |
parent | Ensure peer threads are stopped on drop (diff) | |
download | wireguard-rs-c5aba7183dc7c893ad42d41d9cac19e782715533.tar.xz wireguard-rs-c5aba7183dc7c893ad42d41d9cac19e782715533.zip |
Initial version of timer framework
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs | 1 | ||||
-rw-r--r-- | src/router/device.rs | 2 | ||||
-rw-r--r-- | src/router/workers.rs | 2 | ||||
-rw-r--r-- | src/timers/mod.rs | 1 | ||||
-rw-r--r-- | src/timers/timer.rs | 146 |
5 files changed, 150 insertions, 2 deletions
diff --git a/src/main.rs b/src/main.rs index eab4b61..b45bdcf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ mod constants; mod handshake; mod router; +mod timers; mod types; use std::sync::Arc; diff --git a/src/router/device.rs b/src/router/device.rs index d92250e..974d019 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -18,7 +18,7 @@ use super::super::types::KeyPair; use super::anti_replay::AntiReplay; use super::peer; use super::peer::{Peer, PeerInner}; -use super::workers; +use super::workers::worker_parallel; pub struct DeviceInner { pub stopped: AtomicBool, diff --git a/src/router/workers.rs b/src/router/workers.rs index da5b600..2f7977c 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -136,7 +136,7 @@ pub fn worker_outbound( } } -fn worker_parallel( +pub fn worker_parallel( stopped: Arc<AtomicBool>, // stop workers (device has been dropped) parked: Arc<AtomicBool>, // thread has been parked? local: Worker<JobParallel>, // local job queue (local to thread) diff --git a/src/timers/mod.rs b/src/timers/mod.rs new file mode 100644 index 0000000..0527b0c --- /dev/null +++ b/src/timers/mod.rs @@ -0,0 +1 @@ +mod timer;
\ No newline at end of file diff --git a/src/timers/timer.rs b/src/timers/timer.rs new file mode 100644 index 0000000..484a5c2 --- /dev/null +++ b/src/timers/timer.rs @@ -0,0 +1,146 @@ +use ferris::*; +use spin; +use std::collections::HashMap; +use std::mem; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, Weak}; +use std::thread; +use std::time::{Duration, Instant}; +use std::u64; + +type TimerID = u64; +type TimerKey = (u64, usize); +type Callback = (Arc<AtomicBool>, Box<dyn Fn() -> () + Send + 'static>); + +const ACCURACY: Duration = Duration::from_millis(100); +const OFFSET: Duration = Duration::from_millis(1000); + +struct RunnerInner { + keys: AtomicU64, + wheel: spin::Mutex<CopyWheel<TimerKey>>, + running: AtomicBool, + callback: spin::Mutex<HashMap<TimerID, Callback>>, +} + +pub struct Timer { + pending: Arc<AtomicBool>, + runner: Weak<RunnerInner>, + id: u64, + cnt: AtomicUsize, +} + +struct Runner(Arc<RunnerInner>, Option<thread::JoinHandle<()>>); + +impl Runner { + fn new() -> Self { + let inner = Arc::new(RunnerInner { + running: AtomicBool::new(true), + callback: spin::Mutex::new(HashMap::new()), + keys: AtomicU64::new(0), + wheel: spin::Mutex::new(CopyWheel::<TimerKey>::new(vec![ + Resolution::HundredMs, + Resolution::Sec, + Resolution::Min, + ])), + }); + + // start runner thread + let handle = { + let inner = inner.clone(); + thread::spawn(move || { + let mut next = Instant::now() + ACCURACY; + while inner.running.load(Ordering::Acquire) { + // sleep + let now = Instant::now(); + if next > now { + thread::sleep(next - now); + } + next = next + ACCURACY; + + // extract expired events + let expired = inner.wheel.lock().expire(); + + // handle expired events + for key in &expired { + if let Some((pending, callback)) = inner.callback.lock().get(&key.0) { + if pending.swap(false, Ordering::SeqCst) { + callback(); + } + } else { + unreachable!() + }; + } + } + }) + }; + + Runner(inner, Some(handle)) + } + + pub fn timer(&self, callback: Box<dyn Fn() -> () + Send + 'static>) -> Timer { + let id = self.0.keys.fetch_add(1, Ordering::Relaxed); + let pending = Arc::new(AtomicBool::new(false)); + + assert!(id < u64::MAX, "wrapping of ids"); + + self.0 + .callback + .lock() + .insert(id, (pending.clone(), callback)); + + Timer { + id, + pending: pending, + runner: Arc::downgrade(&self.0.clone()), + cnt: AtomicUsize::new(0), + } + } +} + +impl Timer { + pub fn reset(&self, duration: Duration) { + if let Some(runner) = self.runner.upgrade() { + let mut wheel = runner.wheel.lock(); + let cnt = self.cnt.fetch_add(1, Ordering::SeqCst); + self.pending.store(true, Ordering::SeqCst); + wheel.stop((self.id, cnt)); + wheel.start((self.id, cnt + 1), duration - OFFSET); + } + } + + pub fn start(&self, duration: Duration) { + if self.pending.load(Ordering::Acquire) { + return; + } + + if let Some(runner) = self.runner.upgrade() { + let mut wheel = runner.wheel.lock(); + if !self.pending.swap(true, Ordering::SeqCst) { + let cnt = self.cnt.fetch_add(1, Ordering::SeqCst); + wheel.start((self.id, cnt + 1), duration - OFFSET); + } + } + } + + pub fn stop(&self) { + if self.pending.load(Ordering::Acquire) { + if let Some(runner) = self.runner.upgrade() { + let mut wheel = runner.wheel.lock(); + if self.pending.swap(false, Ordering::SeqCst) { + let cnt = self.cnt.load(Ordering::SeqCst); + wheel.stop((self.id, cnt)); + } + } + } + } +} + +impl Drop for Runner { + fn drop(&mut self) { + // stop the callback thread + self.0.running.store(false, Ordering::SeqCst); + if let Some(handle) = mem::replace(&mut self.1, None) { + handle.join().unwrap(); + } + } +} |