aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/router/queue.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/wireguard/router/queue.rs81
1 files changed, 74 insertions, 7 deletions
diff --git a/src/wireguard/router/queue.rs b/src/wireguard/router/queue.rs
index ec4492e..6517ba4 100644
--- a/src/wireguard/router/queue.rs
+++ b/src/wireguard/router/queue.rs
@@ -97,25 +97,92 @@ impl<J: SequentialJob> Queue<J> {
mod tests {
use super::*;
- use std::sync::Arc;
use std::thread;
+ use std::sync::Arc;
+ use std::time::Duration;
+
use rand::thread_rng;
use rand::Rng;
- struct TestJob {}
+ #[test]
+ fn test_consume_queue() {
+ struct TestJob {
+ cnt: Arc<AtomicUsize>,
+ wait_sequential: Duration,
+ }
+
+ impl SequentialJob for TestJob {
+ fn is_ready(&self) -> bool {
+ true
+ }
+
+ fn sequential_work(self) {
+ thread::sleep(self.wait_sequential);
+ self.cnt.fetch_add(1, Ordering::SeqCst);
+ }
+ }
+
+ fn hammer(queue: &Arc<Queue<TestJob>>, cnt: Arc<AtomicUsize>) -> usize {
+ let mut jobs = 0;
+ let mut rng = thread_rng();
+ for _ in 0..10_000 {
+ if rng.gen() {
+ let wait_sequential: u64 = rng.gen();
+ let wait_sequential = wait_sequential % 1000;
+
+ let wait_parallel: u64 = rng.gen();
+ let wait_parallel = wait_parallel % 1000;
- impl SequentialJob for TestJob {
- fn is_ready(&self) -> bool {
- true
+ thread::sleep(Duration::from_micros(wait_parallel));
+
+ queue.push(TestJob {
+ cnt: cnt.clone(),
+ wait_sequential: Duration::from_micros(wait_sequential),
+ });
+ jobs += 1;
+ } else {
+ queue.consume();
+ }
+ }
+ queue.consume();
+ jobs
}
- fn sequential_work(self) {}
+ let queue = Arc::new(Queue::new());
+ let counter = Arc::new(AtomicUsize::new(0));
+
+ // repeatedly apply operations randomly from concurrent threads
+ let other = {
+ let queue = queue.clone();
+ let counter = counter.clone();
+ thread::spawn(move || hammer(&queue, counter))
+ };
+ let mut jobs = hammer(&queue, counter.clone());
+
+ // wait, consume and check empty
+ jobs += other.join().unwrap();
+ assert_eq!(queue.queue.lock().len(), 0, "elements left in queue");
+ assert_eq!(
+ jobs,
+ counter.load(Ordering::Acquire),
+ "did not consume every job"
+ );
}
/* Fuzz the Queue */
#[test]
- fn test_queue() {
+ fn test_fuzz_queue() {
+ struct TestJob {}
+
+ impl SequentialJob for TestJob {
+ fn is_ready(&self) -> bool {
+ true
+ }
+
+ fn sequential_work(self) {}
+ }
+
fn hammer(queue: &Arc<Queue<TestJob>>) {
let mut rng = thread_rng();
for _ in 0..1_000_000 {