From 32c030367cb017f0318cb97ccf27f8788acadf72 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sun, 15 Sep 2019 21:10:23 +0200 Subject: WIP: Handshake queue and workers --- src/router/peer.rs | 50 ++++++++++++++++++++++++++++++------------------- src/wireguard.rs | 55 +++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 79 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/router/peer.rs b/src/router/peer.rs index 728be11..952e439 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -308,28 +308,40 @@ impl PeerInner { let mut header: LayoutVerified<&mut [u8], TransportHeader> = header; // check if has key - let key = match self.ekey.lock().as_mut() { - None => { - // add to staged packets (create no job) - debug!("execute callback: call_need_key"); - C::need_key(&self.opaque); + let key = { + let mut ekey = self.ekey.lock(); + let key = match ekey.as_mut() { + None => None, + Some(mut state) => { + // avoid integer overflow in nonce + if state.nonce >= REJECT_AFTER_MESSAGES - 1 { + *ekey = None; + None + } else { + // there should be no stacked packets lingering around + debug_assert_eq!(self.staged_packets.lock().len(), 0); + debug!("encryption state available, nonce = {}", state.nonce); + + // set transport message fields + header.f_counter.set(state.nonce); + header.f_receiver.set(state.id); + state.nonce += 1; + Some(state.key) + } + } + }; + + // If not suitable key was found: + // 1. Stage packet for later transmission + // 2. Request new key + if key.is_none() { self.staged_packets.lock().push_back(msg); + C::need_key(&self.opaque); return None; - } - Some(mut state) => { - // avoid integer overflow in nonce - if state.nonce >= REJECT_AFTER_MESSAGES - 1 { - return None; - } - debug!("encryption state available, nonce = {}", state.nonce); + }; - // set transport message fields - header.f_counter.set(state.nonce); - header.f_receiver.set(state.id); - state.nonce += 1; - state.key - } - }; + key + }?; // add job to in-order queue and return sendeer to device for inclusion in worker pool let (tx, rx) = oneshot(); diff --git a/src/wireguard.rs b/src/wireguard.rs index 0bd5da7..71b981e 100644 --- a/src/wireguard.rs +++ b/src/wireguard.rs @@ -2,12 +2,20 @@ use crate::handshake; use crate::router; use crate::types::{Bind, Tun}; -use byteorder::{ByteOrder, LittleEndian}; - +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::sync_channel; +use std::sync::Arc; use std::thread; +use std::time::{Duration, Instant}; +use byteorder::{ByteOrder, LittleEndian}; +use crossbeam_channel::bounded; use x25519_dalek::StaticSecret; +const SIZE_HANDSHAKE_QUEUE: usize = 128; +const THRESHOLD_UNDER_LOAD: usize = SIZE_HANDSHAKE_QUEUE / 4; +const DURATION_UNDER_LOAD: Duration = Duration::from_millis(10_000); + pub struct Timers {} pub struct Events(); @@ -23,18 +31,30 @@ impl router::Callbacks for Events { } pub struct Wireguard { - router: router::Device, - handshake: Option>, + router: Arc>, + handshake: Option>>, } impl Wireguard { + fn start(&self) {} + fn new(tun: T, bind: B) -> Wireguard { - let router = router::Device::new(num_cpus::get(), tun.clone(), bind.clone()); + let router = Arc::new(router::Device::new( + num_cpus::get(), + tun.clone(), + bind.clone(), + )); + + let handshake_staged = Arc::new(AtomicUsize::new(0)); // start UDP read IO thread + let (handshake_tx, handshake_rx) = bounded(128); { let tun = tun.clone(); thread::spawn(move || { + let mut under_load = + Instant::now() - DURATION_UNDER_LOAD - Duration::from_millis(1000); + loop { // read UDP packet into vector let size = tun.mtu() + 148; // maximum message size @@ -45,14 +65,27 @@ impl Wireguard { msg.truncate(size); // message type de-multiplexer - if msg.len() < 4 { + if msg.len() < std::mem::size_of::() { continue; } + match LittleEndian::read_u32(&msg[..]) { handshake::TYPE_COOKIE_REPLY | handshake::TYPE_INITIATION | handshake::TYPE_RESPONSE => { - // handshake message + // detect if under load + if handshake_staged.fetch_add(1, Ordering::SeqCst) + > THRESHOLD_UNDER_LOAD + { + under_load = Instant::now() + } + + // pass source address along if under load + if under_load.elapsed() < DURATION_UNDER_LOAD { + handshake_tx.send((msg, Some(src))).unwrap(); + } else { + handshake_tx.send((msg, None)).unwrap(); + } } router::TYPE_TRANSPORT => { // transport message @@ -63,6 +96,14 @@ impl Wireguard { }); } + // start handshake workers + for _ in 0..num_cpus::get() { + let handshake_rx = handshake_rx.clone(); + thread::spawn(move || loop { + let (msg, src) = handshake_rx.recv().unwrap(); // TODO handle error + }); + } + // start TUN read IO thread thread::spawn(move || {}); -- cgit v1.2.3-59-g8ed1b