summaryrefslogtreecommitdiffstats
path: root/src/wireguard/router/queue.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-12-06 21:45:21 +0100
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-12-06 21:45:21 +0100
commit74e576a9c21b0de451e0588428fbbb99b24eb074 (patch)
tree381ad26325ae4bee1f6a17449110ac941c2a9192 /src/wireguard/router/queue.rs
parentMoving away from peer threads (diff)
downloadwireguard-rs-74e576a9c21b0de451e0588428fbbb99b24eb074.tar.xz
wireguard-rs-74e576a9c21b0de451e0588428fbbb99b24eb074.zip
Fixed inbound job bug (add to sequential queue)
Diffstat (limited to 'src/wireguard/router/queue.rs')
-rw-r--r--src/wireguard/router/queue.rs46
1 files changed, 46 insertions, 0 deletions
diff --git a/src/wireguard/router/queue.rs b/src/wireguard/router/queue.rs
new file mode 100644
index 0000000..5d0165c
--- /dev/null
+++ b/src/wireguard/router/queue.rs
@@ -0,0 +1,46 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc::sync_channel;
+use std::sync::mpsc::{Receiver, SyncSender};
+
+use spin::Mutex;
+
+pub struct ParallelQueue<T> {
+ next: AtomicUsize, // next round-robin index
+ queues: Vec<Mutex<SyncSender<T>>>, // work queues (1 per thread)
+}
+
+impl<T> ParallelQueue<T> {
+ pub fn new(queues: usize) -> (Vec<Receiver<T>>, Self) {
+ let mut rxs = vec![];
+ let mut txs = vec![];
+
+ for _ in 0..queues {
+ let (tx, rx) = sync_channel(128);
+ txs.push(Mutex::new(tx));
+ rxs.push(rx);
+ }
+
+ (
+ rxs,
+ ParallelQueue {
+ next: AtomicUsize::new(0),
+ queues: txs,
+ },
+ )
+ }
+
+ pub fn send(&self, v: T) {
+ let len = self.queues.len();
+ let idx = self.next.fetch_add(1, Ordering::SeqCst);
+ let que = self.queues[idx % len].lock();
+ que.send(v).unwrap();
+ }
+
+ pub fn close(&self) {
+ for i in 0..self.queues.len() {
+ let (tx, _) = sync_channel(0);
+ let queue = &self.queues[i];
+ *queue.lock() = tx;
+ }
+ }
+}