diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-06 21:45:21 +0100 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-06 21:45:21 +0100 |
commit | 74e576a9c21b0de451e0588428fbbb99b24eb074 (patch) | |
tree | 381ad26325ae4bee1f6a17449110ac941c2a9192 /src/wireguard/timers.rs | |
parent | Moving away from peer threads (diff) | |
download | wireguard-rs-74e576a9c21b0de451e0588428fbbb99b24eb074.tar.xz wireguard-rs-74e576a9c21b0de451e0588428fbbb99b24eb074.zip |
Fixed inbound job bug (add to sequential queue)
Diffstat (limited to '')
-rw-r--r-- | src/wireguard/timers.rs | 89 |
1 files changed, 50 insertions, 39 deletions
diff --git a/src/wireguard/timers.rs b/src/wireguard/timers.rs index e1aabad..f292afd 100644 --- a/src/wireguard/timers.rs +++ b/src/wireguard/timers.rs @@ -3,14 +3,14 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; -use log::debug; use hjul::{Runner, Timer}; +use log::debug; use super::constants::*; use super::router::{message_data_len, Callbacks}; -use super::{Peer, PeerInner}; -use super::{udp, tun}; use super::types::KeyPair; +use super::{tun, udp}; +use super::{Peer, PeerInner}; pub struct Timers { // only updated during configuration @@ -36,7 +36,6 @@ impl Timers { } impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> { - pub fn get_keepalive_interval(&self) -> u64 { self.timers().keepalive_interval } @@ -57,17 +56,19 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> { timers.send_persistent_keepalive.stop(); timers.zero_key_material.stop(); timers.new_handshake.stop(); - + // reset all timer state timers.handshake_attempts.store(0, Ordering::SeqCst); - timers.sent_lastminute_handshake.store(false, Ordering::SeqCst); + timers + .sent_lastminute_handshake + .store(false, Ordering::SeqCst); timers.need_another_keepalive.store(false, Ordering::SeqCst); } pub fn start_timers(&self) { // take a write lock preventing simultaneous "stop_timers" call let mut timers = self.timers_mut(); - + // set flag to reenable timer events if timers.enabled { return; @@ -76,18 +77,20 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> { // start send_persistent_keepalive if timers.keepalive_interval > 0 { - timers.send_persistent_keepalive.start( - Duration::from_secs(timers.keepalive_interval) - ); + timers + .send_persistent_keepalive + .start(Duration::from_secs(timers.keepalive_interval)); } } /* should be called after an authenticated data packet is sent */ pub fn timers_data_sent(&self) { - let timers = self.timers(); - if timers.enabled { - timers.new_handshake.start(KEEPALIVE_TIMEOUT + REKEY_TIMEOUT); - } + let timers = self.timers(); + if timers.enabled { + timers + .new_handshake + .start(KEEPALIVE_TIMEOUT + REKEY_TIMEOUT); + } } /* should be called after an authenticated data packet is received */ @@ -139,7 +142,9 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> { if timers.enabled { timers.retransmit_handshake.stop(); timers.handshake_attempts.store(0, Ordering::SeqCst); - timers.sent_lastminute_handshake.store(false, Ordering::SeqCst); + timers + .sent_lastminute_handshake + .store(false, Ordering::SeqCst); *self.walltime_last_handshake.lock() = Some(SystemTime::now()); } } @@ -161,9 +166,9 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> { let timers = self.timers(); if timers.enabled && timers.keepalive_interval > 0 { // push persistent_keepalive into the future - timers.send_persistent_keepalive.reset(Duration::from_secs( - timers.keepalive_interval - )); + timers + .send_persistent_keepalive + .reset(Duration::from_secs(timers.keepalive_interval)); } } @@ -179,7 +184,6 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> { if timers.enabled { timers.retransmit_handshake.reset(REKEY_TIMEOUT); } - } /* Called after a handshake worker sends a handshake initiation to the peer @@ -195,7 +199,7 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> { *self.last_handshake_sent.lock() = Instant::now(); self.timers_any_authenticated_packet_traversal(); self.timers_any_authenticated_packet_sent(); - } + } pub fn set_persistent_keepalive_interval(&self, secs: u64) { let mut timers = self.timers_mut(); @@ -205,10 +209,12 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> { // stop the keepalive timer with the old interval timers.send_persistent_keepalive.stop(); - + // restart the persistent_keepalive timer with the new interval if secs > 0 && timers.enabled { - timers.send_persistent_keepalive.start(Duration::from_secs(secs)); + timers + .send_persistent_keepalive + .start(Duration::from_secs(secs)); } } @@ -220,7 +226,6 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> { } } - impl Timers { pub fn new<T, B>(runner: &Runner, running: bool, peer: Peer<T, B>) -> Timers where @@ -242,9 +247,12 @@ impl Timers { if !timers.enabled { return; } - + // check if handshake attempts remaining - let attempts = peer.timers().handshake_attempts.fetch_add(1, Ordering::SeqCst); + let attempts = peer + .timers() + .handshake_attempts + .fetch_add(1, Ordering::SeqCst); if attempts > MAX_TIMER_HANDSHAKES { debug!( "Handshake for peer {} did not complete after {} attempts, giving up", @@ -257,8 +265,8 @@ impl Timers { } else { debug!( "Handshake for {} did not complete after {} seconds, retrying (try {})", - peer, - REKEY_TIMEOUT.as_secs(), + peer, + REKEY_TIMEOUT.as_secs(), attempts ); timers.retransmit_handshake.reset(REKEY_TIMEOUT); @@ -287,7 +295,7 @@ impl Timers { runner.timer(move || { debug!( "Retrying handshake with {} because we stopped hearing back after {} seconds", - peer, + peer, (KEEPALIVE_TIMEOUT + REKEY_TIMEOUT).as_secs() ); peer.router.clear_src(); @@ -307,9 +315,9 @@ impl Timers { if timers.enabled && timers.keepalive_interval > 0 { peer.router.send_keepalive(); timers.send_keepalive.stop(); - timers.send_persistent_keepalive.start(Duration::from_secs( - timers.keepalive_interval - )); + timers + .send_persistent_keepalive + .start(Duration::from_secs(timers.keepalive_interval)); } }) }, @@ -318,7 +326,7 @@ impl Timers { pub fn dummy(runner: &Runner) -> Timers { Timers { - enabled: false, + enabled: false, keepalive_interval: 0, need_another_keepalive: AtomicBool::new(false), sent_lastminute_handshake: AtomicBool::new(false), @@ -344,9 +352,8 @@ impl<T: tun::Tun, B: udp::UDP> Callbacks for Events<T, B> { */ #[inline(always)] fn send(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc<KeyPair>, counter: u64) { - // update timers and stats - + peer.timers_any_authenticated_packet_traversal(); peer.timers_any_authenticated_packet_sent(); peer.tx_bytes.fetch_add(size as u64, Ordering::Relaxed); @@ -375,7 +382,6 @@ impl<T: tun::Tun, B: udp::UDP> Callbacks for Events<T, B> { */ #[inline(always)] fn recv(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc<KeyPair>) { - // update timers and stats peer.timers_any_authenticated_packet_traversal(); @@ -386,13 +392,18 @@ impl<T: tun::Tun, B: udp::UDP> Callbacks for Events<T, B> { } // keep_key_fresh - + #[inline(always)] fn keep_key_fresh(keypair: &Arc<KeyPair>) -> bool { - Instant::now() - keypair.birth > REJECT_AFTER_TIME - KEEPALIVE_TIMEOUT - REKEY_TIMEOUT + Instant::now() - keypair.birth > REJECT_AFTER_TIME - KEEPALIVE_TIMEOUT - REKEY_TIMEOUT } - if keep_key_fresh(keypair) && !peer.timers().sent_lastminute_handshake.swap(true, Ordering::Acquire) { + if keep_key_fresh(keypair) + && !peer + .timers() + .sent_lastminute_handshake + .swap(true, Ordering::Acquire) + { peer.packet_send_queued_handshake_initiation(false); } } @@ -405,7 +416,7 @@ impl<T: tun::Tun, B: udp::UDP> Callbacks for Events<T, B> { */ #[inline(always)] fn need_key(peer: &Self::Opaque) { - peer.packet_send_queued_handshake_initiation(false); + peer.packet_send_queued_handshake_initiation(false); } #[inline(always)] |