aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wireguard/queue.rs')
-rw-r--r--src/wireguard/queue.rs64
1 files changed, 64 insertions, 0 deletions
diff --git a/src/wireguard/queue.rs b/src/wireguard/queue.rs
new file mode 100644
index 0000000..a0fcf03
--- /dev/null
+++ b/src/wireguard/queue.rs
@@ -0,0 +1,64 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc::sync_channel;
+use std::sync::mpsc::{Receiver, SyncSender};
+use std::sync::Mutex;
+
+/// A simple parallel queue used to pass work to a worker pool.
+///
+/// Unlike e.g. the crossbeam multi-producer multi-consumer queue
+/// the ParallelQueue offers fewer features and instead improves speed:
+///
+/// The crossbeam channel ensures that elements are consumed
+/// even if not every Receiver is being read from.
+/// This is not ensured by ParallelQueue.
+pub struct ParallelQueue<T> {
+ next: AtomicUsize, // next round-robin index
+ queues: Vec<Mutex<Option<SyncSender<T>>>>, // work queues (1 per thread)
+}
+
+impl<T> ParallelQueue<T> {
+ /// Create a new ParallelQueue instance
+ ///
+ /// # Arguments
+ ///
+ /// - `queues`: number of readers/writers
+ /// - `capacity`: capacity of each internal queue
+ ///
+ pub fn new(queues: usize, capacity: usize) -> (Self, Vec<Receiver<T>>) {
+ let mut rxs = vec![];
+ let mut txs = vec![];
+
+ for _ in 0..queues {
+ let (tx, rx) = sync_channel(capacity);
+ txs.push(Mutex::new(Some(tx)));
+ rxs.push(rx);
+ }
+
+ (
+ ParallelQueue {
+ next: AtomicUsize::new(0),
+ queues: txs,
+ },
+ rxs,
+ )
+ }
+
+ pub fn send(&self, v: T) {
+ let len = self.queues.len();
+ let idx = self.next.fetch_add(1, Ordering::SeqCst);
+ match self.queues[idx % len].lock().unwrap().as_ref() {
+ Some(que) => {
+ // TODO: consider best way to propergate Result
+ let _ = que.send(v);
+ }
+ _ => (),
+ }
+ }
+
+ pub fn close(&self) {
+ for i in 0..self.queues.len() {
+ let queue = &self.queues[i];
+ *queue.lock().unwrap() = None;
+ }
+ }
+}