From 106c5e8b5c865c8396f824f4f5aa14d1bf0952b1 Mon Sep 17 00:00:00 2001
From: Mathias Hall-Andersen <mathias@hall-andersen.dk>
Date: Sun, 16 Feb 2020 18:12:43 +0100
Subject: Work on router optimizations

---
 src/wireguard/router/queue.rs | 92 +++++++++++++++++++++++++++++++++++++------
 1 file changed, 80 insertions(+), 12 deletions(-)

(limited to 'src/wireguard/router/queue.rs')

diff --git a/src/wireguard/router/queue.rs b/src/wireguard/router/queue.rs
index 045fd51..ec4492e 100644
--- a/src/wireguard/router/queue.rs
+++ b/src/wireguard/router/queue.rs
@@ -4,29 +4,36 @@ use spin::Mutex;
 use std::mem;
 use std::sync::atomic::{AtomicUsize, Ordering};
 
-const QUEUE_SIZE: usize = 1024;
-
-pub trait Job: Sized {
-    fn queue(&self) -> &Queue<Self>;
+use super::constants::INORDER_QUEUE_SIZE;
 
+pub trait SequentialJob {
     fn is_ready(&self) -> bool;
 
-    fn parallel_work(&self);
-
     fn sequential_work(self);
 }
 
+pub trait ParallelJob: Sized + SequentialJob {
+    fn queue(&self) -> &Queue<Self>;
+
+    fn parallel_work(&self);
+}
 
-pub struct Queue<J: Job> {
+pub struct Queue<J: SequentialJob> {
     contenders: AtomicUsize,
-    queue: Mutex<ArrayDeque<[J; QUEUE_SIZE]>>,
+    queue: Mutex<ArrayDeque<[J; INORDER_QUEUE_SIZE]>>,
+
+    #[cfg(debug)]
+    _flag: Mutex<()>,
 }
 
-impl<J: Job> Queue<J> {
+impl<J: SequentialJob> Queue<J> {
     pub fn new() -> Queue<J> {
         Queue {
             contenders: AtomicUsize::new(0),
             queue: Mutex::new(ArrayDeque::new()),
+
+            #[cfg(debug)]
+            _flag: Mutex::new(()),
         }
     }
 
@@ -36,14 +43,22 @@ impl<J: Job> Queue<J> {
 
     pub fn consume(&self) {
         // check if we are the first contender
-        let pos = self.contenders.fetch_add(1, Ordering::Acquire);
+        let pos = self.contenders.fetch_add(1, Ordering::SeqCst);
         if pos > 0 {
-            assert!(pos < usize::max_value(), "contenders overflow");
+            assert!(usize::max_value() > pos, "contenders overflow");
+            return;
         }
 
         // enter the critical section
         let mut contenders = 1; // myself
         while contenders > 0 {
+            // check soundness in debug builds
+            #[cfg(debug)]
+            let _flag = self
+                ._flag
+                .try_lock()
+                .expect("contenders should ensure mutual exclusion");
+
             // handle every ready element
             loop {
                 let mut queue = self.queue.lock();
@@ -69,8 +84,61 @@ impl<J: Job> Queue<J> {
                 job.sequential_work();
             }
 
+            #[cfg(debug)]
+            mem::drop(_flag);
+
             // decrease contenders
-            contenders = self.contenders.fetch_sub(contenders, Ordering::Acquire) - contenders;
+            contenders = self.contenders.fetch_sub(contenders, Ordering::SeqCst) - contenders;
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::sync::Arc;
+    use std::thread;
+
+    use rand::thread_rng;
+    use rand::Rng;
+
+    struct TestJob {}
+
+    impl SequentialJob for TestJob {
+        fn is_ready(&self) -> bool {
+            true
+        }
+
+        fn sequential_work(self) {}
+    }
+
+    /* Fuzz the Queue */
+    #[test]
+    fn test_queue() {
+        fn hammer(queue: &Arc<Queue<TestJob>>) {
+            let mut rng = thread_rng();
+            for _ in 0..1_000_000 {
+                if rng.gen() {
+                    queue.push(TestJob {});
+                } else {
+                    queue.consume();
+                }
+            }
+        }
+
+        let queue = Arc::new(Queue::new());
+
+        // repeatedly apply operations randomly from concurrent threads
+        let other = {
+            let queue = queue.clone();
+            thread::spawn(move || hammer(&queue))
+        };
+        hammer(&queue);
+
+        // wait, consume and check empty
+        other.join().unwrap();
+        queue.consume();
+        assert_eq!(queue.queue.lock().len(), 0);
+    }
+}
-- 
cgit v1.2.3-59-g8ed1b