diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-09 13:21:12 +0100 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-09 13:21:12 +0100 |
commit | 115fa574a807877594c3b8cf467798fc0524d007 (patch) | |
tree | 07d487f3f8ef130d17536fb93b02f937b7bf66ae /src/wireguard/router/pool.rs | |
parent | Fixed inbound job bug (add to sequential queue) (diff) | |
download | wireguard-rs-115fa574a807877594c3b8cf467798fc0524d007.tar.xz wireguard-rs-115fa574a807877594c3b8cf467798fc0524d007.zip |
Move to run queue
Diffstat (limited to '')
-rw-r--r-- | src/wireguard/router/pool.rs | 77 |
1 files changed, 41 insertions, 36 deletions
diff --git a/src/wireguard/router/pool.rs b/src/wireguard/router/pool.rs index 9c72372..98b1144 100644 --- a/src/wireguard/router/pool.rs +++ b/src/wireguard/router/pool.rs @@ -2,6 +2,9 @@ use arraydeque::ArrayDeque; use spin::{Mutex, MutexGuard}; use std::sync::mpsc::Receiver; use std::sync::Arc; +use std::mem; + +use super::runq::{RunQueue, ToKey}; const INORDER_QUEUE_SIZE: usize = 64; @@ -60,51 +63,53 @@ impl<P, B> InorderQueue<P, B> { } #[inline(always)] - pub fn handle<F: Fn(&mut InnerJob<P, B>)>(&self, f: F) { + pub fn handle<F: Fn(&mut B)>(&self, f: F) { // take the mutex let mut queue = self.queue.lock(); - // handle all complete messages - while queue - .pop_front() - .and_then(|j| { - // check if job is complete - let ret = if let Some(mut guard) = j.complete() { - f(&mut *guard); - false - } else { - true - }; - - // return job to cyclic buffer if not complete - if ret { - let _res = queue.push_front(j); - debug_assert!(_res.is_ok()); - None - } else { - // add job back to pool - Some(()) + loop { + // attempt to extract front element + let front = queue.pop_front(); + let elem = match front { + Some(elem) => elem, + _ => { + return; } - }) - .is_some() - {} + }; + + // apply function if job complete + let ret = if let Some(mut guard) = elem.complete() { + mem::drop(queue); + f(&mut guard.body); + queue = self.queue.lock(); + false + } else { + true + }; + + // job not complete yet, return job to front + if ret { + queue.push_front(elem).unwrap(); + return; + } + } } } /// Allows easy construction of a semi-parallel worker. /// Applicable for both decryption and encryption workers. #[inline(always)] -pub fn worker_template< - P, // represents a peer (atomic reference counted pointer) +pub fn worker_parallel< + P : ToKey, // represents a peer (atomic reference counted pointer) B, // inner body type (message buffer, key material, ...) + D, // device W: Fn(&P, &mut B), - S: Fn(&P, &mut B), - Q: Fn(&P) -> &InorderQueue<P, B>, + Q: Fn(&D) -> &RunQueue<P>, >( - receiver: Receiver<Job<P, B>>, // receiever for new jobs - work_parallel: W, // perform parallel / out-of-order work on peer - work_sequential: S, // perform sequential work on peer - queue: Q, // resolve a peer to an inorder queue + device: D, + queue: Q, + receiver: Receiver<Job<P, B>>, + work: W, ) { log::trace!("router worker started"); loop { @@ -123,11 +128,11 @@ pub fn worker_template< let peer = job.peer.take().unwrap(); // process job - work_parallel(&peer, &mut job.body); + work(&peer, &mut job.body); peer }; - + // process inorder jobs for peer - queue(&peer).handle(|j| work_sequential(&peer, &mut j.body)); + queue(&device).insert(peer); } -} +}
\ No newline at end of file |