From fd3ba63e80e7f97ae89ab5938fe6594c2d1c0869 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Mon, 16 Dec 2019 15:26:15 +0100 Subject: Revert to crossbeam --- Cargo.lock | 21 ++++++++++++++++++ Cargo.toml | 1 + src/main.rs | 9 +++----- src/wireguard/queue.rs | 47 ++++++++++------------------------------ src/wireguard/router/inbound.rs | 2 +- src/wireguard/router/outbound.rs | 2 +- src/wireguard/router/pool.rs | 7 +++--- src/wireguard/router/workers.rs | 5 +---- 8 files changed, 43 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b4a076d..1b1feb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -175,6 +175,24 @@ dependencies = [ "pkg-config 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam-channel" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "autocfg 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "crypto-mac" version = "0.7.0" @@ -1212,6 +1230,7 @@ dependencies = [ "chacha20poly1305 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "cpuprofiler 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "daemonize 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1303,6 +1322,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "97276801e127ffb46b66ce23f35cc96bd454fa311294bced4bbace7baa8b1d17" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" "checksum cpuprofiler 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "43f8479dbcfd2bbaa0c0c26779b913052b375981cdf533091f2127ea3d42e52b" +"checksum crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "acec9a3b0b3559f15aee4f90746c4e5e293b701c0f7d3925d24e01645267b68c" +"checksum crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce446db02cdc3165b94ae73111e570793400d0794e46125cc4056c81cbb039f4" "checksum crypto-mac 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4434400df11d95d556bac068ddfedd482915eb18fe8bea89bc80b6e4b1c179e5" "checksum curve25519-dalek 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8b7dcd30ba50cdf88b55b033456138b7c0ac4afdc436d82e1b79f370f24cc66d" "checksum daemonize 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "70c24513e34f53b640819f0ac9f705b673fcf4006d7aab8778bee72ebfc89815" diff --git a/Cargo.toml b/Cargo.toml index 86eed49..4ef943c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ clear_on_drop = "0.2.3" env_logger = "0.6" num_cpus = "^1.10" daemonize = "0.4.1" +crossbeam-channel = "0.4" cpuprofiler = { version = "*", optional = true } [target.'cfg(unix)'.dependencies] diff --git a/src/main.rs b/src/main.rs index 087b54b..59b21e5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -180,14 +180,11 @@ fn main() { }); } Err(err) => { - log::info!("UAPI error: {}", err); - break; + log::info!("UAPI connection error: {}", err); + profiler_stop(); + exit(0); } } - - // exit - profiler_stop(); - exit(0); }); // block until all tun readers closed diff --git a/src/wireguard/queue.rs b/src/wireguard/queue.rs index f484320..4c004c4 100644 --- a/src/wireguard/queue.rs +++ b/src/wireguard/queue.rs @@ -1,19 +1,8 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::sync_channel; -use std::sync::mpsc::{Receiver, SyncSender}; +use crossbeam_channel::{bounded, Receiver, Sender}; use std::sync::Mutex; -/// A simple parallel queue used to pass work to a worker pool. -/// -/// Unlike e.g. the crossbeam multi-producer multi-consumer queue -/// the ParallelQueue offers fewer features and instead improves speed: -/// -/// The crossbeam channel ensures that elements are consumed -/// even if not every Receiver is being read from. -/// This is not ensured by ParallelQueue. pub struct ParallelQueue { - next: AtomicUsize, // next round-robin index - queues: Vec>>>, // work queues (1 per thread) + queue: Mutex>>, // work queues (1 per thread) } impl ParallelQueue { @@ -25,40 +14,26 @@ impl ParallelQueue { /// - `capacity`: capacity of each internal queue /// pub fn new(queues: usize, capacity: usize) -> (Self, Vec>) { - let mut rxs = vec![]; - let mut txs = vec![]; - + let mut receivers = Vec::with_capacity(queues); + let (tx, rx) = bounded(capacity); for _ in 0..queues { - let (tx, rx) = sync_channel(capacity); - txs.push(Mutex::new(Some(tx))); - rxs.push(rx); + receivers.push(rx.clone()); } - ( ParallelQueue { - next: AtomicUsize::new(0), - queues: txs, + queue: Mutex::new(Some(tx)), }, - rxs, + receivers, ) } pub fn send(&self, v: T) { - let len = self.queues.len(); - let idx = self.next.fetch_add(1, Ordering::SeqCst); - match self.queues[idx % len].lock().unwrap().as_ref() { - Some(que) => { - // TODO: consider best way to propergate Result - let _ = que.send(v); - } - _ => (), - } + self.queue.lock().unwrap().as_ref().map(|s| { + let _ = s.send(v); + }); } pub fn close(&self) { - for i in 0..self.queues.len() { - let queue = &self.queues[i]; - *queue.lock().unwrap() = None; - } + *self.queue.lock().unwrap() = None; } } diff --git a/src/wireguard/router/inbound.rs b/src/wireguard/router/inbound.rs index db6d3f3..96c2e33 100644 --- a/src/wireguard/router/inbound.rs +++ b/src/wireguard/router/inbound.rs @@ -8,12 +8,12 @@ use super::runq::RunQueue; use super::types::Callbacks; use super::{tun, udp, Endpoint}; +use crossbeam_channel::Receiver; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; use zerocopy::{AsBytes, LayoutVerified}; use std::mem; use std::sync::atomic::Ordering; -use std::sync::mpsc::Receiver; use std::sync::Arc; pub const SIZE_TAG: usize = 16; diff --git a/src/wireguard/router/outbound.rs b/src/wireguard/router/outbound.rs index a555ecb..a0a1c72 100644 --- a/src/wireguard/router/outbound.rs +++ b/src/wireguard/router/outbound.rs @@ -8,9 +8,9 @@ use super::KeyPair; use super::REJECT_AFTER_MESSAGES; use super::{tun, udp, Endpoint}; -use std::sync::mpsc::Receiver; use std::sync::Arc; +use crossbeam_channel::Receiver; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; use zerocopy::{AsBytes, LayoutVerified}; diff --git a/src/wireguard/router/pool.rs b/src/wireguard/router/pool.rs index 686c788..3fc0026 100644 --- a/src/wireguard/router/pool.rs +++ b/src/wireguard/router/pool.rs @@ -1,9 +1,10 @@ -use arraydeque::ArrayDeque; -use spin::{Mutex, MutexGuard}; use std::mem; -use std::sync::mpsc::Receiver; use std::sync::Arc; +use arraydeque::ArrayDeque; +use crossbeam_channel::Receiver; +use spin::{Mutex, MutexGuard}; + use super::constants::INORDER_QUEUE_SIZE; use super::runq::{RunQueue, ToKey}; diff --git a/src/wireguard/router/workers.rs b/src/wireguard/router/workers.rs index 3ed6311..43464a0 100644 --- a/src/wireguard/router/workers.rs +++ b/src/wireguard/router/workers.rs @@ -1,13 +1,10 @@ -use std::sync::mpsc::Receiver; use std::sync::Arc; -use futures::sync::oneshot; -use futures::*; - use log::{debug, trace}; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; +use crossbeam_channel::Receiver; use std::sync::atomic::Ordering; use zerocopy::{AsBytes, LayoutVerified}; -- cgit v1.2.3-59-g8ed1b