diff options
-rw-r--r-- | Cargo.lock | 10 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/router/peer.rs | 50 | ||||
-rw-r--r-- | src/wireguard.rs | 55 |
4 files changed, 90 insertions, 26 deletions
@@ -178,6 +178,14 @@ dependencies = [ ] [[package]] +name = "crossbeam-channel" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "crossbeam-deque" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1582,6 +1590,7 @@ dependencies = [ "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "chacha20poly1305 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1676,6 +1685,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum chacha20poly1305 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "40cd3ddeae0b0ea7fe848a06e4fbf3f02463648b9395bd1139368ce42b44543e" "checksum clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "97276801e127ffb46b66ce23f35cc96bd454fa311294bced4bbace7baa8b1d17" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +"checksum crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c8ec7fcd21571dc78f96cc96243cab8d8f035247c3efd16c687be154c3fa9efa" "checksum crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b18cd2e169ad86297e6bc0ad9aa679aee9daa4f19e8163860faf7c164e4f5a71" "checksum crossbeam-epoch 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "fedcd6772e37f3da2a9af9bf12ebe046c0dfe657992377b4df982a2b54cd37a9" "checksum crossbeam-queue 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b" @@ -22,6 +22,7 @@ futures = "0.1.28" arraydeque = "0.4.5" treebitmap = "^0.4" crossbeam-deque = "0.7" +crossbeam-channel = "0.3.9" hjul = "0.1.2" ring = "0.16.7" chacha20poly1305 = "^0.1" diff --git a/src/router/peer.rs b/src/router/peer.rs index 728be11..952e439 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -308,28 +308,40 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> { let mut header: LayoutVerified<&mut [u8], TransportHeader> = header; // check if has key - let key = match self.ekey.lock().as_mut() { - None => { - // add to staged packets (create no job) - debug!("execute callback: call_need_key"); - C::need_key(&self.opaque); + let key = { + let mut ekey = self.ekey.lock(); + let key = match ekey.as_mut() { + None => None, + Some(mut state) => { + // avoid integer overflow in nonce + if state.nonce >= REJECT_AFTER_MESSAGES - 1 { + *ekey = None; + None + } else { + // there should be no stacked packets lingering around + debug_assert_eq!(self.staged_packets.lock().len(), 0); + debug!("encryption state available, nonce = {}", state.nonce); + + // set transport message fields + header.f_counter.set(state.nonce); + header.f_receiver.set(state.id); + state.nonce += 1; + Some(state.key) + } + } + }; + + // If not suitable key was found: + // 1. Stage packet for later transmission + // 2. Request new key + if key.is_none() { self.staged_packets.lock().push_back(msg); + C::need_key(&self.opaque); return None; - } - Some(mut state) => { - // avoid integer overflow in nonce - if state.nonce >= REJECT_AFTER_MESSAGES - 1 { - return None; - } - debug!("encryption state available, nonce = {}", state.nonce); + }; - // set transport message fields - header.f_counter.set(state.nonce); - header.f_receiver.set(state.id); - state.nonce += 1; - state.key - } - }; + key + }?; // add job to in-order queue and return sendeer to device for inclusion in worker pool let (tx, rx) = oneshot(); diff --git a/src/wireguard.rs b/src/wireguard.rs index 0bd5da7..71b981e 100644 --- a/src/wireguard.rs +++ b/src/wireguard.rs @@ -2,12 +2,20 @@ use crate::handshake; use crate::router; use crate::types::{Bind, Tun}; -use byteorder::{ByteOrder, LittleEndian}; - +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::sync_channel; +use std::sync::Arc; use std::thread; +use std::time::{Duration, Instant}; +use byteorder::{ByteOrder, LittleEndian}; +use crossbeam_channel::bounded; use x25519_dalek::StaticSecret; +const SIZE_HANDSHAKE_QUEUE: usize = 128; +const THRESHOLD_UNDER_LOAD: usize = SIZE_HANDSHAKE_QUEUE / 4; +const DURATION_UNDER_LOAD: Duration = Duration::from_millis(10_000); + pub struct Timers {} pub struct Events(); @@ -23,18 +31,30 @@ impl router::Callbacks for Events { } pub struct Wireguard<T: Tun, B: Bind> { - router: router::Device<Events, T, B>, - handshake: Option<handshake::Device<()>>, + router: Arc<router::Device<Events, T, B>>, + handshake: Option<Arc<handshake::Device<()>>>, } impl<T: Tun, B: Bind> Wireguard<T, B> { + fn start(&self) {} + fn new(tun: T, bind: B) -> Wireguard<T, B> { - let router = router::Device::new(num_cpus::get(), tun.clone(), bind.clone()); + let router = Arc::new(router::Device::new( + num_cpus::get(), + tun.clone(), + bind.clone(), + )); + + let handshake_staged = Arc::new(AtomicUsize::new(0)); // start UDP read IO thread + let (handshake_tx, handshake_rx) = bounded(128); { let tun = tun.clone(); thread::spawn(move || { + let mut under_load = + Instant::now() - DURATION_UNDER_LOAD - Duration::from_millis(1000); + loop { // read UDP packet into vector let size = tun.mtu() + 148; // maximum message size @@ -45,14 +65,27 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { msg.truncate(size); // message type de-multiplexer - if msg.len() < 4 { + if msg.len() < std::mem::size_of::<u32>() { continue; } + match LittleEndian::read_u32(&msg[..]) { handshake::TYPE_COOKIE_REPLY | handshake::TYPE_INITIATION | handshake::TYPE_RESPONSE => { - // handshake message + // detect if under load + if handshake_staged.fetch_add(1, Ordering::SeqCst) + > THRESHOLD_UNDER_LOAD + { + under_load = Instant::now() + } + + // pass source address along if under load + if under_load.elapsed() < DURATION_UNDER_LOAD { + handshake_tx.send((msg, Some(src))).unwrap(); + } else { + handshake_tx.send((msg, None)).unwrap(); + } } router::TYPE_TRANSPORT => { // transport message @@ -63,6 +96,14 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { }); } + // start handshake workers + for _ in 0..num_cpus::get() { + let handshake_rx = handshake_rx.clone(); + thread::spawn(move || loop { + let (msg, src) = handshake_rx.recv().unwrap(); // TODO handle error + }); + } + // start TUN read IO thread thread::spawn(move || {}); |