diff options
Diffstat (limited to 'src/wireguard/workers.rs')
-rw-r--r-- | src/wireguard/workers.rs | 40 |
1 files changed, 24 insertions, 16 deletions
diff --git a/src/wireguard/workers.rs b/src/wireguard/workers.rs index b65f49a..aeb6063 100644 --- a/src/wireguard/workers.rs +++ b/src/wireguard/workers.rs @@ -25,7 +25,7 @@ use super::handshake::MAX_HANDSHAKE_MSG_SIZE; use super::handshake::{TYPE_COOKIE_REPLY, TYPE_INITIATION, TYPE_RESPONSE}; use super::router::{CAPACITY_MESSAGE_POSTFIX, SIZE_MESSAGE_PREFIX, TYPE_TRANSPORT}; -use super::Wireguard; +use super::wireguard::WireGuard; pub enum HandshakeJob<E> { Message(Vec<u8>, E), @@ -54,7 +54,7 @@ const fn padding(size: usize, mtu: usize) -> usize { min(mtu, size + (pad - size % pad) % pad) } -pub fn tun_worker<T: Tun, B: UDP>(wg: &Wireguard<T, B>, reader: T::Reader) { +pub fn tun_worker<T: Tun, B: UDP>(wg: &WireGuard<T, B>, reader: T::Reader) { loop { // create vector big enough for any transport message (based on MTU) let mtu = wg.mtu.load(Ordering::Relaxed); @@ -100,7 +100,7 @@ pub fn tun_worker<T: Tun, B: UDP>(wg: &Wireguard<T, B>, reader: T::Reader) { } } -pub fn udp_worker<T: Tun, B: UDP>(wg: &Wireguard<T, B>, reader: B::Reader) { +pub fn udp_worker<T: Tun, B: UDP>(wg: &WireGuard<T, B>, reader: B::Reader) { let mut last_under_load = Instant::now() - TIME_HORIZON; loop { @@ -160,7 +160,7 @@ pub fn udp_worker<T: Tun, B: UDP>(wg: &Wireguard<T, B>, reader: B::Reader) { } pub fn handshake_worker<T: Tun, B: UDP>( - wg: &Wireguard<T, B>, + wg: &WireGuard<T, B>, rx: Receiver<HandshakeJob<B::Endpoint>>, ) { debug!("{} : handshake worker, started", wg); @@ -170,30 +170,38 @@ pub fn handshake_worker<T: Tun, B: UDP>( // process elements from the handshake queue for job in rx { - // decrement pending pakcets (under_load) + // check if under load let job: HandshakeJob<B::Endpoint> = job; - wg.pending.fetch_sub(1, Ordering::SeqCst); + let pending = wg.pending.fetch_sub(1, Ordering::SeqCst); + let mut under_load = false; + + // immediate go under load if too many handshakes pending + if pending > THRESHOLD_UNDER_LOAD { + *wg.last_under_load.lock() = Instant::now(); + under_load = true; + } + + // remain under load for a while + if !under_load { + let elapsed = wg.last_under_load.lock().elapsed(); + if elapsed > DURATION_UNDER_LOAD { + under_load = true; + } + } - // demultiplex staged handshake jobs and handshake messages + // de-multiplex staged handshake jobs and handshake messages match job { HandshakeJob::Message(msg, src) => { - // feed message to handshake device - let src_validate = (&src).into_address(); // TODO avoid - // process message let device = wg.handshake.read(); match device.process( &mut rng, &msg[..], - None, - /* - if wg.under_load.load(Ordering::Relaxed) { - debug!("{} : handshake worker, under load", wg); - Some(&src_validate) + if under_load { + Some(src.into_address()) } else { None } - */ ) { Ok((pk, resp, keypair)) => { // send response (might be cookie reply or handshake response) |