From dfe4a22920e31f30f0e7ceb7c0d588dd48af13ad Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Mon, 16 Sep 2019 22:33:46 +0200 Subject: WIP: Work on handshake worker --- src/handshake/device.rs | 6 ++--- src/handshake/mod.rs | 2 +- src/handshake/noise.rs | 10 +++---- src/handshake/peer.rs | 10 +++---- src/wireguard.rs | 72 ++++++++++++++++++++++++++++++++++++++++++------- 5 files changed, 76 insertions(+), 24 deletions(-) (limited to 'src') diff --git a/src/handshake/device.rs b/src/handshake/device.rs index 5396854..638d63f 100644 --- a/src/handshake/device.rs +++ b/src/handshake/device.rs @@ -4,7 +4,7 @@ use std::net::SocketAddr; use std::sync::Mutex; use zerocopy::AsBytes; -use byteorder::{LittleEndian, ByteOrder}; +use byteorder::{ByteOrder, LittleEndian}; use rand::prelude::*; @@ -35,7 +35,7 @@ pub struct Device { */ impl Device where - T: Copy, + T: Clone, { /// Initialize a new handshake state machine /// @@ -270,7 +270,7 @@ where // return unconfirmed keypair and the response as vector Ok(( - Some(peer.identifier), + Some(peer.identifier.clone()), Some(resp.as_bytes().to_owned()), Some(keys), )) diff --git a/src/handshake/mod.rs b/src/handshake/mod.rs index 8452de8..6d017cc 100644 --- a/src/handshake/mod.rs +++ b/src/handshake/mod.rs @@ -18,4 +18,4 @@ mod types; // publicly exposed interface pub use device::Device; -pub use messages::{TYPE_COOKIE_REPLY, TYPE_INITIATION, TYPE_RESPONSE }; +pub use messages::{TYPE_COOKIE_REPLY, TYPE_INITIATION, TYPE_RESPONSE}; diff --git a/src/handshake/noise.rs b/src/handshake/noise.rs index 9fc0eb4..eafb9e9 100644 --- a/src/handshake/noise.rs +++ b/src/handshake/noise.rs @@ -215,7 +215,7 @@ mod tests { } } -pub fn create_initiation( +pub fn create_initiation( rng: &mut R, device: &Device, peer: &Peer, @@ -296,7 +296,7 @@ pub fn create_initiation( }) } -pub fn consume_initiation<'a, T: Copy>( +pub fn consume_initiation<'a, T: Clone>( device: &'a Device, msg: &NoiseInitiation, ) -> Result<(&'a Peer, TemporaryState), HandshakeError> { @@ -370,7 +370,7 @@ pub fn consume_initiation<'a, T: Copy>( }) } -pub fn create_response( +pub fn create_response( rng: &mut R, peer: &Peer, sender: u32, // sending identifier @@ -456,7 +456,7 @@ pub fn create_response( * allow concurrent processing of potential responses to the initiation, * in order to better mitigate DoS from malformed response messages. */ -pub fn consume_response( +pub fn consume_response( device: &Device, msg: &NoiseResponse, ) -> Result, HandshakeError> { @@ -530,7 +530,7 @@ pub fn consume_response( // return confirmed key-pair Ok(( - Some(peer.identifier), + Some(peer.identifier.clone()), None, Some(KeyPair { birth, diff --git a/src/handshake/peer.rs b/src/handshake/peer.rs index 6f0f5af..4c6f2fd 100644 --- a/src/handshake/peer.rs +++ b/src/handshake/peer.rs @@ -55,19 +55,19 @@ pub enum State { impl Drop for State { fn drop(&mut self) { match self { - State::InitiationSent{hs, ck, ..} => { + State::InitiationSent { hs, ck, .. } => { // eph_sk already cleared by dalek-x25519 hs.clear(); ck.clear(); - }, - _ => () + } + _ => (), } } } impl Peer where - T: Copy, + T: Clone, { pub fn new( identifier: T, // external identifier @@ -141,4 +141,4 @@ where *last_initiation_consumption = Some(Instant::now()); Ok(()) } -} \ No newline at end of file +} diff --git a/src/wireguard.rs b/src/wireguard.rs index 71b981e..2c166b4 100644 --- a/src/wireguard.rs +++ b/src/wireguard.rs @@ -1,13 +1,15 @@ use crate::handshake; use crate::router; -use crate::types::{Bind, Tun}; +use crate::types::{Bind, Endpoint, Tun}; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::sync_channel; use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; +use log::debug; +use rand::rngs::OsRng; + use byteorder::{ByteOrder, LittleEndian}; use crossbeam_channel::bounded; use x25519_dalek::StaticSecret; @@ -16,6 +18,14 @@ const SIZE_HANDSHAKE_QUEUE: usize = 128; const THRESHOLD_UNDER_LOAD: usize = SIZE_HANDSHAKE_QUEUE / 4; const DURATION_UNDER_LOAD: Duration = Duration::from_millis(10_000); +#[derive(Clone)] +pub struct Peer(Arc>); + +pub struct PeerInner { + peer: router::Peer, + timers: Timers, +} + pub struct Timers {} pub struct Events(); @@ -38,7 +48,7 @@ pub struct Wireguard { impl Wireguard { fn start(&self) {} - fn new(tun: T, bind: B) -> Wireguard { + fn new(tun: T, bind: B, sk: StaticSecret) -> Wireguard { let router = Arc::new(router::Device::new( num_cpus::get(), tun.clone(), @@ -46,11 +56,14 @@ impl Wireguard { )); let handshake_staged = Arc::new(AtomicUsize::new(0)); + let handshake_device: Arc>> = + Arc::new(handshake::Device::new(sk)); // start UDP read IO thread let (handshake_tx, handshake_rx) = bounded(128); { let tun = tun.clone(); + let bind = bind.clone(); thread::spawn(move || { let mut under_load = Instant::now() - DURATION_UNDER_LOAD - Duration::from_millis(1000); @@ -81,11 +94,9 @@ impl Wireguard { } // pass source address along if under load - if under_load.elapsed() < DURATION_UNDER_LOAD { - handshake_tx.send((msg, Some(src))).unwrap(); - } else { - handshake_tx.send((msg, None)).unwrap(); - } + handshake_tx + .send((msg, src, under_load.elapsed() < DURATION_UNDER_LOAD)) + .unwrap(); } router::TYPE_TRANSPORT => { // transport message @@ -98,9 +109,50 @@ impl Wireguard { // start handshake workers for _ in 0..num_cpus::get() { + let bind = bind.clone(); let handshake_rx = handshake_rx.clone(); - thread::spawn(move || loop { - let (msg, src) = handshake_rx.recv().unwrap(); // TODO handle error + let handshake_device = handshake_device.clone(); + thread::spawn(move || { + // prepare OsRng instance for this thread + let mut rng = OsRng::new().unwrap(); + + // process elements from the handshake queue + for (msg, src, under_load) in handshake_rx { + // feed message to handshake device + let src_validate = (&src).into_address(); // TODO avoid + match handshake_device.process( + &mut rng, + &msg[..], + if under_load { + Some(&src_validate) + } else { + None + }, + ) { + Ok((identity, msg, keypair)) => { + // send response + if let Some(msg) = msg { + let _ = bind.send(&msg[..], &src).map_err(|e| { + debug!( + "handshake worker, failed to send response, error = {:?}", + e + ) + }); + } + + // update timers + if let Some(identity) = identity { + // add keypair to peer and free any unused ids + if let Some(keypair) = keypair { + for id in identity.0.peer.add_keypair(keypair) { + handshake_device.release(id); + } + } + } + } + Err(e) => debug!("handshake worker, error = {:?}", e), + } + } }); } -- cgit v1.2.3-59-g8ed1b