From 626b3b2314c135963b29e44d00b6dd8a11e61481 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Wed, 26 Feb 2020 23:01:18 +0100 Subject: Additional in-order queue test for router --- src/wireguard/router/queue.rs | 81 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 74 insertions(+), 7 deletions(-) (limited to 'src/wireguard/router') diff --git a/src/wireguard/router/queue.rs b/src/wireguard/router/queue.rs index ec4492e..6517ba4 100644 --- a/src/wireguard/router/queue.rs +++ b/src/wireguard/router/queue.rs @@ -97,25 +97,92 @@ impl Queue { mod tests { use super::*; - use std::sync::Arc; use std::thread; + use std::sync::Arc; + use std::time::Duration; + use rand::thread_rng; use rand::Rng; - struct TestJob {} + #[test] + fn test_consume_queue() { + struct TestJob { + cnt: Arc, + wait_sequential: Duration, + } + + impl SequentialJob for TestJob { + fn is_ready(&self) -> bool { + true + } + + fn sequential_work(self) { + thread::sleep(self.wait_sequential); + self.cnt.fetch_add(1, Ordering::SeqCst); + } + } + + fn hammer(queue: &Arc>, cnt: Arc) -> usize { + let mut jobs = 0; + let mut rng = thread_rng(); + for _ in 0..10_000 { + if rng.gen() { + let wait_sequential: u64 = rng.gen(); + let wait_sequential = wait_sequential % 1000; + + let wait_parallel: u64 = rng.gen(); + let wait_parallel = wait_parallel % 1000; - impl SequentialJob for TestJob { - fn is_ready(&self) -> bool { - true + thread::sleep(Duration::from_micros(wait_parallel)); + + queue.push(TestJob { + cnt: cnt.clone(), + wait_sequential: Duration::from_micros(wait_sequential), + }); + jobs += 1; + } else { + queue.consume(); + } + } + queue.consume(); + jobs } - fn sequential_work(self) {} + let queue = Arc::new(Queue::new()); + let counter = Arc::new(AtomicUsize::new(0)); + + // repeatedly apply operations randomly from concurrent threads + let other = { + let queue = queue.clone(); + let counter = counter.clone(); + thread::spawn(move || hammer(&queue, counter)) + }; + let mut jobs = hammer(&queue, counter.clone()); + + // wait, consume and check empty + jobs += other.join().unwrap(); + assert_eq!(queue.queue.lock().len(), 0, "elements left in queue"); + assert_eq!( + jobs, + counter.load(Ordering::Acquire), + "did not consume every job" + ); } /* Fuzz the Queue */ #[test] - fn test_queue() { + fn test_fuzz_queue() { + struct TestJob {} + + impl SequentialJob for TestJob { + fn is_ready(&self) -> bool { + true + } + + fn sequential_work(self) {} + } + fn hammer(queue: &Arc>) { let mut rng = thread_rng(); for _ in 0..1_000_000 { -- cgit v1.2.3-59-g8ed1b