diff options
Diffstat (limited to '')
-rw-r--r-- | Cargo.lock | 21 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/main.rs | 9 | ||||
-rw-r--r-- | src/wireguard/queue.rs | 47 | ||||
-rw-r--r-- | src/wireguard/router/inbound.rs | 2 | ||||
-rw-r--r-- | src/wireguard/router/outbound.rs | 2 | ||||
-rw-r--r-- | src/wireguard/router/pool.rs | 7 | ||||
-rw-r--r-- | src/wireguard/router/workers.rs | 5 |
8 files changed, 43 insertions, 51 deletions
@@ -176,6 +176,24 @@ dependencies = [ ] [[package]] +name = "crossbeam-channel" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "autocfg 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "crypto-mac" version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1212,6 +1230,7 @@ dependencies = [ "chacha20poly1305 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "cpuprofiler 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "daemonize 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1303,6 +1322,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "97276801e127ffb46b66ce23f35cc96bd454fa311294bced4bbace7baa8b1d17" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" "checksum cpuprofiler 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "43f8479dbcfd2bbaa0c0c26779b913052b375981cdf533091f2127ea3d42e52b" +"checksum crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "acec9a3b0b3559f15aee4f90746c4e5e293b701c0f7d3925d24e01645267b68c" +"checksum crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce446db02cdc3165b94ae73111e570793400d0794e46125cc4056c81cbb039f4" "checksum crypto-mac 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4434400df11d95d556bac068ddfedd482915eb18fe8bea89bc80b6e4b1c179e5" "checksum curve25519-dalek 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8b7dcd30ba50cdf88b55b033456138b7c0ac4afdc436d82e1b79f370f24cc66d" "checksum daemonize 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "70c24513e34f53b640819f0ac9f705b673fcf4006d7aab8778bee72ebfc89815" @@ -26,6 +26,7 @@ clear_on_drop = "0.2.3" env_logger = "0.6" num_cpus = "^1.10" daemonize = "0.4.1" +crossbeam-channel = "0.4" cpuprofiler = { version = "*", optional = true } [target.'cfg(unix)'.dependencies] diff --git a/src/main.rs b/src/main.rs index 087b54b..59b21e5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -180,14 +180,11 @@ fn main() { }); } Err(err) => { - log::info!("UAPI error: {}", err); - break; + log::info!("UAPI connection error: {}", err); + profiler_stop(); + exit(0); } } - - // exit - profiler_stop(); - exit(0); }); // block until all tun readers closed diff --git a/src/wireguard/queue.rs b/src/wireguard/queue.rs index f484320..4c004c4 100644 --- a/src/wireguard/queue.rs +++ b/src/wireguard/queue.rs @@ -1,19 +1,8 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::sync_channel; -use std::sync::mpsc::{Receiver, SyncSender}; +use crossbeam_channel::{bounded, Receiver, Sender}; use std::sync::Mutex; -/// A simple parallel queue used to pass work to a worker pool. -/// -/// Unlike e.g. the crossbeam multi-producer multi-consumer queue -/// the ParallelQueue offers fewer features and instead improves speed: -/// -/// The crossbeam channel ensures that elements are consumed -/// even if not every Receiver is being read from. -/// This is not ensured by ParallelQueue. pub struct ParallelQueue<T> { - next: AtomicUsize, // next round-robin index - queues: Vec<Mutex<Option<SyncSender<T>>>>, // work queues (1 per thread) + queue: Mutex<Option<Sender<T>>>, // work queues (1 per thread) } impl<T> ParallelQueue<T> { @@ -25,40 +14,26 @@ impl<T> ParallelQueue<T> { /// - `capacity`: capacity of each internal queue /// pub fn new(queues: usize, capacity: usize) -> (Self, Vec<Receiver<T>>) { - let mut rxs = vec![]; - let mut txs = vec![]; - + let mut receivers = Vec::with_capacity(queues); + let (tx, rx) = bounded(capacity); for _ in 0..queues { - let (tx, rx) = sync_channel(capacity); - txs.push(Mutex::new(Some(tx))); - rxs.push(rx); + receivers.push(rx.clone()); } - ( ParallelQueue { - next: AtomicUsize::new(0), - queues: txs, + queue: Mutex::new(Some(tx)), }, - rxs, + receivers, ) } pub fn send(&self, v: T) { - let len = self.queues.len(); - let idx = self.next.fetch_add(1, Ordering::SeqCst); - match self.queues[idx % len].lock().unwrap().as_ref() { - Some(que) => { - // TODO: consider best way to propergate Result - let _ = que.send(v); - } - _ => (), - } + self.queue.lock().unwrap().as_ref().map(|s| { + let _ = s.send(v); + }); } pub fn close(&self) { - for i in 0..self.queues.len() { - let queue = &self.queues[i]; - *queue.lock().unwrap() = None; - } + *self.queue.lock().unwrap() = None; } } diff --git a/src/wireguard/router/inbound.rs b/src/wireguard/router/inbound.rs index db6d3f3..96c2e33 100644 --- a/src/wireguard/router/inbound.rs +++ b/src/wireguard/router/inbound.rs @@ -8,12 +8,12 @@ use super::runq::RunQueue; use super::types::Callbacks; use super::{tun, udp, Endpoint}; +use crossbeam_channel::Receiver; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; use zerocopy::{AsBytes, LayoutVerified}; use std::mem; use std::sync::atomic::Ordering; -use std::sync::mpsc::Receiver; use std::sync::Arc; pub const SIZE_TAG: usize = 16; diff --git a/src/wireguard/router/outbound.rs b/src/wireguard/router/outbound.rs index a555ecb..a0a1c72 100644 --- a/src/wireguard/router/outbound.rs +++ b/src/wireguard/router/outbound.rs @@ -8,9 +8,9 @@ use super::KeyPair; use super::REJECT_AFTER_MESSAGES; use super::{tun, udp, Endpoint}; -use std::sync::mpsc::Receiver; use std::sync::Arc; +use crossbeam_channel::Receiver; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; use zerocopy::{AsBytes, LayoutVerified}; diff --git a/src/wireguard/router/pool.rs b/src/wireguard/router/pool.rs index 686c788..3fc0026 100644 --- a/src/wireguard/router/pool.rs +++ b/src/wireguard/router/pool.rs @@ -1,9 +1,10 @@ -use arraydeque::ArrayDeque; -use spin::{Mutex, MutexGuard}; use std::mem; -use std::sync::mpsc::Receiver; use std::sync::Arc; +use arraydeque::ArrayDeque; +use crossbeam_channel::Receiver; +use spin::{Mutex, MutexGuard}; + use super::constants::INORDER_QUEUE_SIZE; use super::runq::{RunQueue, ToKey}; diff --git a/src/wireguard/router/workers.rs b/src/wireguard/router/workers.rs index 3ed6311..43464a0 100644 --- a/src/wireguard/router/workers.rs +++ b/src/wireguard/router/workers.rs @@ -1,13 +1,10 @@ -use std::sync::mpsc::Receiver; use std::sync::Arc; -use futures::sync::oneshot; -use futures::*; - use log::{debug, trace}; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; +use crossbeam_channel::Receiver; use std::sync::atomic::Ordering; use zerocopy::{AsBytes, LayoutVerified}; |