diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-15 21:10:23 +0200 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-15 21:10:23 +0200 |
commit | 32c030367cb017f0318cb97ccf27f8788acadf72 (patch) | |
tree | 6641288107f77ee34bea82301927e9af85df8c17 /src/wireguard.rs | |
parent | Sent staged packets when key-pair confirmed (diff) | |
download | wireguard-rs-32c030367cb017f0318cb97ccf27f8788acadf72.tar.xz wireguard-rs-32c030367cb017f0318cb97ccf27f8788acadf72.zip |
WIP: Handshake queue and workers
Diffstat (limited to '')
-rw-r--r-- | src/wireguard.rs | 55 |
1 files changed, 48 insertions, 7 deletions
diff --git a/src/wireguard.rs b/src/wireguard.rs index 0bd5da7..71b981e 100644 --- a/src/wireguard.rs +++ b/src/wireguard.rs @@ -2,12 +2,20 @@ use crate::handshake; use crate::router; use crate::types::{Bind, Tun}; -use byteorder::{ByteOrder, LittleEndian}; - +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::sync_channel; +use std::sync::Arc; use std::thread; +use std::time::{Duration, Instant}; +use byteorder::{ByteOrder, LittleEndian}; +use crossbeam_channel::bounded; use x25519_dalek::StaticSecret; +const SIZE_HANDSHAKE_QUEUE: usize = 128; +const THRESHOLD_UNDER_LOAD: usize = SIZE_HANDSHAKE_QUEUE / 4; +const DURATION_UNDER_LOAD: Duration = Duration::from_millis(10_000); + pub struct Timers {} pub struct Events(); @@ -23,18 +31,30 @@ impl router::Callbacks for Events { } pub struct Wireguard<T: Tun, B: Bind> { - router: router::Device<Events, T, B>, - handshake: Option<handshake::Device<()>>, + router: Arc<router::Device<Events, T, B>>, + handshake: Option<Arc<handshake::Device<()>>>, } impl<T: Tun, B: Bind> Wireguard<T, B> { + fn start(&self) {} + fn new(tun: T, bind: B) -> Wireguard<T, B> { - let router = router::Device::new(num_cpus::get(), tun.clone(), bind.clone()); + let router = Arc::new(router::Device::new( + num_cpus::get(), + tun.clone(), + bind.clone(), + )); + + let handshake_staged = Arc::new(AtomicUsize::new(0)); // start UDP read IO thread + let (handshake_tx, handshake_rx) = bounded(128); { let tun = tun.clone(); thread::spawn(move || { + let mut under_load = + Instant::now() - DURATION_UNDER_LOAD - Duration::from_millis(1000); + loop { // read UDP packet into vector let size = tun.mtu() + 148; // maximum message size @@ -45,14 +65,27 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { msg.truncate(size); // message type de-multiplexer - if msg.len() < 4 { + if msg.len() < std::mem::size_of::<u32>() { continue; } + match LittleEndian::read_u32(&msg[..]) { handshake::TYPE_COOKIE_REPLY | handshake::TYPE_INITIATION | handshake::TYPE_RESPONSE => { - // handshake message + // detect if under load + if handshake_staged.fetch_add(1, Ordering::SeqCst) + > THRESHOLD_UNDER_LOAD + { + under_load = Instant::now() + } + + // pass source address along if under load + if under_load.elapsed() < DURATION_UNDER_LOAD { + handshake_tx.send((msg, Some(src))).unwrap(); + } else { + handshake_tx.send((msg, None)).unwrap(); + } } router::TYPE_TRANSPORT => { // transport message @@ -63,6 +96,14 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { }); } + // start handshake workers + for _ in 0..num_cpus::get() { + let handshake_rx = handshake_rx.clone(); + thread::spawn(move || loop { + let (msg, src) = handshake_rx.recv().unwrap(); // TODO handle error + }); + } + // start TUN read IO thread thread::spawn(move || {}); |