From 106c5e8b5c865c8396f824f4f5aa14d1bf0952b1 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sun, 16 Feb 2020 18:12:43 +0100 Subject: Work on router optimizations --- src/wireguard/router/runq.rs | 164 ------------------------------------------- 1 file changed, 164 deletions(-) delete mode 100644 src/wireguard/router/runq.rs (limited to 'src/wireguard/router/runq.rs') diff --git a/src/wireguard/router/runq.rs b/src/wireguard/router/runq.rs deleted file mode 100644 index 44e11a1..0000000 --- a/src/wireguard/router/runq.rs +++ /dev/null @@ -1,164 +0,0 @@ -use std::hash::Hash; -use std::mem; -use std::sync::{Condvar, Mutex}; - -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::collections::VecDeque; - -pub trait ToKey { - type Key: Hash + Eq; - fn to_key(&self) -> Self::Key; -} - -pub struct RunQueue { - cvar: Condvar, - inner: Mutex>, -} - -struct Inner { - stop: bool, - queue: VecDeque, - members: HashMap, -} - -impl RunQueue { - pub fn close(&self) { - let mut inner = self.inner.lock().unwrap(); - inner.stop = true; - self.cvar.notify_all(); - } - - pub fn new() -> RunQueue { - RunQueue { - cvar: Condvar::new(), - inner: Mutex::new(Inner { - stop: false, - queue: VecDeque::new(), - members: HashMap::new(), - }), - } - } - - pub fn insert(&self, v: T) { - let key = v.to_key(); - let mut inner = self.inner.lock().unwrap(); - match inner.members.entry(key) { - Entry::Occupied(mut elem) => { - *elem.get_mut() += 1; - } - Entry::Vacant(spot) => { - // add entry to back of queue - spot.insert(0); - inner.queue.push_back(v); - - // wake a thread - self.cvar.notify_one(); - } - } - } - - /// Run (consume from) the run queue using the provided function. - /// The function should return wheter the given element should be rescheduled. - /// - /// # Arguments - /// - /// - `f` : function to apply to every element - /// - /// # Note - /// - /// The function f may be called again even when the element was not inserted back in to the - /// queue since the last applciation and no rescheduling was requested. - /// - /// This happens then the function handles all work for T, - /// but T is added to the run queue while the function is running. - pub fn run bool>(&self, f: F) { - let mut inner = self.inner.lock().unwrap(); - loop { - // fetch next element - let elem = loop { - // run-queue closed - if inner.stop { - return; - } - - // try to pop from queue - match inner.queue.pop_front() { - Some(elem) => { - break elem; - } - None => (), - }; - - // wait for an element to be inserted - inner = self.cvar.wait(inner).unwrap(); - }; - - // fetch current request number - let key = elem.to_key(); - let old_n = *inner.members.get(&key).unwrap(); - mem::drop(inner); // drop guard - - // handle element - let rerun = f(&elem); - - // if the function requested a re-run add the element to the back of the queue - inner = self.inner.lock().unwrap(); - if rerun { - inner.queue.push_back(elem); - continue; - } - - // otherwise check if new requests have come in since we ran the function - match inner.members.entry(key) { - Entry::Occupied(occ) => { - if *occ.get() == old_n { - // no new requests since last, remove entry. - occ.remove(); - } else { - // new requests, reschedule. - inner.queue.push_back(elem); - } - } - Entry::Vacant(_) => { - unreachable!(); - } - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::thread; - use std::time::Duration; - - /* - #[test] - fn test_wait() { - let queue: Arc> = Arc::new(RunQueue::new()); - - { - let queue = queue.clone(); - thread::spawn(move || { - queue.run(|e| { - println!("t0 {}", e); - thread::sleep(Duration::from_millis(100)); - }) - }); - } - - { - let queue = queue.clone(); - thread::spawn(move || { - queue.run(|e| { - println!("t1 {}", e); - thread::sleep(Duration::from_millis(100)); - }) - }); - } - - } - */ -} -- cgit v1.2.3-59-g8ed1b