From 656679638750f84d1c8b75d8c44974ed45d36092 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Tue, 10 Dec 2019 18:17:48 +0100 Subject: Remove crossbeam dependency --- src/wireguard/mod.rs | 2 +- src/wireguard/peer.rs | 3 -- src/wireguard/queue.rs | 64 ++++++++++++++++++++++++++++++++++++++++++ src/wireguard/router/device.rs | 6 ++-- src/wireguard/router/mod.rs | 4 +-- src/wireguard/router/queue.rs | 46 ------------------------------ src/wireguard/wireguard.rs | 30 ++++++++++---------- 7 files changed, 84 insertions(+), 71 deletions(-) create mode 100644 src/wireguard/queue.rs delete mode 100644 src/wireguard/router/queue.rs (limited to 'src') diff --git a/src/wireguard/mod.rs b/src/wireguard/mod.rs index 711aa2b..f899359 100644 --- a/src/wireguard/mod.rs +++ b/src/wireguard/mod.rs @@ -5,6 +5,7 @@ mod wireguard; mod endpoint; mod handshake; mod peer; +mod queue; mod router; mod types; @@ -23,4 +24,3 @@ use super::platform::dummy; use super::platform::{tun, udp, Endpoint}; use peer::PeerInner; use types::KeyPair; -use wireguard::HandshakeJob; diff --git a/src/wireguard/peer.rs b/src/wireguard/peer.rs index 04622fd..448db96 100644 --- a/src/wireguard/peer.rs +++ b/src/wireguard/peer.rs @@ -1,6 +1,5 @@ use super::router; use super::timers::{Events, Timers}; -use super::HandshakeJob; use super::tun::Tun; use super::udp::UDP; @@ -14,7 +13,6 @@ use std::time::{Instant, SystemTime}; use spin::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; -use crossbeam_channel::Sender; use x25519_dalek::PublicKey; pub struct Peer { @@ -33,7 +31,6 @@ pub struct PeerInner { pub walltime_last_handshake: Mutex>, pub last_handshake_sent: Mutex, // instant for last handshake pub handshake_queued: AtomicBool, // is a handshake job currently queued for the peer? - pub queue: Mutex>>, // handshake queue // stats and configuration pub pk: PublicKey, // public key, DISCUSS: avoid this. TODO: remove diff --git a/src/wireguard/queue.rs b/src/wireguard/queue.rs new file mode 100644 index 0000000..a0fcf03 --- /dev/null +++ b/src/wireguard/queue.rs @@ -0,0 +1,64 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::sync_channel; +use std::sync::mpsc::{Receiver, SyncSender}; +use std::sync::Mutex; + +/// A simple parallel queue used to pass work to a worker pool. +/// +/// Unlike e.g. the crossbeam multi-producer multi-consumer queue +/// the ParallelQueue offers fewer features and instead improves speed: +/// +/// The crossbeam channel ensures that elements are consumed +/// even if not every Receiver is being read from. +/// This is not ensured by ParallelQueue. +pub struct ParallelQueue { + next: AtomicUsize, // next round-robin index + queues: Vec>>>, // work queues (1 per thread) +} + +impl ParallelQueue { + /// Create a new ParallelQueue instance + /// + /// # Arguments + /// + /// - `queues`: number of readers/writers + /// - `capacity`: capacity of each internal queue + /// + pub fn new(queues: usize, capacity: usize) -> (Self, Vec>) { + let mut rxs = vec![]; + let mut txs = vec![]; + + for _ in 0..queues { + let (tx, rx) = sync_channel(capacity); + txs.push(Mutex::new(Some(tx))); + rxs.push(rx); + } + + ( + ParallelQueue { + next: AtomicUsize::new(0), + queues: txs, + }, + rxs, + ) + } + + pub fn send(&self, v: T) { + let len = self.queues.len(); + let idx = self.next.fetch_add(1, Ordering::SeqCst); + match self.queues[idx % len].lock().unwrap().as_ref() { + Some(que) => { + // TODO: consider best way to propergate Result + let _ = que.send(v); + } + _ => (), + } + } + + pub fn close(&self) { + for i in 0..self.queues.len() { + let queue = &self.queues[i]; + *queue.lock().unwrap() = None; + } + } +} diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs index febea45..1d3b743 100644 --- a/src/wireguard/router/device.rs +++ b/src/wireguard/router/device.rs @@ -24,7 +24,7 @@ use super::route::RoutingTable; use super::runq::RunQueue; use super::super::{tun, udp, Endpoint, KeyPair}; -use super::queue::ParallelQueue; +use super::ParallelQueue; pub struct DeviceInner> { // inbound writer (TUN) @@ -125,8 +125,8 @@ impl> Drop impl> DeviceHandle { pub fn new(num_workers: usize, tun: T) -> DeviceHandle { // allocate shared device state - let (mut outrx, queue_outbound) = ParallelQueue::new(num_workers); - let (mut inrx, queue_inbound) = ParallelQueue::new(num_workers); + let (queue_outbound, mut outrx) = ParallelQueue::new(num_workers, 128); + let (queue_inbound, mut inrx) = ParallelQueue::new(num_workers, 128); let device = Device { inner: Arc::new(DeviceInner { inbound: tun, diff --git a/src/wireguard/router/mod.rs b/src/wireguard/router/mod.rs index 49a4f96..8238d32 100644 --- a/src/wireguard/router/mod.rs +++ b/src/wireguard/router/mod.rs @@ -7,13 +7,10 @@ mod messages; mod outbound; mod peer; mod pool; -mod queue; mod route; mod runq; mod types; -// mod workers; - #[cfg(test)] mod tests; @@ -21,6 +18,7 @@ use messages::TransportHeader; use std::mem; use super::constants::REJECT_AFTER_MESSAGES; +use super::queue::ParallelQueue; use super::types::*; use super::{tun, udp, Endpoint}; diff --git a/src/wireguard/router/queue.rs b/src/wireguard/router/queue.rs deleted file mode 100644 index 5d0165c..0000000 --- a/src/wireguard/router/queue.rs +++ /dev/null @@ -1,46 +0,0 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::sync_channel; -use std::sync::mpsc::{Receiver, SyncSender}; - -use spin::Mutex; - -pub struct ParallelQueue { - next: AtomicUsize, // next round-robin index - queues: Vec>>, // work queues (1 per thread) -} - -impl ParallelQueue { - pub fn new(queues: usize) -> (Vec>, Self) { - let mut rxs = vec![]; - let mut txs = vec![]; - - for _ in 0..queues { - let (tx, rx) = sync_channel(128); - txs.push(Mutex::new(tx)); - rxs.push(rx); - } - - ( - rxs, - ParallelQueue { - next: AtomicUsize::new(0), - queues: txs, - }, - ) - } - - pub fn send(&self, v: T) { - let len = self.queues.len(); - let idx = self.next.fetch_add(1, Ordering::SeqCst); - let que = self.queues[idx % len].lock(); - que.send(v).unwrap(); - } - - pub fn close(&self) { - for i in 0..self.queues.len() { - let (tx, _) = sync_channel(0); - let queue = &self.queues[i]; - *queue.lock() = tx; - } - } -} diff --git a/src/wireguard/wireguard.rs b/src/wireguard/wireguard.rs index 2b0e779..d0c0e53 100644 --- a/src/wireguard/wireguard.rs +++ b/src/wireguard/wireguard.rs @@ -4,6 +4,7 @@ use super::router; use super::timers::{Events, Timers}; use super::{Peer, PeerInner}; +use super::queue::ParallelQueue; use super::tun; use super::tun::Reader as TunReader; @@ -35,7 +36,6 @@ use rand::Rng; use spin::{Mutex, RwLock}; use byteorder::{ByteOrder, LittleEndian}; -use crossbeam_channel::{bounded, Sender}; use x25519_dalek::{PublicKey, StaticSecret}; const SIZE_HANDSHAKE_QUEUE: usize = 128; @@ -99,7 +99,7 @@ pub struct WireguardInner { handshake: RwLock, under_load: AtomicBool, pending: AtomicUsize, // num of pending handshake packets in queue - queue: Mutex>>, + queue: ParallelQueue>, } impl PeerInner { @@ -123,7 +123,7 @@ impl PeerInner { if !self.handshake_queued.swap(true, Ordering::SeqCst) { self.wg.pending.fetch_add(1, Ordering::SeqCst); - self.queue.lock().send(HandshakeJob::New(self.pk)).unwrap(); + self.wg.queue.send(HandshakeJob::New(self.pk)); } } } @@ -288,7 +288,6 @@ impl Wireguard { walltime_last_handshake: Mutex::new(None), last_handshake_sent: Mutex::new(Instant::now() - TIME_HORIZON), handshake_queued: AtomicBool::new(false), - queue: Mutex::new(self.state.queue.lock().clone()), rx_bytes: AtomicU64::new(0), tx_bytes: AtomicU64::new(0), timers: RwLock::new(Timers::dummy(&self.runner)), @@ -386,10 +385,7 @@ impl Wireguard { } // add to handshake queue - wg.queue - .lock() - .send(HandshakeJob::Message(msg, src)) - .unwrap(); + wg.queue.send(HandshakeJob::Message(msg, src)); } router::TYPE_TRANSPORT => { debug!("{} : reader, received transport message", wg); @@ -477,12 +473,14 @@ impl Wireguard { } pub fn new(writer: T::Writer) -> Wireguard { + // workers equal to number of physical cores + let cpus = num_cpus::get(); + // create device state let mut rng = OsRng::new().unwrap(); // handshake queue - let (tx, rx): (Sender>, _) = bounded(SIZE_HANDSHAKE_QUEUE); - + let (tx, mut rxs) = ParallelQueue::new(cpus, 128); let wg = Arc::new(WireguardInner { enabled: RwLock::new(false), tun_readers: WaitHandle::new(), @@ -494,13 +492,12 @@ impl Wireguard { pending: AtomicUsize::new(0), handshake: RwLock::new(handshake::Device::new()), under_load: AtomicBool::new(false), - queue: Mutex::new(tx), + queue: tx, }); // start handshake workers - for _ in 0..num_cpus::get() { + while let Some(rx) = rxs.pop() { let wg = wg.clone(); - let rx = rx.clone(); thread::spawn(move || { debug!("{} : handshake worker, started", wg); @@ -509,16 +506,18 @@ impl Wireguard { // process elements from the handshake queue for job in rx { - // decrement pending + // decrement pending pakcets (under_load) + let job: HandshakeJob = job; wg.pending.fetch_sub(1, Ordering::SeqCst); - let device = wg.handshake.read(); + // demultiplex staged handshake jobs and handshake messages match job { HandshakeJob::Message(msg, src) => { // feed message to handshake device let src_validate = (&src).into_address(); // TODO avoid // process message + let device = wg.handshake.read(); match device.process( &mut rng, &msg[..], @@ -599,6 +598,7 @@ impl Wireguard { "{} : handshake worker, new handshake requested for {}", wg, peer ); + let device = wg.handshake.read(); let _ = device.begin(&mut rng, &peer.pk).map(|msg| { let _ = peer.router.send(&msg[..]).map_err(|e| { debug!("{} : handshake worker, failed to send handshake initiation, error = {}", wg, e) -- cgit v1.2.3-59-g8ed1b