aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/router/queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wireguard/router/queue.rs')
-rw-r--r--src/wireguard/router/queue.rs92
1 files changed, 80 insertions, 12 deletions
diff --git a/src/wireguard/router/queue.rs b/src/wireguard/router/queue.rs
index 045fd51..ec4492e 100644
--- a/src/wireguard/router/queue.rs
+++ b/src/wireguard/router/queue.rs
@@ -4,29 +4,36 @@ use spin::Mutex;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
-const QUEUE_SIZE: usize = 1024;
-
-pub trait Job: Sized {
- fn queue(&self) -> &Queue<Self>;
+use super::constants::INORDER_QUEUE_SIZE;
+pub trait SequentialJob {
fn is_ready(&self) -> bool;
- fn parallel_work(&self);
-
fn sequential_work(self);
}
+pub trait ParallelJob: Sized + SequentialJob {
+ fn queue(&self) -> &Queue<Self>;
+
+ fn parallel_work(&self);
+}
-pub struct Queue<J: Job> {
+pub struct Queue<J: SequentialJob> {
contenders: AtomicUsize,
- queue: Mutex<ArrayDeque<[J; QUEUE_SIZE]>>,
+ queue: Mutex<ArrayDeque<[J; INORDER_QUEUE_SIZE]>>,
+
+ #[cfg(debug)]
+ _flag: Mutex<()>,
}
-impl<J: Job> Queue<J> {
+impl<J: SequentialJob> Queue<J> {
pub fn new() -> Queue<J> {
Queue {
contenders: AtomicUsize::new(0),
queue: Mutex::new(ArrayDeque::new()),
+
+ #[cfg(debug)]
+ _flag: Mutex::new(()),
}
}
@@ -36,14 +43,22 @@ impl<J: Job> Queue<J> {
pub fn consume(&self) {
// check if we are the first contender
- let pos = self.contenders.fetch_add(1, Ordering::Acquire);
+ let pos = self.contenders.fetch_add(1, Ordering::SeqCst);
if pos > 0 {
- assert!(pos < usize::max_value(), "contenders overflow");
+ assert!(usize::max_value() > pos, "contenders overflow");
+ return;
}
// enter the critical section
let mut contenders = 1; // myself
while contenders > 0 {
+ // check soundness in debug builds
+ #[cfg(debug)]
+ let _flag = self
+ ._flag
+ .try_lock()
+ .expect("contenders should ensure mutual exclusion");
+
// handle every ready element
loop {
let mut queue = self.queue.lock();
@@ -69,8 +84,61 @@ impl<J: Job> Queue<J> {
job.sequential_work();
}
+ #[cfg(debug)]
+ mem::drop(_flag);
+
// decrease contenders
- contenders = self.contenders.fetch_sub(contenders, Ordering::Acquire) - contenders;
+ contenders = self.contenders.fetch_sub(contenders, Ordering::SeqCst) - contenders;
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use std::sync::Arc;
+ use std::thread;
+
+ use rand::thread_rng;
+ use rand::Rng;
+
+ struct TestJob {}
+
+ impl SequentialJob for TestJob {
+ fn is_ready(&self) -> bool {
+ true
+ }
+
+ fn sequential_work(self) {}
+ }
+
+ /* Fuzz the Queue */
+ #[test]
+ fn test_queue() {
+ fn hammer(queue: &Arc<Queue<TestJob>>) {
+ let mut rng = thread_rng();
+ for _ in 0..1_000_000 {
+ if rng.gen() {
+ queue.push(TestJob {});
+ } else {
+ queue.consume();
+ }
+ }
}
+
+ let queue = Arc::new(Queue::new());
+
+ // repeatedly apply operations randomly from concurrent threads
+ let other = {
+ let queue = queue.clone();
+ thread::spawn(move || hammer(&queue))
+ };
+ hammer(&queue);
+
+ // wait, consume and check empty
+ other.join().unwrap();
+ queue.consume();
+ assert_eq!(queue.queue.lock().len(), 0);
}
}