aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/router/pool.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/wireguard/router/pool.rs77
1 files changed, 41 insertions, 36 deletions
diff --git a/src/wireguard/router/pool.rs b/src/wireguard/router/pool.rs
index 9c72372..98b1144 100644
--- a/src/wireguard/router/pool.rs
+++ b/src/wireguard/router/pool.rs
@@ -2,6 +2,9 @@ use arraydeque::ArrayDeque;
use spin::{Mutex, MutexGuard};
use std::sync::mpsc::Receiver;
use std::sync::Arc;
+use std::mem;
+
+use super::runq::{RunQueue, ToKey};
const INORDER_QUEUE_SIZE: usize = 64;
@@ -60,51 +63,53 @@ impl<P, B> InorderQueue<P, B> {
}
#[inline(always)]
- pub fn handle<F: Fn(&mut InnerJob<P, B>)>(&self, f: F) {
+ pub fn handle<F: Fn(&mut B)>(&self, f: F) {
// take the mutex
let mut queue = self.queue.lock();
- // handle all complete messages
- while queue
- .pop_front()
- .and_then(|j| {
- // check if job is complete
- let ret = if let Some(mut guard) = j.complete() {
- f(&mut *guard);
- false
- } else {
- true
- };
-
- // return job to cyclic buffer if not complete
- if ret {
- let _res = queue.push_front(j);
- debug_assert!(_res.is_ok());
- None
- } else {
- // add job back to pool
- Some(())
+ loop {
+ // attempt to extract front element
+ let front = queue.pop_front();
+ let elem = match front {
+ Some(elem) => elem,
+ _ => {
+ return;
}
- })
- .is_some()
- {}
+ };
+
+ // apply function if job complete
+ let ret = if let Some(mut guard) = elem.complete() {
+ mem::drop(queue);
+ f(&mut guard.body);
+ queue = self.queue.lock();
+ false
+ } else {
+ true
+ };
+
+ // job not complete yet, return job to front
+ if ret {
+ queue.push_front(elem).unwrap();
+ return;
+ }
+ }
}
}
/// Allows easy construction of a semi-parallel worker.
/// Applicable for both decryption and encryption workers.
#[inline(always)]
-pub fn worker_template<
- P, // represents a peer (atomic reference counted pointer)
+pub fn worker_parallel<
+ P : ToKey, // represents a peer (atomic reference counted pointer)
B, // inner body type (message buffer, key material, ...)
+ D, // device
W: Fn(&P, &mut B),
- S: Fn(&P, &mut B),
- Q: Fn(&P) -> &InorderQueue<P, B>,
+ Q: Fn(&D) -> &RunQueue<P>,
>(
- receiver: Receiver<Job<P, B>>, // receiever for new jobs
- work_parallel: W, // perform parallel / out-of-order work on peer
- work_sequential: S, // perform sequential work on peer
- queue: Q, // resolve a peer to an inorder queue
+ device: D,
+ queue: Q,
+ receiver: Receiver<Job<P, B>>,
+ work: W,
) {
log::trace!("router worker started");
loop {
@@ -123,11 +128,11 @@ pub fn worker_template<
let peer = job.peer.take().unwrap();
// process job
- work_parallel(&peer, &mut job.body);
+ work(&peer, &mut job.body);
peer
};
-
+
// process inorder jobs for peer
- queue(&peer).handle(|j| work_sequential(&peer, &mut j.body));
+ queue(&device).insert(peer);
}
-}
+} \ No newline at end of file