aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/router/queue.rs
blob: d5d657a9895531617b27b9f3e24f7c1849874dc0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
use arraydeque::ArrayDeque;
use spin::Mutex;

use core::mem;
use core::sync::atomic::{AtomicUsize, Ordering};

use super::constants::INORDER_QUEUE_SIZE;

/// A job that must be completed in FIFO order relative to the other
/// jobs in the same [`Queue`].
pub trait SequentialJob {
    /// Returns `true` once the job can be executed sequentially.
    /// `Queue::consume` only pops a job after this returns `true`;
    /// a not-ready job at the front blocks everything behind it.
    fn is_ready(&self) -> bool;

    /// Consumes the job and performs its in-order portion of work.
    fn sequential_work(self);
}

/// A job with an additional, order-independent stage of work on top of
/// the in-order stage required by [`SequentialJob`].
pub trait ParallelJob: Sized + SequentialJob {
    /// The in-order queue this job belongs to.
    // NOTE(review): presumably the parallel stage runs first and the job is
    // then drained from this queue in order — confirm against the callers
    // of `parallel_work`/`consume` elsewhere in the router.
    fn queue(&self) -> &Queue<Self>;

    /// Performs the order-independent portion of the job's work.
    fn parallel_work(&self);
}

/// A bounded FIFO queue of jobs, drained in order by whichever thread
/// first calls `consume` (other concurrent callers delegate their work
/// to that thread via the `contenders` counter).
pub struct Queue<J: SequentialJob> {
    /// Number of threads that have requested consumption since the
    /// current consumer entered its loop. The thread that moves this
    /// from 0 to 1 becomes the (sole) consumer.
    contenders: AtomicUsize,
    /// The job queue itself; bounded at `INORDER_QUEUE_SIZE` elements.
    queue: Mutex<ArrayDeque<[J; INORDER_QUEUE_SIZE]>>,

    // Debug-only lock used purely to assert that the `contenders`
    // protocol really does provide mutual exclusion over `consume`.
    //
    // Fixed: this was `#[cfg(debug)]`, but `debug` is not a cfg the
    // compiler ever sets — the soundness check was dead code in every
    // build. `debug_assertions` is the standard debug-build predicate.
    #[cfg(debug_assertions)]
    _flag: Mutex<()>,
}

impl<J: SequentialJob> Queue<J> {
    /// Creates a new, empty queue.
    pub fn new() -> Queue<J> {
        Queue {
            contenders: AtomicUsize::new(0),
            queue: Mutex::new(ArrayDeque::new()),

            #[cfg(debug_assertions)]
            _flag: Mutex::new(()),
        }
    }

    /// Appends a job to the back of the queue.
    ///
    /// Returns `false` if the bounded queue is full and the job was
    /// NOT enqueued (the job is dropped in that case).
    pub fn push(&self, job: J) -> bool {
        self.queue.lock().push_back(job).is_ok()
    }

