path: root/src/wireguard/router
author     Mathias Hall-Andersen <mathias@hall-andersen.dk>    2019-12-14 13:37:51 +0100
committer  Mathias Hall-Andersen <mathias@hall-andersen.dk>    2019-12-14 13:37:51 +0100
commit     e0db9861bcf7194c29888c28184785f969199c38
tree       76c14e6ccf9bfac6880f1ce99ad1d96f06d62788  /src/wireguard/router
parent     Remove crossbeam dependency
Added profiler feature
Diffstat (limited to 'src/wireguard/router')
-rw-r--r--  src/wireguard/router/constants.rs |  4
-rw-r--r--  src/wireguard/router/inbound.rs   |  4
-rw-r--r--  src/wireguard/router/outbound.rs  | 34
-rw-r--r--  src/wireguard/router/pool.rs      | 47
-rw-r--r--  src/wireguard/router/runq.rs      | 27
5 files changed, 84 insertions, 32 deletions
diff --git a/src/wireguard/router/constants.rs b/src/wireguard/router/constants.rs
index 0ca824a..6129fd7 100644
--- a/src/wireguard/router/constants.rs
+++ b/src/wireguard/router/constants.rs
@@ -4,4 +4,6 @@ pub const MAX_STAGED_PACKETS: usize = 128;
// performance constants
-pub const WORKER_QUEUE_SIZE: usize = MAX_STAGED_PACKETS;
+pub const PARALLEL_QUEUE_SIZE: usize = MAX_STAGED_PACKETS;
+pub const INORDER_QUEUE_SIZE: usize = PARALLEL_QUEUE_SIZE;
+pub const MAX_INORDER_CONSUME: usize = INORDER_QUEUE_SIZE;
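
The renamed constants all collapse to the same value (128): a bound for the queues feeding the parallel workers, a bound for each peer's in-order queue, and a cap on how many in-order jobs one sequential pass may consume. As a rough illustration only (whether the router actually backs its parallel stage with a bounded std::sync::mpsc channel is an assumption here, not something shown in this diff), a bounded channel sized by PARALLEL_QUEUE_SIZE gives the kind of back-pressure these constants are meant to bound:

// Hedged sketch, not taken from this commit.
use std::sync::mpsc::sync_channel;

const MAX_STAGED_PACKETS: usize = 128;
const PARALLEL_QUEUE_SIZE: usize = MAX_STAGED_PACKETS;

fn main() {
    // the sender blocks once PARALLEL_QUEUE_SIZE jobs are queued and not yet
    // picked up by a worker, which is the back-pressure the constant describes
    let (tx, rx) = sync_channel::<Vec<u8>>(PARALLEL_QUEUE_SIZE);
    tx.send(vec![0u8; 1500]).expect("queue has capacity");
    assert_eq!(rx.recv().unwrap().len(), 1500);
}
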
diff --git a/src/wireguard/router/inbound.rs b/src/wireguard/router/inbound.rs
index 5a27c95..db6d3f3 100644
--- a/src/wireguard/router/inbound.rs
+++ b/src/wireguard/router/inbound.rs
@@ -1,3 +1,4 @@
+use super::constants::MAX_INORDER_CONSUME;
use super::device::DecryptionState;
use super::device::Device;
use super::messages::TransportHeader;
@@ -185,6 +186,7 @@ pub fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
// handle message from the peers inbound queue
device.run_inbound.run(|peer| {
- peer.inbound.handle(|body| work(&peer, body));
+ peer.inbound
+ .handle(|body| work(&peer, body), MAX_INORDER_CONSUME)
});
}
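
The new second argument to handle caps how many completed jobs a single sequential pass drains from the peer's inbound queue, and the boolean it returns (limit reached or not) becomes the closure's return value for run_inbound.run, i.e. the reschedule decision. A self-contained sketch of that limit semantics; the Job type and drain function below are hypothetical stand-ins, not the real types from pool.rs:

use std::collections::VecDeque;

const MAX_INORDER_CONSUME: usize = 128;

// Hypothetical stand-in for the real job type carried by a peer's inbound queue.
struct Job(u64);

/// Drain at most `limit` jobs, returning true if the limit was hit
/// (i.e. the caller should reschedule this peer for another pass).
fn drain(queue: &mut VecDeque<Job>, mut limit: usize, f: impl Fn(&Job)) -> bool {
    while limit > 0 {
        match queue.pop_front() {
            Some(job) => f(&job),
            None => return false, // queue empty: nothing left, no reschedule
        }
        limit -= 1;
    }
    true // limit reached: more work may remain
}

fn main() {
    let mut q: VecDeque<Job> = (0..200).map(Job).collect();
    let rerun = drain(&mut q, MAX_INORDER_CONSUME, |job| {
        // stand-in for `work(&peer, body)`
        let _ = job.0;
    });
    assert!(rerun);          // 200 > 128, so the peer would be rescheduled
    assert_eq!(q.len(), 72); // 200 - 128 jobs remain for the next pass
}
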
diff --git a/src/wireguard/router/outbound.rs b/src/wireguard/router/outbound.rs
index 9ecffd8..a555ecb 100644
--- a/src/wireguard/router/outbound.rs
+++ b/src/wireguard/router/outbound.rs
@@ -1,3 +1,4 @@
+use super::constants::MAX_INORDER_CONSUME;
use super::device::Device;
use super::messages::{TransportHeader, TYPE_TRANSPORT};
use super::peer::Peer;
@@ -88,20 +89,23 @@ pub fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
device: Device<E, C, T, B>,
) {
device.run_outbound.run(|peer| {
- peer.outbound.handle(|body| {
- log::trace!("worker, sequential section, obtained job");
-
- // send to peer
- let xmit = peer.send(&body.msg[..]).is_ok();
-
- // trigger callback
- C::send(
- &peer.opaque,
- body.msg.len(),
- xmit,
- &body.keypair,
- body.counter,
- );
- });
+ peer.outbound.handle(
+ |body| {
+ log::trace!("worker, sequential section, obtained job");
+
+ // send to peer
+ let xmit = peer.send(&body.msg[..]).is_ok();
+
+ // trigger callback
+ C::send(
+ &peer.opaque,
+ body.msg.len(),
+ xmit,
+ &body.keypair,
+ body.counter,
+ );
+ },
+ MAX_INORDER_CONSUME,
+ )
});
}
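
The outbound path gets the same treatment, and the limit is what keeps the sequential stage fair between peers: after MAX_INORDER_CONSUME jobs the peer is rotated to the back of the run queue (see the runq.rs change further down) instead of being drained to completion. A toy, single-threaded illustration with made-up peer indices, backlog counts, and a deliberately tiny limit:

use std::collections::VecDeque;

const MAX_INORDER_CONSUME: usize = 4; // tiny limit to keep the example readable

fn main() {
    // per-peer counts of completed jobs waiting in their in-order queues
    let mut backlog = vec![10usize, 2usize]; // peer 0 is busy, peer 1 is not
    let mut runq: VecDeque<usize> = VecDeque::from(vec![0, 1]);
    let mut order = Vec::new();

    while let Some(peer) = runq.pop_front() {
        let consumed = backlog[peer].min(MAX_INORDER_CONSUME);
        backlog[peer] -= consumed;
        order.push((peer, consumed));
        // consume limit hit with work left over: rotate the peer to the back
        if consumed == MAX_INORDER_CONSUME && backlog[peer] > 0 {
            runq.push_back(peer);
        }
    }

    // peer 1 is served after peer 0's first slice, not after its whole backlog
    assert_eq!(order, vec![(0usize, 4usize), (1, 2), (0, 4), (0, 2)]);
}
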
diff --git a/src/wireguard/router/pool.rs b/src/wireguard/router/pool.rs
index 07a9bfa..686c788 100644
--- a/src/wireguard/router/pool.rs
+++ b/src/wireguard/router/pool.rs
@@ -4,10 +4,9 @@ use std::mem;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
+use super::constants::INORDER_QUEUE_SIZE;
use super::runq::{RunQueue, ToKey};
-const INORDER_QUEUE_SIZE: usize = 64;
-
pub struct InnerJob<P, B> {
// peer (used by worker to schedule/handle inorder queue),
// when the peer is None, the job is complete
@@ -52,28 +51,50 @@ pub struct InorderQueue<P, B> {
}
impl<P, B> InorderQueue<P, B> {
- pub fn send(&self, job: Job<P, B>) -> bool {
- self.queue.lock().push_back(job).is_ok()
- }
-
pub fn new() -> InorderQueue<P, B> {
InorderQueue {
queue: Mutex::new(ArrayDeque::new()),
}
}
+ /// Add a new job to the in-order queue
+ ///
+ /// # Arguments
+ ///
+ /// - `job`: The job added to the back of the queue
+ ///
+ /// # Returns
+ ///
+ /// `true` if the element was added,
+ /// `false` if the queue is full.
+ pub fn send(&self, job: Job<P, B>) -> bool {
+ self.queue.lock().push_back(job).is_ok()
+ }
+
+ /// Consume completed jobs from the in-order queue
+ ///
+ /// # Arguments
+ ///
+ /// - `f`: function to apply to the body of each job.
+ /// - `limit`: maximum number of jobs to handle before returning
+ ///
+ /// # Returns
+ ///
+ /// A boolean indicating whether the limit was reached:
+ /// `true` if the limit was reached,
+ /// `false` if the queue is empty or an uncompleted job was reached first.
#[inline(always)]
- pub fn handle<F: Fn(&mut B)>(&self, f: F) {
+ pub fn handle<F: Fn(&mut B)>(&self, f: F, mut limit: usize) -> bool {
// take the mutex
let mut queue = self.queue.lock();
- loop {
+ while limit > 0 {
// attempt to extract front element
let front = queue.pop_front();
let elem = match front {
Some(elem) => elem,
_ => {
- return;
+ return false;
}
};
@@ -90,13 +111,17 @@ impl<P, B> InorderQueue<P, B> {
// job not complete yet, return job to front
if ret {
queue.push_front(elem).unwrap();
- return;
+ return false;
}
+ limit -= 1;
}
+
+ // consume limit reached; more completed jobs may remain in the queue
+ true
}
}
-/// Allows easy construction of a semi-parallel worker.
+/// Allows easy construction of a parallel worker.
/// Applicable for both decryption and encryption workers.
#[inline(always)]
pub fn worker_parallel<
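
For readers new to the pool, the in-order queue exists so that jobs finished out of order by the parallel workers are still consumed in submission order: handle stops at the first unfinished job, and with this change it also stops once limit jobs have been consumed. A hedged, self-contained model of that contract; the Job struct and its done flag are stand-ins, since in the real code completion is signalled by the parallel worker:

use std::collections::VecDeque;

// Hypothetical stand-in for the real job type: `done` models whether the
// parallel (crypto) worker has finished with the body.
struct Job {
    body: u64,
    done: bool,
}

/// Returns true if `limit` was reached, false if the queue drained or an
/// unfinished job blocks further progress (mirroring the documented contract).
fn handle(queue: &mut VecDeque<Job>, mut f: impl FnMut(u64), mut limit: usize) -> bool {
    while limit > 0 {
        // stop at the first unfinished job, or at an empty queue
        if !matches!(queue.front(), Some(job) if job.done) {
            return false;
        }
        // safe to unwrap: the front exists and is complete
        let job = queue.pop_front().unwrap();
        f(job.body);
        limit -= 1;
    }
    true
}

fn main() {
    let mut q = VecDeque::from(vec![
        Job { body: 1, done: true },
        Job { body: 2, done: false }, // still being processed in parallel
        Job { body: 3, done: true },
    ]);
    let mut seen = Vec::new();
    let reached_limit = handle(&mut q, |b| seen.push(b), 8);
    assert!(!reached_limit);      // stopped at the unfinished job, not the limit
    assert_eq!(seen, vec![1u64]); // job 3 is done but must wait behind job 2
}
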
diff --git a/src/wireguard/router/runq.rs b/src/wireguard/router/runq.rs
index 936a53c..44e11a1 100644
--- a/src/wireguard/router/runq.rs
+++ b/src/wireguard/router/runq.rs
@@ -58,7 +58,21 @@ impl<T: ToKey> RunQueue<T> {
}
}
- pub fn run<F: Fn(&T) -> ()>(&self, f: F) {
+ /// Run (consume from) the run queue using the provided function.
+ /// The function should return whether the given element should be rescheduled.
+ ///
+ /// # Arguments
+ ///
+ /// - `f` : function to apply to every element
+ ///
+ /// # Note
+ ///
+ /// The function f may be called again even when the element was not inserted back into the
+ /// queue since the last application and no rescheduling was requested.
+ ///
+ /// This happens when the function handles all work for T,
+ /// but T is added to the run queue while the function is running.
+ pub fn run<F: Fn(&T) -> bool>(&self, f: F) {
let mut inner = self.inner.lock().unwrap();
loop {
// fetch next element
@@ -86,10 +100,16 @@ impl<T: ToKey> RunQueue<T> {
mem::drop(inner); // drop guard
// handle element
- f(&elem);
+ let rerun = f(&elem);
- // retake lock and check if should be added back to queue
+ // if the function requested a re-run, add the element to the back of the queue
inner = self.inner.lock().unwrap();
+ if rerun {
+ inner.queue.push_back(elem);
+ continue;
+ }
+
+ // otherwise check if new requests have come in since we ran the function
match inner.members.entry(key) {
Entry::Occupied(occ) => {
if *occ.get() == old_n {
@@ -111,7 +131,6 @@ impl<T: ToKey> RunQueue<T> {
#[cfg(test)]
mod tests {
use super::*;
- use std::sync::Arc;
use std::thread;
use std::time::Duration;
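
Taken together with the pool.rs change, the new contract for run is that the closure reports whether its element should be pushed to the back of the queue for another pass. A minimal single-threaded model of just that reschedule path, deliberately omitting the generation-counter bookkeeping that the note about repeated calls refers to:

use std::cell::Cell;
use std::collections::VecDeque;

// Toy stand-in for RunQueue::run: `f` returns true to request a re-run,
// in which case the element goes to the back of the queue.
fn run<T, F: Fn(&T) -> bool>(queue: &mut VecDeque<T>, f: F) {
    while let Some(elem) = queue.pop_front() {
        if f(&elem) {
            // e.g. the peer's in-order queue still had work after the
            // consume limit was hit
            queue.push_back(elem);
        }
    }
}

fn main() {
    let mut q: VecDeque<&str> = VecDeque::from(vec!["peer-a", "peer-b"]);
    let passes = Cell::new(0);
    run(&mut q, |peer| {
        passes.set(passes.get() + 1);
        // pretend peer-a needs two passes to drain its backlog
        *peer == "peer-a" && passes.get() == 1
    });
    assert_eq!(passes.get(), 3); // peer-a, peer-b, then peer-a again
}
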