|  |  |  |
|---|---|---|
| author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-10 18:17:48 +0100 |
| committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-10 18:17:48 +0100 |
| commit | 656679638750f84d1c8b75d8c44974ed45d36092 (patch) |  |
| tree | 9118055a5333afc6b1c3df3e77d315e55e5f8023 /src/wireguard/queue.rs |  |
| parent | Formatting (diff) |  |
Remove crossbeam dependency
Diffstat (limited to 'src/wireguard/queue.rs')
| mode | file | lines |
|---|---|---|
| -rw-r--r-- | src/wireguard/queue.rs | 64 |

1 file changed, 64 insertions(+), 0 deletions(-)
```diff
diff --git a/src/wireguard/queue.rs b/src/wireguard/queue.rs
new file mode 100644
index 0000000..a0fcf03
--- /dev/null
+++ b/src/wireguard/queue.rs
@@ -0,0 +1,64 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc::sync_channel;
+use std::sync::mpsc::{Receiver, SyncSender};
+use std::sync::Mutex;
+
+/// A simple parallel queue used to pass work to a worker pool.
+///
+/// Unlike e.g. the crossbeam multi-producer multi-consumer queue,
+/// the ParallelQueue offers fewer features and instead improves speed:
+///
+/// The crossbeam channel ensures that elements are consumed
+/// even if not every Receiver is being read from.
+/// This is not ensured by ParallelQueue.
+pub struct ParallelQueue<T> {
+    next: AtomicUsize,                         // next round-robin index
+    queues: Vec<Mutex<Option<SyncSender<T>>>>, // work queues (1 per thread)
+}
+
+impl<T> ParallelQueue<T> {
+    /// Create a new ParallelQueue instance
+    ///
+    /// # Arguments
+    ///
+    /// - `queues`: number of readers/writers
+    /// - `capacity`: capacity of each internal queue
+    ///
+    pub fn new(queues: usize, capacity: usize) -> (Self, Vec<Receiver<T>>) {
+        let mut rxs = vec![];
+        let mut txs = vec![];
+
+        for _ in 0..queues {
+            let (tx, rx) = sync_channel(capacity);
+            txs.push(Mutex::new(Some(tx)));
+            rxs.push(rx);
+        }
+
+        (
+            ParallelQueue {
+                next: AtomicUsize::new(0),
+                queues: txs,
+            },
+            rxs,
+        )
+    }
+
+    /// Send an element to one of the internal queues (round-robin).
+    /// The element is silently dropped if the queue has been closed.
+    pub fn send(&self, v: T) {
+        let len = self.queues.len();
+        let idx = self.next.fetch_add(1, Ordering::SeqCst);
+        if let Some(que) = self.queues[idx % len].lock().unwrap().as_ref() {
+            // TODO: consider the best way to propagate the Result
+            let _ = que.send(v);
+        }
+    }
+
+    /// Close the queue by dropping every SyncSender,
+    /// causing recv() on the corresponding Receivers to fail.
+    pub fn close(&self) {
+        for i in 0..self.queues.len() {
+            let queue = &self.queues[i];
+            *queue.lock().unwrap() = None;
+        }
+    }
+}
```
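A queue like this would typically be driven as in the sketch below: one worker thread per `Receiver`, a producer fanning work out with `send`, and `close` dropping the senders so the workers can exit. This is a minimal usage sketch, not code from the commit; the `use` path, the job type (`u64`), and the worker body are illustrative assumptions.

```rust
use std::thread;

// Assumed import path for the ParallelQueue from the diff above
// (the exact module path is an assumption, not taken from the commit).
use crate::wireguard::queue::ParallelQueue;

fn main() {
    // 4 worker queues, each backed by a sync_channel of capacity 128.
    let (queue, receivers) = ParallelQueue::<u64>::new(4, 128);

    // One worker per Receiver; each thread drains only its own queue,
    // so jobs round-robined to a stalled worker are never redistributed.
    let workers: Vec<_> = receivers
        .into_iter()
        .map(|rx| {
            thread::spawn(move || {
                // recv() fails once close() drops the matching SyncSender,
                // which ends the loop and lets the thread exit.
                while let Ok(job) = rx.recv() {
                    println!("processed job {}", job);
                }
            })
        })
        .collect();

    // Fan out work; send() picks a queue by atomic round-robin.
    for job in 0..16u64 {
        queue.send(job);
    }

    // Drop all senders so the workers observe the shutdown.
    queue.close();
    for worker in workers {
        worker.join().unwrap();
    }
}
```

The trade-off matches the doc comment: crossbeam's MPMC channel lets any consumer take any element, while this design pins each element to one worker at `send` time in exchange for a hot path that is just one `fetch_add` and a short-lived per-queue `Mutex`.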