diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-16 15:26:15 +0100 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-16 15:26:15 +0100 |
commit | fd3ba63e80e7f97ae89ab5938fe6594c2d1c0869 (patch) | |
tree | eb4d998b2e2b19440466c17a22d5742cd9ebdfc7 /src | |
parent | Removed unused atexit (diff) | |
download | wireguard-rs-fd3ba63e80e7f97ae89ab5938fe6594c2d1c0869.tar.xz wireguard-rs-fd3ba63e80e7f97ae89ab5938fe6594c2d1c0869.zip |
Revert to crossbeam
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs | 9 | ||||
-rw-r--r-- | src/wireguard/queue.rs | 47 | ||||
-rw-r--r-- | src/wireguard/router/inbound.rs | 2 | ||||
-rw-r--r-- | src/wireguard/router/outbound.rs | 2 | ||||
-rw-r--r-- | src/wireguard/router/pool.rs | 7 | ||||
-rw-r--r-- | src/wireguard/router/workers.rs | 5 |
6 files changed, 21 insertions, 51 deletions
diff --git a/src/main.rs b/src/main.rs index 087b54b..59b21e5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -180,14 +180,11 @@ fn main() { }); } Err(err) => { - log::info!("UAPI error: {}", err); - break; + log::info!("UAPI connection error: {}", err); + profiler_stop(); + exit(0); } } - - // exit - profiler_stop(); - exit(0); }); // block until all tun readers closed diff --git a/src/wireguard/queue.rs b/src/wireguard/queue.rs index f484320..4c004c4 100644 --- a/src/wireguard/queue.rs +++ b/src/wireguard/queue.rs @@ -1,19 +1,8 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::sync_channel; -use std::sync::mpsc::{Receiver, SyncSender}; +use crossbeam_channel::{bounded, Receiver, Sender}; use std::sync::Mutex; -/// A simple parallel queue used to pass work to a worker pool. -/// -/// Unlike e.g. the crossbeam multi-producer multi-consumer queue -/// the ParallelQueue offers fewer features and instead improves speed: -/// -/// The crossbeam channel ensures that elements are consumed -/// even if not every Receiver is being read from. -/// This is not ensured by ParallelQueue. pub struct ParallelQueue<T> { - next: AtomicUsize, // next round-robin index - queues: Vec<Mutex<Option<SyncSender<T>>>>, // work queues (1 per thread) + queue: Mutex<Option<Sender<T>>>, // work queues (1 per thread) } impl<T> ParallelQueue<T> { @@ -25,40 +14,26 @@ impl<T> ParallelQueue<T> { /// - `capacity`: capacity of each internal queue /// pub fn new(queues: usize, capacity: usize) -> (Self, Vec<Receiver<T>>) { - let mut rxs = vec![]; - let mut txs = vec![]; - + let mut receivers = Vec::with_capacity(queues); + let (tx, rx) = bounded(capacity); for _ in 0..queues { - let (tx, rx) = sync_channel(capacity); - txs.push(Mutex::new(Some(tx))); - rxs.push(rx); + receivers.push(rx.clone()); } - ( ParallelQueue { - next: AtomicUsize::new(0), - queues: txs, + queue: Mutex::new(Some(tx)), }, - rxs, + receivers, ) } pub fn send(&self, v: T) { - let len = self.queues.len(); - let idx = self.next.fetch_add(1, Ordering::SeqCst); - match self.queues[idx % len].lock().unwrap().as_ref() { - Some(que) => { - // TODO: consider best way to propergate Result - let _ = que.send(v); - } - _ => (), - } + self.queue.lock().unwrap().as_ref().map(|s| { + let _ = s.send(v); + }); } pub fn close(&self) { - for i in 0..self.queues.len() { - let queue = &self.queues[i]; - *queue.lock().unwrap() = None; - } + *self.queue.lock().unwrap() = None; } } diff --git a/src/wireguard/router/inbound.rs b/src/wireguard/router/inbound.rs index db6d3f3..96c2e33 100644 --- a/src/wireguard/router/inbound.rs +++ b/src/wireguard/router/inbound.rs @@ -8,12 +8,12 @@ use super::runq::RunQueue; use super::types::Callbacks; use super::{tun, udp, Endpoint}; +use crossbeam_channel::Receiver; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; use zerocopy::{AsBytes, LayoutVerified}; use std::mem; use std::sync::atomic::Ordering; -use std::sync::mpsc::Receiver; use std::sync::Arc; pub const SIZE_TAG: usize = 16; diff --git a/src/wireguard/router/outbound.rs b/src/wireguard/router/outbound.rs index a555ecb..a0a1c72 100644 --- a/src/wireguard/router/outbound.rs +++ b/src/wireguard/router/outbound.rs @@ -8,9 +8,9 @@ use super::KeyPair; use super::REJECT_AFTER_MESSAGES; use super::{tun, udp, Endpoint}; -use std::sync::mpsc::Receiver; use std::sync::Arc; +use crossbeam_channel::Receiver; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; use zerocopy::{AsBytes, LayoutVerified}; diff --git a/src/wireguard/router/pool.rs b/src/wireguard/router/pool.rs index 686c788..3fc0026 100644 --- a/src/wireguard/router/pool.rs +++ b/src/wireguard/router/pool.rs @@ -1,9 +1,10 @@ -use arraydeque::ArrayDeque; -use spin::{Mutex, MutexGuard}; use std::mem; -use std::sync::mpsc::Receiver; use std::sync::Arc; +use arraydeque::ArrayDeque; +use crossbeam_channel::Receiver; +use spin::{Mutex, MutexGuard}; + use super::constants::INORDER_QUEUE_SIZE; use super::runq::{RunQueue, ToKey}; diff --git a/src/wireguard/router/workers.rs b/src/wireguard/router/workers.rs index 3ed6311..43464a0 100644 --- a/src/wireguard/router/workers.rs +++ b/src/wireguard/router/workers.rs @@ -1,13 +1,10 @@ -use std::sync::mpsc::Receiver; use std::sync::Arc; -use futures::sync::oneshot; -use futures::*; - use log::{debug, trace}; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; +use crossbeam_channel::Receiver; use std::sync::atomic::Ordering; use zerocopy::{AsBytes, LayoutVerified}; |