diff options
Diffstat (limited to 'src/wireguard/wireguard.rs')
-rw-r--r-- | src/wireguard/wireguard.rs | 47 |
1 files changed, 41 insertions, 6 deletions
diff --git a/src/wireguard/wireguard.rs b/src/wireguard/wireguard.rs index 25544d9..233559e 100644 --- a/src/wireguard/wireguard.rs +++ b/src/wireguard/wireguard.rs @@ -21,6 +21,7 @@ use std::collections::HashMap; use log::debug; use rand::rngs::OsRng; +use rand::Rng; use spin::{Mutex, RwLock, RwLockReadGuard}; use byteorder::{ByteOrder, LittleEndian}; @@ -37,6 +38,8 @@ pub struct Peer<T: Tun, B: Bind> { } pub struct PeerInner<B: Bind> { + pub id: u64, + pub keepalive: AtomicUsize, // keepalive interval pub rx_bytes: AtomicU64, pub tx_bytes: AtomicU64, @@ -50,6 +53,9 @@ pub struct PeerInner<B: Bind> { } pub struct WireguardInner<T: Tun, B: Bind> { + // identifier (for logging) + id: u32, + // provides access to the MTU value of the tun device // (otherwise owned solely by the router and a dedicated read IO thread) mtu: T::MTU, @@ -96,7 +102,13 @@ impl<B: Bind> PeerInner<B> { impl<T: Tun, B: Bind> fmt::Display for Peer<T, B> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "peer()") + write!(f, "peer(id = {})", self.id) + } +} + +impl<T: Tun, B: Bind> fmt::Display for WireguardInner<T, B> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "wireguard({:x})", self.id) } } @@ -209,7 +221,9 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { } pub fn new_peer(&self, pk: PublicKey) { + let mut rng = OsRng::new().unwrap(); let state = Arc::new(PeerInner { + id: rng.gen(), pk, last_handshake: Mutex::new(SystemTime::UNIX_EPOCH), handshake_queued: AtomicBool::new(false), @@ -277,11 +291,17 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { handshake::TYPE_COOKIE_REPLY | handshake::TYPE_INITIATION | handshake::TYPE_RESPONSE => { + debug!("{} : reader, received handshake message", wg); + + let pending = wg.pending.fetch_add(1, Ordering::SeqCst); + // update under_load flag - if wg.pending.fetch_add(1, Ordering::SeqCst) > THRESHOLD_UNDER_LOAD { + if pending > THRESHOLD_UNDER_LOAD { + debug!("{} : reader, set under load (pending = {})", wg, pending); last_under_load = Instant::now(); wg.under_load.store(true, Ordering::SeqCst); } else if last_under_load.elapsed() > DURATION_UNDER_LOAD { + debug!("{} : reader, clear under load", wg); wg.under_load.store(false, Ordering::SeqCst); } @@ -291,6 +311,8 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { .unwrap(); } router::TYPE_TRANSPORT => { + debug!("{} : reader, received transport message", wg); + // transport message let _ = wg.router.recv(src, msg).map_err(|e| { debug!("Failed to handle incoming transport message: {}", e); @@ -313,6 +335,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { let mut rng = OsRng::new().unwrap(); let (tx, rx): (Sender<HandshakeJob<B::Endpoint>>, _) = bounded(SIZE_HANDSHAKE_QUEUE); let wg = Arc::new(WireguardInner { + id: rng.gen(), mtu: mtu.clone(), peers: RwLock::new(HashMap::new()), send: RwLock::new(None), @@ -331,12 +354,13 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { let wg = wg.clone(); let rx = rx.clone(); thread::spawn(move || { + debug!("{} : handshake worker, started", wg); + // prepare OsRng instance for this thread let mut rng = OsRng::new().unwrap(); // process elements from the handshake queue for job in rx { - wg.pending.fetch_sub(1, Ordering::SeqCst); let state = wg.handshake.read(); if !state.active { continue; @@ -344,6 +368,8 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { match job { HandshakeJob::Message(msg, src) => { + wg.pending.fetch_sub(1, Ordering::SeqCst); + // feed message to handshake device let src_validate = (&src).into_address(); // TODO avoid @@ -352,6 +378,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { &mut rng, &msg[..], if wg.under_load.load(Ordering::Relaxed) { + debug!("{} : handshake worker, under load", wg); Some(&src_validate) } else { None @@ -364,9 +391,14 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { resp_len = msg.len() as u64; let send: &Option<B::Writer> = &*wg.send.read(); if let Some(writer) = send.as_ref() { + debug!( + "{} : handshake worker, send response ({} bytes)", + wg, resp_len + ); let _ = writer.write(&msg[..], &src).map_err(|e| { debug!( - "handshake worker, failed to send response, error = {}", + "{} : handshake worker, failed to send response, error = {}", + wg, e ) }); @@ -387,11 +419,13 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { // update timers after sending handshake response if resp_len > 0 { + debug!("{} : handshake worker, handshake response sent", wg); peer.state.sent_handshake_response(); } // add resulting keypair to peer keypair.map(|kp| { + debug!("{} : handshake worker, new keypair", wg); // free any unused ids for id in peer.router.add_keypair(kp) { state.device.release(id); @@ -400,14 +434,15 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { } } } - Err(e) => debug!("handshake worker, error = {:?}", e), + Err(e) => debug!("{} : handshake worker, error = {:?}", wg, e), } } HandshakeJob::New(pk) => { + debug!("{} : handshake worker, new handshake requested", wg); let _ = state.device.begin(&mut rng, &pk).map(|msg| { if let Some(peer) = wg.peers.read().get(pk.as_bytes()) { let _ = peer.router.send(&msg[..]).map_err(|e| { - debug!("handshake worker, failed to send handshake initiation, error = {}", e) + debug!("{} : handshake worker, failed to send handshake initiation, error = {}", wg, e) }); peer.state.sent_handshake_initiation(); } |