    /// Drains every ready job from the front of the queue, in order.
    ///
    /// Lock-free hand-off: the first contender becomes the consumer;
    /// later contenders merely bump `contenders` and return, leaving
    /// their work to the active consumer, which re-checks the queue
    /// until no new contender has arrived.
    pub fn consume(&self) {
        // Check if we are the first contender; if not, the active
        // consumer will observe our increment and do the work for us.
        let pos = self.contenders.fetch_add(1, Ordering::SeqCst);
        if pos > 0 {
            assert!(usize::MAX > pos, "contenders overflow");
            return;
        }

        // Enter the critical section.
        let mut contenders = 1; // myself
        while contenders > 0 {
            // Check soundness in debug builds: no other thread may be
            // inside this section concurrently.
            #[cfg(debug_assertions)]
            let _flag = self
                ._flag
                .try_lock()
                .expect("contenders should ensure mutual exclusion");

            // Handle every ready element at the front of the queue.
            loop {
                let mut queue = self.queue.lock();

                // Stop on an empty queue or a front job that is not
                // yet ready (FIFO order must be preserved).
                match queue.front() {
                    Some(job) if job.is_ready() => (),
                    _ => break,
                }

                // Take the job out of the queue and release the lock
                // before doing the (potentially slow) sequential work.
                let job = queue.pop_front().unwrap();
                debug_assert!(job.is_ready());
                mem::drop(queue);

                // Process element.
                job.sequential_work();
            }

            #[cfg(debug_assertions)]
            mem::drop(_flag);

            // Subtract the contenders we have served; if new ones
            // arrived meanwhile, loop and drain the queue again.
            contenders = self.contenders.fetch_sub(contenders, Ordering::SeqCst) - contenders;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::thread;

    use std::sync::Arc;
    use std::time::Duration;

    use rand::thread_rng;
    use rand::Rng;

    /// Randomized producer/consumer test: two threads interleave pushes
    /// and consumes, then the total number of successfully enqueued jobs
    /// is compared against the number of jobs actually executed.
    #[test]
    fn test_consume_queue() {
        struct TestJob {
            cnt: Arc<AtomicUsize>,
            wait_sequential: Duration,
        }

        impl SequentialJob for TestJob {
            fn is_ready(&self) -> bool {
                true
            }

            fn sequential_work(self) {
                thread::sleep(self.wait_sequential);
                self.cnt.fetch_add(1, Ordering::SeqCst);
            }
        }

        fn hammer(queue: &Arc<Queue<TestJob>>, cnt: Arc<AtomicUsize>) -> usize {
            let mut jobs = 0;
            let mut rng = thread_rng();
            for _ in 0..10_000 {
                if rng.gen() {
                    let wait_sequential: u64 = rng.gen();
                    let wait_sequential = wait_sequential % 1000;

                    let wait_parallel: u64 = rng.gen();
                    let wait_parallel = wait_parallel % 1000;

                    thread::sleep(Duration::from_micros(wait_parallel));

                    // The queue is bounded (INORDER_QUEUE_SIZE), so a push
                    // can fail; only count jobs actually enqueued, otherwise
                    // the final jobs/counter comparison fails spuriously.
                    if queue.push(TestJob {
                        cnt: cnt.clone(),
                        wait_sequential: Duration::from_micros(wait_sequential),
                    }) {
                        jobs += 1;
                    }
                } else {
                    queue.consume();
                }
            }
            queue.consume();
            jobs
        }

        let queue = Arc::new(Queue::new());
        let counter = Arc::new(AtomicUsize::new(0));

        // repeatedly apply operations randomly from concurrent threads
        let other = {
            let queue = queue.clone();
            let counter = counter.clone();
            thread::spawn(move || hammer(&queue, counter))
        };
        let mut jobs = hammer(&queue, counter.clone());

        // wait, consume and check empty
        jobs += other.join().unwrap();
        assert_eq!(queue.queue.lock().len(), 0, "elements left in queue");
        assert_eq!(
            jobs,
            counter.load(Ordering::Acquire),
            "did not consume every job"
        );
    }

    /* Fuzz the Queue */
    #[test]
    fn test_fuzz_queue() {
        struct TestJob {}

        impl SequentialJob for TestJob {
            fn is_ready(&self) -> bool {
                true
            }

            fn sequential_work(self) {}
        }

        fn hammer(queue: &Arc<Queue<TestJob>>) {
            let mut rng = thread_rng();
            for _ in 0..1_000_000 {
                if rng.gen() {
                    // Push result deliberately ignored: this test only checks
                    // that the queue drains to empty, not how many ran.
                    let _ = queue.push(TestJob {});
                } else {
                    queue.consume();
                }
            }
        }

        let queue = Arc::new(Queue::new());

        // repeatedly apply operations randomly from concurrent threads
        let other = {
            let queue = queue.clone();
            thread::spawn(move || hammer(&queue))
        };
        hammer(&queue);

        // wait, consume and check empty
        other.join().unwrap();
        queue.consume();
        assert_eq!(queue.queue.lock().len(), 0);
    }
}