diff options
Diffstat (limited to 'src/wireguard/queue.rs')
-rw-r--r-- | src/wireguard/queue.rs | 64 |
1 files changed, 64 insertions, 0 deletions
diff --git a/src/wireguard/queue.rs b/src/wireguard/queue.rs new file mode 100644 index 0000000..a0fcf03 --- /dev/null +++ b/src/wireguard/queue.rs @@ -0,0 +1,64 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::sync_channel; +use std::sync::mpsc::{Receiver, SyncSender}; +use std::sync::Mutex; + +/// A simple parallel queue used to pass work to a worker pool. +/// +/// Unlike e.g. the crossbeam multi-producer multi-consumer queue +/// the ParallelQueue offers fewer features and instead improves speed: +/// +/// The crossbeam channel ensures that elements are consumed +/// even if not every Receiver is being read from. +/// This is not ensured by ParallelQueue. +pub struct ParallelQueue<T> { + next: AtomicUsize, // next round-robin index + queues: Vec<Mutex<Option<SyncSender<T>>>>, // work queues (1 per thread) +} + +impl<T> ParallelQueue<T> { + /// Create a new ParallelQueue instance + /// + /// # Arguments + /// + /// - `queues`: number of readers/writers + /// - `capacity`: capacity of each internal queue + /// + pub fn new(queues: usize, capacity: usize) -> (Self, Vec<Receiver<T>>) { + let mut rxs = vec![]; + let mut txs = vec![]; + + for _ in 0..queues { + let (tx, rx) = sync_channel(capacity); + txs.push(Mutex::new(Some(tx))); + rxs.push(rx); + } + + ( + ParallelQueue { + next: AtomicUsize::new(0), + queues: txs, + }, + rxs, + ) + } + + pub fn send(&self, v: T) { + let len = self.queues.len(); + let idx = self.next.fetch_add(1, Ordering::SeqCst); + match self.queues[idx % len].lock().unwrap().as_ref() { + Some(que) => { + // TODO: consider best way to propergate Result + let _ = que.send(v); + } + _ => (), + } + } + + pub fn close(&self) { + for i in 0..self.queues.len() { + let queue = &self.queues[i]; + *queue.lock().unwrap() = None; + } + } +} |