From e0db9861bcf7194c29888c28184785f969199c38 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sat, 14 Dec 2019 13:37:51 +0100 Subject: Added profiler feature --- src/wireguard/router/constants.rs | 4 +++- src/wireguard/router/inbound.rs | 4 +++- src/wireguard/router/outbound.rs | 34 +++++++++++++++------------- src/wireguard/router/pool.rs | 47 ++++++++++++++++++++++++++++++--------- src/wireguard/router/runq.rs | 27 ++++++++++++++++++---- 5 files changed, 84 insertions(+), 32 deletions(-) (limited to 'src/wireguard/router') diff --git a/src/wireguard/router/constants.rs b/src/wireguard/router/constants.rs index 0ca824a..6129fd7 100644 --- a/src/wireguard/router/constants.rs +++ b/src/wireguard/router/constants.rs @@ -4,4 +4,6 @@ pub const MAX_STAGED_PACKETS: usize = 128; // performance constants -pub const WORKER_QUEUE_SIZE: usize = MAX_STAGED_PACKETS; +pub const PARALLEL_QUEUE_SIZE: usize = MAX_STAGED_PACKETS; +pub const INORDER_QUEUE_SIZE: usize = PARALLEL_QUEUE_SIZE; +pub const MAX_INORDER_CONSUME: usize = INORDER_QUEUE_SIZE; diff --git a/src/wireguard/router/inbound.rs b/src/wireguard/router/inbound.rs index 5a27c95..db6d3f3 100644 --- a/src/wireguard/router/inbound.rs +++ b/src/wireguard/router/inbound.rs @@ -1,3 +1,4 @@ +use super::constants::MAX_INORDER_CONSUME; use super::device::DecryptionState; use super::device::Device; use super::messages::TransportHeader; @@ -185,6 +186,7 @@ pub fn sequential>( // handle message from the peers inbound queue device.run_inbound.run(|peer| { - peer.inbound.handle(|body| work(&peer, body)); + peer.inbound + .handle(|body| work(&peer, body), MAX_INORDER_CONSUME) }); } diff --git a/src/wireguard/router/outbound.rs b/src/wireguard/router/outbound.rs index 9ecffd8..a555ecb 100644 --- a/src/wireguard/router/outbound.rs +++ b/src/wireguard/router/outbound.rs @@ -1,3 +1,4 @@ +use super::constants::MAX_INORDER_CONSUME; use super::device::Device; use super::messages::{TransportHeader, TYPE_TRANSPORT}; use 
super::peer::Peer; @@ -88,20 +89,23 @@ pub fn sequential>( device: Device, ) { device.run_outbound.run(|peer| { - peer.outbound.handle(|body| { - log::trace!("worker, sequential section, obtained job"); - - // send to peer - let xmit = peer.send(&body.msg[..]).is_ok(); - - // trigger callback - C::send( - &peer.opaque, - body.msg.len(), - xmit, - &body.keypair, - body.counter, - ); - }); + peer.outbound.handle( + |body| { + log::trace!("worker, sequential section, obtained job"); + + // send to peer + let xmit = peer.send(&body.msg[..]).is_ok(); + + // trigger callback + C::send( + &peer.opaque, + body.msg.len(), + xmit, + &body.keypair, + body.counter, + ); + }, + MAX_INORDER_CONSUME, + ) }); } diff --git a/src/wireguard/router/pool.rs b/src/wireguard/router/pool.rs index 07a9bfa..686c788 100644 --- a/src/wireguard/router/pool.rs +++ b/src/wireguard/router/pool.rs @@ -4,10 +4,9 @@ use std::mem; use std::sync::mpsc::Receiver; use std::sync::Arc; +use super::constants::INORDER_QUEUE_SIZE; use super::runq::{RunQueue, ToKey}; -const INORDER_QUEUE_SIZE: usize = 64; - pub struct InnerJob { // peer (used by worker to schedule/handle inorder queue), // when the peer is None, the job is complete @@ -52,28 +51,50 @@ pub struct InorderQueue { } impl InorderQueue { - pub fn send(&self, job: Job) -> bool { - self.queue.lock().push_back(job).is_ok() - } - pub fn new() -> InorderQueue { InorderQueue { queue: Mutex::new(ArrayDeque::new()), } } + /// Add a new job to the in-order queue + /// + /// # Arguments + /// + /// - `job`: The job added to the back of the queue + /// + /// # Returns + /// + /// True if the element was added, + /// false to indicate that the queue is full. + pub fn send(&self, job: Job) -> bool { + self.queue.lock().push_back(job).is_ok() + } + + /// Consume completed jobs from the in-order queue + /// + /// # Arguments + /// + /// - `f`: function to apply to the body of each job. 
+ /// - `limit`: maximum number of jobs to handle before returning + /// + /// # Returns + /// + /// A boolean indicating if the limit was reached: + /// true indicating that the limit was reached, + /// while false implies that the queue is empty or an uncompleted job was reached. #[inline(always)] - pub fn handle(&self, f: F) { + pub fn handle(&self, f: F, mut limit: usize) -> bool { // take the mutex let mut queue = self.queue.lock(); - loop { + while limit > 0 { // attempt to extract front element let front = queue.pop_front(); let elem = match front { Some(elem) => elem, _ => { - return; + return false; } }; @@ -90,13 +111,17 @@ impl InorderQueue { // job not complete yet, return job to front if ret { queue.push_front(elem).unwrap(); - return; + return false; } + limit -= 1; } + + // did not complete all jobs + true } } -/// Allows easy construction of a semi-parallel worker. +/// Allows easy construction of a parallel worker. /// Applicable for both decryption and encryption workers. #[inline(always)] pub fn worker_parallel< diff --git a/src/wireguard/router/runq.rs b/src/wireguard/router/runq.rs index 936a53c..44e11a1 100644 --- a/src/wireguard/router/runq.rs +++ b/src/wireguard/router/runq.rs @@ -58,7 +58,21 @@ impl RunQueue { } } - pub fn run ()>(&self, f: F) { + /// Run (consume from) the run queue using the provided function. + /// The function should return whether the given element should be rescheduled. + /// + /// # Arguments + /// + /// - `f` : function to apply to every element + /// + /// # Note + /// + /// The function f may be called again even when the element was not inserted back into the + /// queue since the last application and no rescheduling was requested. + /// + /// This happens when the function handles all work for T, + /// but T is added to the run queue while the function is running. 
+ pub fn run bool>(&self, f: F) { let mut inner = self.inner.lock().unwrap(); loop { // fetch next element @@ -86,10 +100,16 @@ impl RunQueue { mem::drop(inner); // drop guard // handle element - f(&elem); + let rerun = f(&elem); - // retake lock and check if should be added back to queue + // if the function requested a re-run add the element to the back of the queue inner = self.inner.lock().unwrap(); + if rerun { + inner.queue.push_back(elem); + continue; + } + + // otherwise check if new requests have come in since we ran the function match inner.members.entry(key) { Entry::Occupied(occ) => { if *occ.get() == old_n { @@ -111,7 +131,6 @@ impl RunQueue { #[cfg(test)] mod tests { use super::*; - use std::sync::Arc; use std::thread; use std::time::Duration; -- cgit v1.2.3-59-g8ed1b