diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-06 21:45:21 +0100 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-06 21:45:21 +0100 |
commit | 74e576a9c21b0de451e0588428fbbb99b24eb074 (patch) | |
tree | 381ad26325ae4bee1f6a17449110ac941c2a9192 /src/wireguard/router/queue.rs | |
parent | Moving away from peer threads (diff) | |
download | wireguard-rs-74e576a9c21b0de451e0588428fbbb99b24eb074.tar.xz wireguard-rs-74e576a9c21b0de451e0588428fbbb99b24eb074.zip |
Fixed inbound job bug (add to sequential queue)
Diffstat (limited to 'src/wireguard/router/queue.rs')
-rw-r--r-- | src/wireguard/router/queue.rs | 46 |
1 files changed, 46 insertions, 0 deletions
diff --git a/src/wireguard/router/queue.rs b/src/wireguard/router/queue.rs new file mode 100644 index 0000000..5d0165c --- /dev/null +++ b/src/wireguard/router/queue.rs @@ -0,0 +1,46 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::sync_channel; +use std::sync::mpsc::{Receiver, SyncSender}; + +use spin::Mutex; + +pub struct ParallelQueue<T> { + next: AtomicUsize, // next round-robin index + queues: Vec<Mutex<SyncSender<T>>>, // work queues (1 per thread) +} + +impl<T> ParallelQueue<T> { + pub fn new(queues: usize) -> (Vec<Receiver<T>>, Self) { + let mut rxs = vec![]; + let mut txs = vec![]; + + for _ in 0..queues { + let (tx, rx) = sync_channel(128); + txs.push(Mutex::new(tx)); + rxs.push(rx); + } + + ( + rxs, + ParallelQueue { + next: AtomicUsize::new(0), + queues: txs, + }, + ) + } + + pub fn send(&self, v: T) { + let len = self.queues.len(); + let idx = self.next.fetch_add(1, Ordering::SeqCst); + let que = self.queues[idx % len].lock(); + que.send(v).unwrap(); + } + + pub fn close(&self) { + for i in 0..self.queues.len() { + let (tx, _) = sync_channel(0); + let queue = &self.queues[i]; + *queue.lock() = tx; + } + } +} |