From 656679638750f84d1c8b75d8c44974ed45d36092 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Tue, 10 Dec 2019 18:17:48 +0100 Subject: Remove crossbeam dependency --- src/wireguard/wireguard.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) (limited to 'src/wireguard/wireguard.rs') diff --git a/src/wireguard/wireguard.rs b/src/wireguard/wireguard.rs index 2b0e779..d0c0e53 100644 --- a/src/wireguard/wireguard.rs +++ b/src/wireguard/wireguard.rs @@ -4,6 +4,7 @@ use super::router; use super::timers::{Events, Timers}; use super::{Peer, PeerInner}; +use super::queue::ParallelQueue; use super::tun; use super::tun::Reader as TunReader; @@ -35,7 +36,6 @@ use rand::Rng; use spin::{Mutex, RwLock}; use byteorder::{ByteOrder, LittleEndian}; -use crossbeam_channel::{bounded, Sender}; use x25519_dalek::{PublicKey, StaticSecret}; const SIZE_HANDSHAKE_QUEUE: usize = 128; @@ -99,7 +99,7 @@ pub struct WireguardInner { handshake: RwLock, under_load: AtomicBool, pending: AtomicUsize, // num of pending handshake packets in queue - queue: Mutex>>, + queue: ParallelQueue>, } impl PeerInner { @@ -123,7 +123,7 @@ impl PeerInner { if !self.handshake_queued.swap(true, Ordering::SeqCst) { self.wg.pending.fetch_add(1, Ordering::SeqCst); - self.queue.lock().send(HandshakeJob::New(self.pk)).unwrap(); + self.wg.queue.send(HandshakeJob::New(self.pk)); } } } @@ -288,7 +288,6 @@ impl Wireguard { walltime_last_handshake: Mutex::new(None), last_handshake_sent: Mutex::new(Instant::now() - TIME_HORIZON), handshake_queued: AtomicBool::new(false), - queue: Mutex::new(self.state.queue.lock().clone()), rx_bytes: AtomicU64::new(0), tx_bytes: AtomicU64::new(0), timers: RwLock::new(Timers::dummy(&self.runner)), @@ -386,10 +385,7 @@ impl Wireguard { } // add to handshake queue - wg.queue - .lock() - .send(HandshakeJob::Message(msg, src)) - .unwrap(); + wg.queue.send(HandshakeJob::Message(msg, src)); } router::TYPE_TRANSPORT => { debug!("{} : reader, received transport message", wg); @@ -477,12 +473,14 @@ impl Wireguard { } pub fn new(writer: T::Writer) -> Wireguard { + // workers equal to number of physical cores + let cpus = num_cpus::get(); + // create device state let mut rng = OsRng::new().unwrap(); // handshake queue - let (tx, rx): (Sender>, _) = bounded(SIZE_HANDSHAKE_QUEUE); - + let (tx, mut rxs) = ParallelQueue::new(cpus, 128); let wg = Arc::new(WireguardInner { enabled: RwLock::new(false), tun_readers: WaitHandle::new(), @@ -494,13 +492,12 @@ impl Wireguard { pending: AtomicUsize::new(0), handshake: RwLock::new(handshake::Device::new()), under_load: AtomicBool::new(false), - queue: Mutex::new(tx), + queue: tx, }); // start handshake workers - for _ in 0..num_cpus::get() { + while let Some(rx) = rxs.pop() { let wg = wg.clone(); - let rx = rx.clone(); thread::spawn(move || { debug!("{} : handshake worker, started", wg); @@ -509,16 +506,18 @@ impl Wireguard { // process elements from the handshake queue for job in rx { - // decrement pending + // decrement pending pakcets (under_load) + let job: HandshakeJob = job; wg.pending.fetch_sub(1, Ordering::SeqCst); - let device = wg.handshake.read(); + // demultiplex staged handshake jobs and handshake messages match job { HandshakeJob::Message(msg, src) => { // feed message to handshake device let src_validate = (&src).into_address(); // TODO avoid // process message + let device = wg.handshake.read(); match device.process( &mut rng, &msg[..], @@ -599,6 +598,7 @@ impl Wireguard { "{} : handshake worker, new handshake requested for {}", wg, peer ); + let device = wg.handshake.read(); let _ = device.begin(&mut rng, &peer.pk).map(|msg| { let _ = peer.router.send(&msg[..]).map_err(|e| { debug!("{} : handshake worker, failed to send handshake initiation, error = {}", wg, e) -- cgit v1.2.3-59-g8ed1b