From d4f5d5b7237d78ea177004a0650a550d03110b7c Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Fri, 27 Dec 2019 18:01:11 +0100 Subject: Fixed typo in under load code --- src/wireguard/peer.rs | 22 +++++++++++++++++++--- src/wireguard/router/peer.rs | 18 ++++++++++++------ src/wireguard/timers.rs | 34 +++++++++++++++++++++++++++------- src/wireguard/workers.rs | 14 ++++++-------- 4 files changed, 64 insertions(+), 24 deletions(-) (limited to 'src/wireguard') diff --git a/src/wireguard/peer.rs b/src/wireguard/peer.rs index e02d2e0..1af4df3 100644 --- a/src/wireguard/peer.rs +++ b/src/wireguard/peer.rs @@ -4,8 +4,8 @@ use super::timers::{Events, Timers}; use super::tun::Tun; use super::udp::UDP; -use super::wireguard::WireGuard; use super::constants::REKEY_TIMEOUT; +use super::wireguard::WireGuard; use super::workers::HandshakeJob; use std::fmt; @@ -60,21 +60,31 @@ impl PeerInner { * The function is ratelimited. */ pub fn packet_send_handshake_initiation(&self) { - // the function is rate limited + log::trace!("{} : packet_send_handshake_initiation", self); + // the function is rate limited { let mut lhs = self.last_handshake_sent.lock(); if lhs.elapsed() < REKEY_TIMEOUT { + log::trace!("{} : packet_send_handshake_initiation, rate-limited!", self); return; } *lhs = Instant::now(); } // create a new handshake job for the peer - if !self.handshake_queued.swap(true, Ordering::SeqCst) { self.wg.pending.fetch_add(1, Ordering::SeqCst); self.wg.queue.send(HandshakeJob::New(self.pk)); + log::trace!( + "{} : packet_send_handshake_initiation, handshake queued", + self + ); + } else { + log::trace!( + "{} : packet_send_handshake_initiation, handshake already queued", + self + ); } } @@ -89,6 +99,12 @@ impl PeerInner { } } +impl fmt::Display for PeerInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "peer(id = {})", self.id) + } +} + impl fmt::Display for Peer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "peer(id = {})", self.id) diff --git a/src/wireguard/router/peer.rs b/src/wireguard/router/peer.rs index 23a3e62..b8110f0 100644 --- a/src/wireguard/router/peer.rs +++ b/src/wireguard/router/peer.rs @@ -232,7 +232,7 @@ impl> Peer { sent = true; - self.send_raw(msg); + self.send_raw(msg, false); } None => break sent, } @@ -240,10 +240,11 @@ impl> Peer) -> bool { + // + // Returns true if the message was queued for transmission. + fn send_raw(&self, msg: Vec, stage: bool) -> bool { log::debug!("peer.send_raw"); - match self.send_job(msg, false) { + match self.send_job(msg, stage) { Some(job) => { self.device.queue_outbound.send(job); debug!("send_raw: got obtained send_job"); @@ -300,7 +301,11 @@ impl> Peer, stage: bool) -> Option> { - debug!("peer.send_job"); + debug!( + "peer.send_job, msg.len() = {}, stage = {}", + msg.len(), + stage + ); debug_assert!( msg.len() >= mem::size_of::(), "received message with size: {:}", @@ -333,6 +338,7 @@ impl> Peer> PeerHandle bool { debug!("peer.send_keepalive"); - self.peer.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX]) + self.peer.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX], true) } /// Map a subnet to the peer diff --git a/src/wireguard/timers.rs b/src/wireguard/timers.rs index b8c6d99..6b852bb 100644 --- a/src/wireguard/timers.rs +++ b/src/wireguard/timers.rs @@ -80,7 +80,7 @@ impl PeerInner { if timers.keepalive_interval > 0 { timers .send_persistent_keepalive - .start(Duration::from_secs(timers.keepalive_interval)); + .start(Duration::from_secs(0)); } } @@ -108,6 +108,7 @@ impl PeerInner { * - handshake */ pub fn timers_any_authenticated_packet_sent(&self) { + log::trace!("timers_any_authenticated_packet_sent"); let timers = self.timers(); if timers.enabled { timers.send_keepalive.stop() @@ -120,6 +121,7 @@ impl PeerInner { * - handshake */ pub fn timers_any_authenticated_packet_received(&self) { + log::trace!("timers_any_authenticated_packet_received"); let timers = self.timers(); if timers.enabled { timers.new_handshake.stop(); @@ -128,6 +130,7 @@ impl PeerInner { /* Should be called after a handshake initiation message is sent. */ pub fn timers_handshake_initiated(&self) { + log::trace!("timers_handshake_initiated"); let timers = self.timers(); if timers.enabled { timers.send_keepalive.stop(); @@ -139,6 +142,7 @@ impl PeerInner { * or when getting key confirmation via the first data message. */ pub fn timers_handshake_complete(&self) { + log::trace!("timers_handshake_complete"); let timers = self.timers(); if timers.enabled { timers.retransmit_handshake.stop(); @@ -154,6 +158,7 @@ impl PeerInner { * handshake response or after receiving a handshake response. */ pub fn timers_session_derived(&self) { + log::trace!("timers_session_derived"); let timers = self.timers(); if timers.enabled { timers.zero_key_material.reset(REJECT_AFTER_TIME * 3); @@ -164,6 +169,7 @@ impl PeerInner { * keepalive, data, or handshake is sent, or after one is received. */ pub fn timers_any_authenticated_packet_traversal(&self) { + log::trace!("timers_any_authenticated_packet_traversal"); let timers = self.timers(); if timers.enabled && timers.keepalive_interval > 0 { // push persistent_keepalive into the future @@ -174,6 +180,7 @@ impl PeerInner { } fn timers_set_retransmit_handshake(&self) { + log::trace!("timers_set_retransmit_handshake"); let timers = self.timers(); if timers.enabled { timers.retransmit_handshake.reset(REKEY_TIMEOUT); @@ -205,11 +212,11 @@ impl PeerInner { // stop the keepalive timer with the old interval timers.send_persistent_keepalive.stop(); - // restart the persistent_keepalive timer with the new interval + // cause immediate expiry of persistent_keepalive timer if secs > 0 && timers.enabled { timers .send_persistent_keepalive - .start(Duration::from_secs(secs)); + .reset(Duration::from_secs(0)); } } @@ -233,6 +240,8 @@ impl Timers { retransmit_handshake: { let peer = peer.clone(); runner.timer(move || { + log::trace!("{} : timer fired (retransmit_handshake)", peer); + // ignore if timers are disabled let timers = peer.timers(); if !timers.enabled { @@ -269,6 +278,8 @@ impl Timers { send_keepalive: { let peer = peer.clone(); runner.timer(move || { + log::trace!("{} : timer fired (send_keepalive)", peer); + // ignore if timers are disabled let timers = peer.timers(); if !timers.enabled { @@ -284,7 +295,8 @@ impl Timers { new_handshake: { let peer = peer.clone(); runner.timer(move || { - debug!( + log::trace!("{} : timer fired (new_handshake)", peer); + log::debug!( "Retrying handshake with {} because we stopped hearing back after {} seconds", peer, (KEEPALIVE_TIMEOUT + REKEY_TIMEOUT).as_secs() @@ -296,16 +308,19 @@ impl Timers { zero_key_material: { let peer = peer.clone(); runner.timer(move || { + log::trace!("{} : timer fired (zero_key_material)", peer); peer.router.zero_keys(); }) }, send_persistent_keepalive: { let peer = peer.clone(); runner.timer(move || { + log::trace!("{} : timer fired (send_persistent_keepalive)", peer); let timers = peer.timers(); if timers.enabled && timers.keepalive_interval > 0 { - peer.router.send_keepalive(); timers.send_keepalive.stop(); + let queued = peer.router.send_keepalive(); + log::trace!("{} : keepalive queued {}", peer, queued); timers .send_persistent_keepalive .start(Duration::from_secs(timers.keepalive_interval)); @@ -331,8 +346,7 @@ impl Timers { } } -/* Instance of the router callbacks */ - +/* instance of the router callbacks */ pub struct Events(PhantomData<(T, B)>); impl Callbacks for Events { @@ -343,6 +357,8 @@ impl Callbacks for Events { */ #[inline(always)] fn send(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc, counter: u64) { + log::trace!("{} : EVENT(send)", peer); + // update timers and stats peer.timers_any_authenticated_packet_traversal(); @@ -373,6 +389,8 @@ impl Callbacks for Events { */ #[inline(always)] fn recv(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc) { + log::trace!("{} : EVENT(recv)", peer); + // update timers and stats peer.timers_any_authenticated_packet_traversal(); @@ -407,11 +425,13 @@ impl Callbacks for Events { */ #[inline(always)] fn need_key(peer: &Self::Opaque) { + log::trace!("{} : EVENT(need_key)", peer); peer.packet_send_queued_handshake_initiation(false); } #[inline(always)] fn key_confirmed(peer: &Self::Opaque) { + log::trace!("{} : EVENT(key_confirmed)", peer); peer.timers_handshake_complete(); } } diff --git a/src/wireguard/workers.rs b/src/wireguard/workers.rs index 62d531d..e1d3899 100644 --- a/src/wireguard/workers.rs +++ b/src/wireguard/workers.rs @@ -20,7 +20,7 @@ use super::udp::UDP; // constants use super::constants::{ DURATION_UNDER_LOAD, MAX_QUEUED_INCOMING_HANDSHAKES, MESSAGE_PADDING_MULTIPLE, - THRESHOLD_UNDER_LOAD, TIME_HORIZON, + THRESHOLD_UNDER_LOAD, }; use super::handshake::MAX_HANDSHAKE_MSG_SIZE; use super::handshake::{TYPE_COOKIE_REPLY, TYPE_INITIATION, TYPE_RESPONSE}; @@ -102,8 +102,6 @@ pub fn tun_worker(wg: &WireGuard, reader: T::Reader) { } pub fn udp_worker(wg: &WireGuard, reader: B::Reader) { - let mut last_under_load = Instant::now() - TIME_HORIZON; - loop { // create vector big enough for any message given current MTU let mtu = wg.mtu.load(Ordering::Relaxed); @@ -160,26 +158,26 @@ pub fn handshake_worker( // process elements from the handshake queue for job in rx { // check if under load + let mut under_load = false; let job: HandshakeJob = job; let pending = wg.pending.fetch_sub(1, Ordering::SeqCst); - let mut under_load = false; - debug_assert!(pending < MAX_QUEUED_INCOMING_HANDSHAKES + (1 << 16)); // immediate go under load if too many handshakes pending if pending > THRESHOLD_UNDER_LOAD { + log::trace!("{} : handshake worker, under load (above threshold)", wg); *wg.last_under_load.lock() = Instant::now(); under_load = true; } - // remain under load for a while + // remain under load for DURATION_UNDER_LOAD if !under_load { let elapsed = wg.last_under_load.lock().elapsed(); - if elapsed > DURATION_UNDER_LOAD { + if DURATION_UNDER_LOAD >= elapsed { + log::trace!("{} : handshake worker, under load (recent)", wg); under_load = true; } } - log::trace!("{} : handshake worker, under_load = {}", wg, under_load); // de-multiplex staged handshake jobs and handshake messages match job { -- cgit v1.2.3-59-g8ed1b