diff options
Diffstat (limited to 'src/wireguard/queue.rs')
-rw-r--r-- | src/wireguard/queue.rs | 47 |
1 files changed, 11 insertions, 36 deletions
diff --git a/src/wireguard/queue.rs b/src/wireguard/queue.rs index f484320..4c004c4 100644 --- a/src/wireguard/queue.rs +++ b/src/wireguard/queue.rs @@ -1,19 +1,8 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::sync_channel; -use std::sync::mpsc::{Receiver, SyncSender}; +use crossbeam_channel::{bounded, Receiver, Sender}; use std::sync::Mutex; -/// A simple parallel queue used to pass work to a worker pool. -/// -/// Unlike e.g. the crossbeam multi-producer multi-consumer queue -/// the ParallelQueue offers fewer features and instead improves speed: -/// -/// The crossbeam channel ensures that elements are consumed -/// even if not every Receiver is being read from. -/// This is not ensured by ParallelQueue. pub struct ParallelQueue<T> { - next: AtomicUsize, // next round-robin index - queues: Vec<Mutex<Option<SyncSender<T>>>>, // work queues (1 per thread) + queue: Mutex<Option<Sender<T>>>, // work queues (1 per thread) } impl<T> ParallelQueue<T> { @@ -25,40 +14,26 @@ impl<T> ParallelQueue<T> { /// - `capacity`: capacity of each internal queue /// pub fn new(queues: usize, capacity: usize) -> (Self, Vec<Receiver<T>>) { - let mut rxs = vec![]; - let mut txs = vec![]; - + let mut receivers = Vec::with_capacity(queues); + let (tx, rx) = bounded(capacity); for _ in 0..queues { - let (tx, rx) = sync_channel(capacity); - txs.push(Mutex::new(Some(tx))); - rxs.push(rx); + receivers.push(rx.clone()); } - ( ParallelQueue { - next: AtomicUsize::new(0), - queues: txs, + queue: Mutex::new(Some(tx)), }, - rxs, + receivers, ) } pub fn send(&self, v: T) { - let len = self.queues.len(); - let idx = self.next.fetch_add(1, Ordering::SeqCst); - match self.queues[idx % len].lock().unwrap().as_ref() { - Some(que) => { - // TODO: consider best way to propergate Result - let _ = que.send(v); - } - _ => (), - } + self.queue.lock().unwrap().as_ref().map(|s| { + let _ = s.send(v); + }); } pub fn close(&self) { - for i in 0..self.queues.len() { - let queue = &self.queues[i]; - *queue.lock().unwrap() = None; - } + *self.queue.lock().unwrap() = None; } } |