diff options
-rw-r--r-- | Cargo.lock | 7 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/main.rs | 1 | ||||
-rw-r--r-- | src/router/device.rs | 2 | ||||
-rw-r--r-- | src/router/workers.rs | 2 | ||||
-rw-r--r-- | src/timers/mod.rs | 1 | ||||
-rw-r--r-- | src/timers/timer.rs | 146 |
7 files changed, 158 insertions, 2 deletions
@@ -183,6 +183,11 @@ dependencies = [ ] [[package]] +name = "ferris" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "filetime" version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1028,6 +1033,7 @@ dependencies = [ "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "ferris 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1115,6 +1121,7 @@ dependencies = [ "checksum crypto-mac 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4434400df11d95d556bac068ddfedd482915eb18fe8bea89bc80b6e4b1c179e5" "checksum curve25519-dalek 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e1f8a6fc0376eb52dc18af94915cc04dfdf8353746c0e8c550ae683a0815e5c1" "checksum digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "05f47366984d3ad862010e22c7ce81a7dbcaebbdfb37241a620f8b6596ee135c" +"checksum ferris 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f1d5a00f0aaf50c99b8254add9a85fc834a046b82df1c544cd1dc0a404139f84" "checksum filetime 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "450537dc346f0c4d738dda31e790da1da5d4bd12145aad4da0d03d713cb3794f" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" @@ -22,6 +22,7 @@ futures = "0.1.28" arraydeque = "^0.4" treebitmap = "^0.4" crossbeam-deque = "0.7" +ferris = "0.2.0" # consider replacement [dependencies.x25519-dalek] version = "^0.5" diff --git a/src/main.rs b/src/main.rs index eab4b61..b45bdcf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ mod constants; mod handshake; mod router; +mod timers; mod types; use std::sync::Arc; diff --git a/src/router/device.rs b/src/router/device.rs index d92250e..974d019 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -18,7 +18,7 @@ use super::super::types::KeyPair; use super::anti_replay::AntiReplay; use super::peer; use super::peer::{Peer, PeerInner}; -use super::workers; +use super::workers::worker_parallel; pub struct DeviceInner { pub stopped: AtomicBool, diff --git a/src/router/workers.rs b/src/router/workers.rs index da5b600..2f7977c 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -136,7 +136,7 @@ pub fn worker_outbound( } } -fn worker_parallel( +pub fn worker_parallel( stopped: Arc<AtomicBool>, // stop workers (device has been dropped) parked: Arc<AtomicBool>, // thread has been parked? local: Worker<JobParallel>, // local job queue (local to thread) diff --git a/src/timers/mod.rs b/src/timers/mod.rs new file mode 100644 index 0000000..0527b0c --- /dev/null +++ b/src/timers/mod.rs @@ -0,0 +1 @@ +mod timer;
\ No newline at end of file diff --git a/src/timers/timer.rs b/src/timers/timer.rs new file mode 100644 index 0000000..484a5c2 --- /dev/null +++ b/src/timers/timer.rs @@ -0,0 +1,146 @@ +use ferris::*; +use spin; +use std::collections::HashMap; +use std::mem; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, Weak}; +use std::thread; +use std::time::{Duration, Instant}; +use std::u64; + +type TimerID = u64; +type TimerKey = (u64, usize); +type Callback = (Arc<AtomicBool>, Box<dyn Fn() -> () + Send + 'static>); + +const ACCURACY: Duration = Duration::from_millis(100); +const OFFSET: Duration = Duration::from_millis(1000); + +struct RunnerInner { + keys: AtomicU64, + wheel: spin::Mutex<CopyWheel<TimerKey>>, + running: AtomicBool, + callback: spin::Mutex<HashMap<TimerID, Callback>>, +} + +pub struct Timer { + pending: Arc<AtomicBool>, + runner: Weak<RunnerInner>, + id: u64, + cnt: AtomicUsize, +} + +struct Runner(Arc<RunnerInner>, Option<thread::JoinHandle<()>>); + +impl Runner { + fn new() -> Self { + let inner = Arc::new(RunnerInner { + running: AtomicBool::new(true), + callback: spin::Mutex::new(HashMap::new()), + keys: AtomicU64::new(0), + wheel: spin::Mutex::new(CopyWheel::<TimerKey>::new(vec![ + Resolution::HundredMs, + Resolution::Sec, + Resolution::Min, + ])), + }); + + // start runner thread + let handle = { + let inner = inner.clone(); + thread::spawn(move || { + let mut next = Instant::now() + ACCURACY; + while inner.running.load(Ordering::Acquire) { + // sleep + let now = Instant::now(); + if next > now { + thread::sleep(next - now); + } + next = next + ACCURACY; + + // extract expired events + let expired = inner.wheel.lock().expire(); + + // handle expired events + for key in &expired { + if let Some((pending, callback)) = inner.callback.lock().get(&key.0) { + if pending.swap(false, Ordering::SeqCst) { + callback(); + } + } else { + unreachable!() + }; + } + } + }) + }; + + Runner(inner, Some(handle)) + } + + pub fn timer(&self, callback: Box<dyn Fn() -> () + Send + 'static>) -> Timer { + let id = self.0.keys.fetch_add(1, Ordering::Relaxed); + let pending = Arc::new(AtomicBool::new(false)); + + assert!(id < u64::MAX, "wrapping of ids"); + + self.0 + .callback + .lock() + .insert(id, (pending.clone(), callback)); + + Timer { + id, + pending: pending, + runner: Arc::downgrade(&self.0.clone()), + cnt: AtomicUsize::new(0), + } + } +} + +impl Timer { + pub fn reset(&self, duration: Duration) { + if let Some(runner) = self.runner.upgrade() { + let mut wheel = runner.wheel.lock(); + let cnt = self.cnt.fetch_add(1, Ordering::SeqCst); + self.pending.store(true, Ordering::SeqCst); + wheel.stop((self.id, cnt)); + wheel.start((self.id, cnt + 1), duration - OFFSET); + } + } + + pub fn start(&self, duration: Duration) { + if self.pending.load(Ordering::Acquire) { + return; + } + + if let Some(runner) = self.runner.upgrade() { + let mut wheel = runner.wheel.lock(); + if !self.pending.swap(true, Ordering::SeqCst) { + let cnt = self.cnt.fetch_add(1, Ordering::SeqCst); + wheel.start((self.id, cnt + 1), duration - OFFSET); + } + } + } + + pub fn stop(&self) { + if self.pending.load(Ordering::Acquire) { + if let Some(runner) = self.runner.upgrade() { + let mut wheel = runner.wheel.lock(); + if self.pending.swap(false, Ordering::SeqCst) { + let cnt = self.cnt.load(Ordering::SeqCst); + wheel.stop((self.id, cnt)); + } + } + } + } +} + +impl Drop for Runner { + fn drop(&mut self) { + // stop the callback thread + self.0.running.store(false, Ordering::SeqCst); + if let Some(handle) = mem::replace(&mut self.1, None) { + handle.join().unwrap(); + } + } +} |