aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-22 14:59:15 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-22 14:59:15 +0200
commitc5aba7183dc7c893ad42d41d9cac19e782715533 (patch)
tree4448808063f73dec33b989546dae450fe9a2588b
parentEnsure peer threads are stopped on drop (diff)
downloadwireguard-rs-c5aba7183dc7c893ad42d41d9cac19e782715533.tar.xz
wireguard-rs-c5aba7183dc7c893ad42d41d9cac19e782715533.zip
Initial version of timer framework
-rw-r--r--Cargo.lock7
-rw-r--r--Cargo.toml1
-rw-r--r--src/main.rs1
-rw-r--r--src/router/device.rs2
-rw-r--r--src/router/workers.rs2
-rw-r--r--src/timers/mod.rs1
-rw-r--r--src/timers/timer.rs146
7 files changed, 158 insertions, 2 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 9aad567..d20e51f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -183,6 +183,11 @@ dependencies = [
]
[[package]]
+name = "ferris"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
name = "filetime"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1028,6 +1033,7 @@ dependencies = [
"byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "ferris 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)",
"generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1115,6 +1121,7 @@ dependencies = [
"checksum crypto-mac 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4434400df11d95d556bac068ddfedd482915eb18fe8bea89bc80b6e4b1c179e5"
"checksum curve25519-dalek 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e1f8a6fc0376eb52dc18af94915cc04dfdf8353746c0e8c550ae683a0815e5c1"
"checksum digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "05f47366984d3ad862010e22c7ce81a7dbcaebbdfb37241a620f8b6596ee135c"
+"checksum ferris 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f1d5a00f0aaf50c99b8254add9a85fc834a046b82df1c544cd1dc0a404139f84"
"checksum filetime 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "450537dc346f0c4d738dda31e790da1da5d4bd12145aad4da0d03d713cb3794f"
"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
"checksum fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
diff --git a/Cargo.toml b/Cargo.toml
index 82d31ec..590965c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ futures = "0.1.28"
arraydeque = "^0.4"
treebitmap = "^0.4"
crossbeam-deque = "0.7"
+ferris = "0.2.0" # consider replacement
[dependencies.x25519-dalek]
version = "^0.5"
diff --git a/src/main.rs b/src/main.rs
index eab4b61..b45bdcf 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,6 +3,7 @@
mod constants;
mod handshake;
mod router;
+mod timers;
mod types;
use std::sync::Arc;
diff --git a/src/router/device.rs b/src/router/device.rs
index d92250e..974d019 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -18,7 +18,7 @@ use super::super::types::KeyPair;
use super::anti_replay::AntiReplay;
use super::peer;
use super::peer::{Peer, PeerInner};
-use super::workers;
+use super::workers::worker_parallel;
pub struct DeviceInner {
pub stopped: AtomicBool,
diff --git a/src/router/workers.rs b/src/router/workers.rs
index da5b600..2f7977c 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -136,7 +136,7 @@ pub fn worker_outbound(
}
}
-fn worker_parallel(
+pub fn worker_parallel(
stopped: Arc<AtomicBool>, // stop workers (device has been dropped)
parked: Arc<AtomicBool>, // thread has been parked?
local: Worker<JobParallel>, // local job queue (local to thread)
diff --git a/src/timers/mod.rs b/src/timers/mod.rs
new file mode 100644
index 0000000..0527b0c
--- /dev/null
+++ b/src/timers/mod.rs
@@ -0,0 +1 @@
+mod timer; \ No newline at end of file
diff --git a/src/timers/timer.rs b/src/timers/timer.rs
new file mode 100644
index 0000000..484a5c2
--- /dev/null
+++ b/src/timers/timer.rs
@@ -0,0 +1,146 @@
+use ferris::*;
+use spin;
+use std::collections::HashMap;
+use std::mem;
+use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
+use std::sync::{Arc, Weak};
+use std::thread;
+use std::time::{Duration, Instant};
+use std::u64;
+
+type TimerID = u64;
+type TimerKey = (u64, usize);
+type Callback = (Arc<AtomicBool>, Box<dyn Fn() -> () + Send + 'static>);
+
+const ACCURACY: Duration = Duration::from_millis(100);
+const OFFSET: Duration = Duration::from_millis(1000);
+
+struct RunnerInner {
+ keys: AtomicU64,
+ wheel: spin::Mutex<CopyWheel<TimerKey>>,
+ running: AtomicBool,
+ callback: spin::Mutex<HashMap<TimerID, Callback>>,
+}
+
+pub struct Timer {
+ pending: Arc<AtomicBool>,
+ runner: Weak<RunnerInner>,
+ id: u64,
+ cnt: AtomicUsize,
+}
+
+struct Runner(Arc<RunnerInner>, Option<thread::JoinHandle<()>>);
+
+impl Runner {
+ fn new() -> Self {
+ let inner = Arc::new(RunnerInner {
+ running: AtomicBool::new(true),
+ callback: spin::Mutex::new(HashMap::new()),
+ keys: AtomicU64::new(0),
+ wheel: spin::Mutex::new(CopyWheel::<TimerKey>::new(vec![
+ Resolution::HundredMs,
+ Resolution::Sec,
+ Resolution::Min,
+ ])),
+ });
+
+ // start runner thread
+ let handle = {
+ let inner = inner.clone();
+ thread::spawn(move || {
+ let mut next = Instant::now() + ACCURACY;
+ while inner.running.load(Ordering::Acquire) {
+ // sleep
+ let now = Instant::now();
+ if next > now {
+ thread::sleep(next - now);
+ }
+ next = next + ACCURACY;
+
+ // extract expired events
+ let expired = inner.wheel.lock().expire();
+
+ // handle expired events
+ for key in &expired {
+ if let Some((pending, callback)) = inner.callback.lock().get(&key.0) {
+ if pending.swap(false, Ordering::SeqCst) {
+ callback();
+ }
+ } else {
+ unreachable!()
+ };
+ }
+ }
+ })
+ };
+
+ Runner(inner, Some(handle))
+ }
+
+ pub fn timer(&self, callback: Box<dyn Fn() -> () + Send + 'static>) -> Timer {
+ let id = self.0.keys.fetch_add(1, Ordering::Relaxed);
+ let pending = Arc::new(AtomicBool::new(false));
+
+ assert!(id < u64::MAX, "wrapping of ids");
+
+ self.0
+ .callback
+ .lock()
+ .insert(id, (pending.clone(), callback));
+
+ Timer {
+ id,
+ pending: pending,
+ runner: Arc::downgrade(&self.0.clone()),
+ cnt: AtomicUsize::new(0),
+ }
+ }
+}
+
+impl Timer {
+ pub fn reset(&self, duration: Duration) {
+ if let Some(runner) = self.runner.upgrade() {
+ let mut wheel = runner.wheel.lock();
+ let cnt = self.cnt.fetch_add(1, Ordering::SeqCst);
+ self.pending.store(true, Ordering::SeqCst);
+ wheel.stop((self.id, cnt));
+ wheel.start((self.id, cnt + 1), duration - OFFSET);
+ }
+ }
+
+ pub fn start(&self, duration: Duration) {
+ if self.pending.load(Ordering::Acquire) {
+ return;
+ }
+
+ if let Some(runner) = self.runner.upgrade() {
+ let mut wheel = runner.wheel.lock();
+ if !self.pending.swap(true, Ordering::SeqCst) {
+ let cnt = self.cnt.fetch_add(1, Ordering::SeqCst);
+ wheel.start((self.id, cnt + 1), duration - OFFSET);
+ }
+ }
+ }
+
+ pub fn stop(&self) {
+ if self.pending.load(Ordering::Acquire) {
+ if let Some(runner) = self.runner.upgrade() {
+ let mut wheel = runner.wheel.lock();
+ if self.pending.swap(false, Ordering::SeqCst) {
+ let cnt = self.cnt.load(Ordering::SeqCst);
+ wheel.stop((self.id, cnt));
+ }
+ }
+ }
+ }
+}
+
+impl Drop for Runner {
+ fn drop(&mut self) {
+ // stop the callback thread
+ self.0.running.store(false, Ordering::SeqCst);
+ if let Some(handle) = mem::replace(&mut self.1, None) {
+ handle.join().unwrap();
+ }
+ }
+}