From 2272e5250f7be8d28f4a9b5f75798da05ac85769 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sat, 24 Aug 2019 23:41:05 +0200 Subject: Move to hjul crate Moved timer code into seperate crate (`hjul'). --- src/handshake/device.rs | 5 +- src/main.rs | 1 - src/router/peer.rs | 3 + src/timers/mod.rs | 4 -- src/timers/peer.rs | 8 --- src/timers/timer.rs | 173 ------------------------------------------------ 6 files changed, 7 insertions(+), 187 deletions(-) delete mode 100644 src/timers/mod.rs delete mode 100644 src/timers/peer.rs delete mode 100644 src/timers/timer.rs (limited to 'src') diff --git a/src/handshake/device.rs b/src/handshake/device.rs index d29c41b..f439414 100644 --- a/src/handshake/device.rs +++ b/src/handshake/device.rs @@ -202,7 +202,10 @@ where rng: &mut R, // rng instance to sample randomness from msg: &[u8], // message buffer src: Option<&'a S>, // optional source endpoint, set when "under load" - ) -> Result, HandshakeError> where &'a S: Into<&'a SocketAddr> { + ) -> Result, HandshakeError> + where + &'a S: Into<&'a SocketAddr>, + { match msg.get(0) { Some(&TYPE_INITIATION) => { // parse message diff --git a/src/main.rs b/src/main.rs index b45bdcf..eab4b61 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,6 @@ mod constants; mod handshake; mod router; -mod timers; mod types; use std::sync::Arc; diff --git a/src/router/peer.rs b/src/router/peer.rs index 1edb635..9c1721a 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -314,4 +314,7 @@ impl Peer { )); res } + + pub fn send(&self, msg : Vec) { + } } diff --git a/src/timers/mod.rs b/src/timers/mod.rs deleted file mode 100644 index 0bac45a..0000000 --- a/src/timers/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod peer; -mod timer; - -pub use timer::{Timer, Runner}; \ No newline at end of file diff --git a/src/timers/peer.rs b/src/timers/peer.rs deleted file mode 100644 index 9859e6b..0000000 --- a/src/timers/peer.rs +++ /dev/null @@ -1,8 +0,0 @@ -use super::timer::Timer; - -struct PeerTimers { - pub send_keepalive: Timer, - pub new_handshake: Timer, - pub zero_key_material: Timer, - pub persistent_keepalive: Timer, -} diff --git a/src/timers/timer.rs b/src/timers/timer.rs deleted file mode 100644 index a0774ac..0000000 --- a/src/timers/timer.rs +++ /dev/null @@ -1,173 +0,0 @@ -use ferris::*; -use spin; -use std::collections::HashMap; -use std::mem; -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; -use std::sync::{Arc, Weak}; -use std::thread; -use std::time::{Duration, Instant}; -use std::u64; - -extern crate test; - -type TimerID = u64; -type TimerKey = (u64, usize); -type Callback = (Weak, Box () + Send + 'static>); - -const ACCURACY: Duration = Duration::from_millis(100); -const OFFSET: Duration = Duration::from_millis(1000); - -struct RunnerInner { - keys: AtomicU64, - wheel: spin::Mutex>, - running: AtomicBool, - callback: spin::Mutex>, -} - -struct TimerInner { - id: u64, - pending: AtomicBool, - runner: Weak, - cnt: AtomicUsize, -} - -#[derive(Clone)] -pub struct Timer(Arc); - -pub struct Runner(Arc, Option>); - -impl Runner { - pub fn new() -> Self { - let inner = Arc::new(RunnerInner { - running: AtomicBool::new(true), - callback: spin::Mutex::new(HashMap::new()), - keys: AtomicU64::new(0), - wheel: spin::Mutex::new(CopyWheel::::new(vec![ - Resolution::HundredMs, - Resolution::Sec, - Resolution::Min, - ])), - }); - - // start runner thread - let handle = { - let inner = inner.clone(); - thread::spawn(move || { - let mut next = Instant::now() + ACCURACY; - while inner.running.load(Ordering::Acquire) { - // sleep for 1 tick - let now = Instant::now(); - if next > now { - thread::sleep(next - now); - } - next = now + ACCURACY; - - // extract expired events - let expired = inner.wheel.lock().expire(); - - // handle expired events - for key in &expired { - let callbacks = inner.callback.lock(); - let (timer, callback) = callbacks.get(&key.0).unwrap(); - if let Some(timer) = timer.upgrade() { - if timer.pending.swap(false, Ordering::SeqCst) { - callback(); - } - } - } - } - }) - }; - - Runner(inner, Some(handle)) - } - - pub fn timer(&self, callback: Box () + Send + 'static>) -> Timer { - let id = self.0.keys.fetch_add(1, Ordering::Relaxed); - let inner = Arc::new(TimerInner { - id, - pending: AtomicBool::new(false), - runner: Arc::downgrade(&self.0.clone()), - cnt: AtomicUsize::new(0), - }); - - assert!(id < u64::MAX, "wrapping of ids"); - - self.0 - .callback - .lock() - .insert(id, (Arc::downgrade(&inner), callback)); - - Timer(inner) - } -} - -impl Timer { - pub fn reset(&self, duration: Duration) { - let timer = &self.0; - if let Some(runner) = timer.runner.upgrade() { - let mut wheel = runner.wheel.lock(); - let cnt = timer.cnt.fetch_add(1, Ordering::SeqCst); - timer.pending.store(true, Ordering::SeqCst); - wheel.stop((timer.id, cnt)); - wheel.start((timer.id, cnt + 1), duration - OFFSET); - } - } - - pub fn start(&self, duration: Duration) { - let timer = &self.0; - if timer.pending.load(Ordering::Acquire) { - return; - } - - if let Some(runner) = timer.runner.upgrade() { - let mut wheel = runner.wheel.lock(); - if !timer.pending.swap(true, Ordering::SeqCst) { - let cnt = timer.cnt.fetch_add(1, Ordering::SeqCst); - wheel.start((timer.id, cnt + 1), duration - OFFSET); - } - } - } - - pub fn stop(&self) { - let timer = &self.0; - if timer.pending.load(Ordering::Acquire) { - if let Some(runner) = timer.runner.upgrade() { - let mut wheel = runner.wheel.lock(); - if timer.pending.swap(false, Ordering::SeqCst) { - let cnt = timer.cnt.load(Ordering::SeqCst); - wheel.stop((timer.id, cnt)); - } - } - } - } -} - -impl Drop for Runner { - fn drop(&mut self) { - self.0.running.store(false, Ordering::SeqCst); - if let Some(handle) = mem::replace(&mut self.1, None) { - handle.join().unwrap(); - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use test::Bencher; - - #[bench] - fn bench_reset(b: &mut Bencher) { - let runner = Runner::new(); - let timer = runner.timer(Box::new(|| {})); - b.iter(|| timer.reset(Duration::from_millis(1000))); - } - - #[bench] - fn bench_start(b: &mut Bencher) { - let runner = Runner::new(); - let timer = runner.timer(Box::new(|| {})); - b.iter(|| timer.start(Duration::from_millis(1000))); - } -} -- cgit v1.2.3-59-g8ed1b