diff options
Diffstat (limited to 'src/wireguard/workers.rs')
-rw-r--r-- | src/wireguard/workers.rs | 26 |
1 files changed, 9 insertions, 17 deletions
diff --git a/src/wireguard/workers.rs b/src/wireguard/workers.rs index aeb6063..62d531d 100644 --- a/src/wireguard/workers.rs +++ b/src/wireguard/workers.rs @@ -19,7 +19,8 @@ use super::udp::UDP; // constants use super::constants::{ - DURATION_UNDER_LOAD, MESSAGE_PADDING_MULTIPLE, THRESHOLD_UNDER_LOAD, TIME_HORIZON, + DURATION_UNDER_LOAD, MAX_QUEUED_INCOMING_HANDSHAKES, MESSAGE_PADDING_MULTIPLE, + THRESHOLD_UNDER_LOAD, TIME_HORIZON, }; use super::handshake::MAX_HANDSHAKE_MSG_SIZE; use super::handshake::{TYPE_COOKIE_REPLY, TYPE_INITIATION, TYPE_RESPONSE}; @@ -131,19 +132,7 @@ pub fn udp_worker<T: Tun, B: UDP>(wg: &WireGuard<T, B>, reader: B::Reader) { match LittleEndian::read_u32(&msg[..]) { TYPE_COOKIE_REPLY | TYPE_INITIATION | TYPE_RESPONSE => { debug!("{} : reader, received handshake message", wg); - - // add one to pending - let pending = wg.pending.fetch_add(1, Ordering::SeqCst); - - // update under_load flag - if pending > THRESHOLD_UNDER_LOAD { - debug!("{} : reader, set under load (pending = {})", wg, pending); - last_under_load = Instant::now(); - } else if last_under_load.elapsed() > DURATION_UNDER_LOAD { - debug!("{} : reader, clear under load", wg); - } - - // add to handshake queue + wg.pending.fetch_add(1, Ordering::SeqCst); wg.queue.send(HandshakeJob::Message(msg, src)); } TYPE_TRANSPORT => { @@ -174,13 +163,15 @@ pub fn handshake_worker<T: Tun, B: UDP>( let job: HandshakeJob<B::Endpoint> = job; let pending = wg.pending.fetch_sub(1, Ordering::SeqCst); let mut under_load = false; - + + debug_assert!(pending < MAX_QUEUED_INCOMING_HANDSHAKES + (1 << 16)); + // immediate go under load if too many handshakes pending if pending > THRESHOLD_UNDER_LOAD { *wg.last_under_load.lock() = Instant::now(); under_load = true; } - + // remain under load for a while if !under_load { let elapsed = wg.last_under_load.lock().elapsed(); @@ -188,6 +179,7 @@ pub fn handshake_worker<T: Tun, B: UDP>( under_load = true; } } + log::trace!("{} : handshake worker, under_load = {}", wg, under_load); // de-multiplex staged handshake jobs and handshake messages match job { @@ -201,7 +193,7 @@ pub fn handshake_worker<T: Tun, B: UDP>( Some(src.into_address()) } else { None - } + }, ) { Ok((pk, resp, keypair)) => { // send response (might be cookie reply or handshake response) |