diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-16 15:26:15 +0100 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-16 15:26:15 +0100 |
commit | fd3ba63e80e7f97ae89ab5938fe6594c2d1c0869 (patch) | |
tree | eb4d998b2e2b19440466c17a22d5742cd9ebdfc7 /src/wireguard/queue.rs | |
parent | Removed unused atexit (diff) | |
download | wireguard-rs-fd3ba63e80e7f97ae89ab5938fe6594c2d1c0869.tar.xz wireguard-rs-fd3ba63e80e7f97ae89ab5938fe6594c2d1c0869.zip |
Revert to crossbeam
Diffstat (limited to 'src/wireguard/queue.rs')
-rw-r--r-- | src/wireguard/queue.rs | 47 |
1 files changed, 11 insertions, 36 deletions
diff --git a/src/wireguard/queue.rs b/src/wireguard/queue.rs index f484320..4c004c4 100644 --- a/src/wireguard/queue.rs +++ b/src/wireguard/queue.rs @@ -1,19 +1,8 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::sync_channel; -use std::sync::mpsc::{Receiver, SyncSender}; +use crossbeam_channel::{bounded, Receiver, Sender}; use std::sync::Mutex; -/// A simple parallel queue used to pass work to a worker pool. -/// -/// Unlike e.g. the crossbeam multi-producer multi-consumer queue -/// the ParallelQueue offers fewer features and instead improves speed: -/// -/// The crossbeam channel ensures that elements are consumed -/// even if not every Receiver is being read from. -/// This is not ensured by ParallelQueue. pub struct ParallelQueue<T> { - next: AtomicUsize, // next round-robin index - queues: Vec<Mutex<Option<SyncSender<T>>>>, // work queues (1 per thread) + queue: Mutex<Option<Sender<T>>>, // work queues (1 per thread) } impl<T> ParallelQueue<T> { @@ -25,40 +14,26 @@ impl<T> ParallelQueue<T> { /// - `capacity`: capacity of each internal queue /// pub fn new(queues: usize, capacity: usize) -> (Self, Vec<Receiver<T>>) { - let mut rxs = vec![]; - let mut txs = vec![]; - + let mut receivers = Vec::with_capacity(queues); + let (tx, rx) = bounded(capacity); for _ in 0..queues { - let (tx, rx) = sync_channel(capacity); - txs.push(Mutex::new(Some(tx))); - rxs.push(rx); + receivers.push(rx.clone()); } - ( ParallelQueue { - next: AtomicUsize::new(0), - queues: txs, + queue: Mutex::new(Some(tx)), }, - rxs, + receivers, ) } pub fn send(&self, v: T) { - let len = self.queues.len(); - let idx = self.next.fetch_add(1, Ordering::SeqCst); - match self.queues[idx % len].lock().unwrap().as_ref() { - Some(que) => { - // TODO: consider best way to propergate Result - let _ = que.send(v); - } - _ => (), - } + self.queue.lock().unwrap().as_ref().map(|s| { + let _ = s.send(v); + }); } pub fn close(&self) { - for i in 0..self.queues.len() { - let queue = &self.queues[i]; - *queue.lock().unwrap() = None; - } + *self.queue.lock().unwrap() = None; } } |