summaryrefslogtreecommitdiffstats
path: root/src/wireguard/workers.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wireguard/workers.rs')
-rw-r--r--src/wireguard/workers.rs40
1 files changed, 24 insertions, 16 deletions
diff --git a/src/wireguard/workers.rs b/src/wireguard/workers.rs
index b65f49a..aeb6063 100644
--- a/src/wireguard/workers.rs
+++ b/src/wireguard/workers.rs
@@ -25,7 +25,7 @@ use super::handshake::MAX_HANDSHAKE_MSG_SIZE;
use super::handshake::{TYPE_COOKIE_REPLY, TYPE_INITIATION, TYPE_RESPONSE};
use super::router::{CAPACITY_MESSAGE_POSTFIX, SIZE_MESSAGE_PREFIX, TYPE_TRANSPORT};
-use super::Wireguard;
+use super::wireguard::WireGuard;
pub enum HandshakeJob<E> {
Message(Vec<u8>, E),
@@ -54,7 +54,7 @@ const fn padding(size: usize, mtu: usize) -> usize {
min(mtu, size + (pad - size % pad) % pad)
}
-pub fn tun_worker<T: Tun, B: UDP>(wg: &Wireguard<T, B>, reader: T::Reader) {
+pub fn tun_worker<T: Tun, B: UDP>(wg: &WireGuard<T, B>, reader: T::Reader) {
loop {
// create vector big enough for any transport message (based on MTU)
let mtu = wg.mtu.load(Ordering::Relaxed);
@@ -100,7 +100,7 @@ pub fn tun_worker<T: Tun, B: UDP>(wg: &Wireguard<T, B>, reader: T::Reader) {
}
}
-pub fn udp_worker<T: Tun, B: UDP>(wg: &Wireguard<T, B>, reader: B::Reader) {
+pub fn udp_worker<T: Tun, B: UDP>(wg: &WireGuard<T, B>, reader: B::Reader) {
let mut last_under_load = Instant::now() - TIME_HORIZON;
loop {
@@ -160,7 +160,7 @@ pub fn udp_worker<T: Tun, B: UDP>(wg: &Wireguard<T, B>, reader: B::Reader) {
}
pub fn handshake_worker<T: Tun, B: UDP>(
- wg: &Wireguard<T, B>,
+ wg: &WireGuard<T, B>,
rx: Receiver<HandshakeJob<B::Endpoint>>,
) {
debug!("{} : handshake worker, started", wg);
@@ -170,30 +170,38 @@ pub fn handshake_worker<T: Tun, B: UDP>(
// process elements from the handshake queue
for job in rx {
- // decrement pending pakcets (under_load)
+ // check if under load
let job: HandshakeJob<B::Endpoint> = job;
- wg.pending.fetch_sub(1, Ordering::SeqCst);
+ let pending = wg.pending.fetch_sub(1, Ordering::SeqCst);
+ let mut under_load = false;
+
+ // immediate go under load if too many handshakes pending
+ if pending > THRESHOLD_UNDER_LOAD {
+ *wg.last_under_load.lock() = Instant::now();
+ under_load = true;
+ }
+
+ // remain under load for a while
+ if !under_load {
+ let elapsed = wg.last_under_load.lock().elapsed();
+ if elapsed > DURATION_UNDER_LOAD {
+ under_load = true;
+ }
+ }
- // demultiplex staged handshake jobs and handshake messages
+ // de-multiplex staged handshake jobs and handshake messages
match job {
HandshakeJob::Message(msg, src) => {
- // feed message to handshake device
- let src_validate = (&src).into_address(); // TODO avoid
-
// process message
let device = wg.handshake.read();
match device.process(
&mut rng,
&msg[..],
- None,
- /*
- if wg.under_load.load(Ordering::Relaxed) {
- debug!("{} : handshake worker, under load", wg);
- Some(&src_validate)
+ if under_load {
+ Some(src.into_address())
} else {
None
}
- */
) {
Ok((pk, resp, keypair)) => {
// send response (might be cookie reply or handshake response)