diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-14 13:37:51 +0100 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-14 13:37:51 +0100 |
commit | e0db9861bcf7194c29888c28184785f969199c38 (patch) | |
tree | 76c14e6ccf9bfac6880f1ce99ad1d96f06d62788 /src/wireguard/router | |
parent | Remove crossbeam dependency (diff) | |
download | wireguard-rs-e0db9861bcf7194c29888c28184785f969199c38.tar.xz wireguard-rs-e0db9861bcf7194c29888c28184785f969199c38.zip |
Added profiler feature
Diffstat (limited to '')
-rw-r--r-- | src/wireguard/router/constants.rs | 4 | ||||
-rw-r--r-- | src/wireguard/router/inbound.rs | 4 | ||||
-rw-r--r-- | src/wireguard/router/outbound.rs | 34 | ||||
-rw-r--r-- | src/wireguard/router/pool.rs | 47 | ||||
-rw-r--r-- | src/wireguard/router/runq.rs | 27 |
5 files changed, 84 insertions, 32 deletions
diff --git a/src/wireguard/router/constants.rs b/src/wireguard/router/constants.rs index 0ca824a..6129fd7 100644 --- a/src/wireguard/router/constants.rs +++ b/src/wireguard/router/constants.rs @@ -4,4 +4,6 @@ pub const MAX_STAGED_PACKETS: usize = 128; // performance constants -pub const WORKER_QUEUE_SIZE: usize = MAX_STAGED_PACKETS; +pub const PARALLEL_QUEUE_SIZE: usize = MAX_STAGED_PACKETS; +pub const INORDER_QUEUE_SIZE: usize = PARALLEL_QUEUE_SIZE; +pub const MAX_INORDER_CONSUME: usize = INORDER_QUEUE_SIZE; diff --git a/src/wireguard/router/inbound.rs b/src/wireguard/router/inbound.rs index 5a27c95..db6d3f3 100644 --- a/src/wireguard/router/inbound.rs +++ b/src/wireguard/router/inbound.rs @@ -1,3 +1,4 @@ +use super::constants::MAX_INORDER_CONSUME; use super::device::DecryptionState; use super::device::Device; use super::messages::TransportHeader; @@ -185,6 +186,7 @@ pub fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( // handle message from the peers inbound queue device.run_inbound.run(|peer| { - peer.inbound.handle(|body| work(&peer, body)); + peer.inbound + .handle(|body| work(&peer, body), MAX_INORDER_CONSUME) }); } diff --git a/src/wireguard/router/outbound.rs b/src/wireguard/router/outbound.rs index 9ecffd8..a555ecb 100644 --- a/src/wireguard/router/outbound.rs +++ b/src/wireguard/router/outbound.rs @@ -1,3 +1,4 @@ +use super::constants::MAX_INORDER_CONSUME; use super::device::Device; use super::messages::{TransportHeader, TYPE_TRANSPORT}; use super::peer::Peer; @@ -88,20 +89,23 @@ pub fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( device: Device<E, C, T, B>, ) { device.run_outbound.run(|peer| { - peer.outbound.handle(|body| { - log::trace!("worker, sequential section, obtained job"); - - // send to peer - let xmit = peer.send(&body.msg[..]).is_ok(); - - // trigger callback - C::send( - &peer.opaque, - body.msg.len(), - xmit, - &body.keypair, - body.counter, - ); - }); + peer.outbound.handle( + |body| { + log::trace!("worker, sequential section, obtained job"); + + // send to peer + let xmit = peer.send(&body.msg[..]).is_ok(); + + // trigger callback + C::send( + &peer.opaque, + body.msg.len(), + xmit, + &body.keypair, + body.counter, + ); + }, + MAX_INORDER_CONSUME, + ) }); } diff --git a/src/wireguard/router/pool.rs b/src/wireguard/router/pool.rs index 07a9bfa..686c788 100644 --- a/src/wireguard/router/pool.rs +++ b/src/wireguard/router/pool.rs @@ -4,10 +4,9 @@ use std::mem; use std::sync::mpsc::Receiver; use std::sync::Arc; +use super::constants::INORDER_QUEUE_SIZE; use super::runq::{RunQueue, ToKey}; -const INORDER_QUEUE_SIZE: usize = 64; - pub struct InnerJob<P, B> { // peer (used by worker to schedule/handle inorder queue), // when the peer is None, the job is complete @@ -52,28 +51,50 @@ pub struct InorderQueue<P, B> { } impl<P, B> InorderQueue<P, B> { - pub fn send(&self, job: Job<P, B>) -> bool { - self.queue.lock().push_back(job).is_ok() - } - pub fn new() -> InorderQueue<P, B> { InorderQueue { queue: Mutex::new(ArrayDeque::new()), } } + /// Add a new job to the in-order queue + /// + /// # Arguments + /// + /// - `job`: The job added to the back of the queue + /// + /// # Returns + /// + /// True if the element was added, + /// false to indicate that the queue is full. + pub fn send(&self, job: Job<P, B>) -> bool { + self.queue.lock().push_back(job).is_ok() + } + + /// Consume completed jobs from the in-order queue + /// + /// # Arguments + /// + /// - `f`: function to apply to the body of each jobof each job. + /// - `limit`: maximum number of jobs to handle before returning + /// + /// # Returns + /// + /// A boolean indicating if the limit was reached: + /// true indicating that the limit was reached, + /// while false implies that the queue is empty or an uncompleted job was reached. #[inline(always)] - pub fn handle<F: Fn(&mut B)>(&self, f: F) { + pub fn handle<F: Fn(&mut B)>(&self, f: F, mut limit: usize) -> bool { // take the mutex let mut queue = self.queue.lock(); - loop { + while limit > 0 { // attempt to extract front element let front = queue.pop_front(); let elem = match front { Some(elem) => elem, _ => { - return; + return false; } }; @@ -90,13 +111,17 @@ impl<P, B> InorderQueue<P, B> { // job not complete yet, return job to front if ret { queue.push_front(elem).unwrap(); - return; + return false; } + limit -= 1; } + + // did not complete all jobs + true } } -/// Allows easy construction of a semi-parallel worker. +/// Allows easy construction of a parallel worker. /// Applicable for both decryption and encryption workers. #[inline(always)] pub fn worker_parallel< diff --git a/src/wireguard/router/runq.rs b/src/wireguard/router/runq.rs index 936a53c..44e11a1 100644 --- a/src/wireguard/router/runq.rs +++ b/src/wireguard/router/runq.rs @@ -58,7 +58,21 @@ impl<T: ToKey> RunQueue<T> { } } - pub fn run<F: Fn(&T) -> ()>(&self, f: F) { + /// Run (consume from) the run queue using the provided function. + /// The function should return wheter the given element should be rescheduled. + /// + /// # Arguments + /// + /// - `f` : function to apply to every element + /// + /// # Note + /// + /// The function f may be called again even when the element was not inserted back in to the + /// queue since the last applciation and no rescheduling was requested. + /// + /// This happens then the function handles all work for T, + /// but T is added to the run queue while the function is running. + pub fn run<F: Fn(&T) -> bool>(&self, f: F) { let mut inner = self.inner.lock().unwrap(); loop { // fetch next element @@ -86,10 +100,16 @@ impl<T: ToKey> RunQueue<T> { mem::drop(inner); // drop guard // handle element - f(&elem); + let rerun = f(&elem); - // retake lock and check if should be added back to queue + // if the function requested a re-run add the element to the back of the queue inner = self.inner.lock().unwrap(); + if rerun { + inner.queue.push_back(elem); + continue; + } + + // otherwise check if new requests have come in since we ran the function match inner.members.entry(key) { Entry::Occupied(occ) => { if *occ.get() == old_n { @@ -111,7 +131,6 @@ impl<T: ToKey> RunQueue<T> { #[cfg(test)] mod tests { use super::*; - use std::sync::Arc; use std::thread; use std::time::Duration; |