summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml1
-rw-r--r--src/platform/linux/tun.rs5
-rw-r--r--src/wireguard/peer.rs22
-rw-r--r--src/wireguard/router/peer.rs18
-rw-r--r--src/wireguard/timers.rs34
-rw-r--r--src/wireguard/workers.rs14
6 files changed, 69 insertions, 25 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 4ef943c..1298a28 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -41,6 +41,7 @@ features = ["nightly"]
[features]
profiler = ["cpuprofiler"]
+start_up = []
[dev-dependencies]
pnet = "^0.22"
diff --git a/src/platform/linux/tun.rs b/src/platform/linux/tun.rs
index 9ccda86..c282a4b 100644
--- a/src/platform/linux/tun.rs
+++ b/src/platform/linux/tun.rs
@@ -299,7 +299,10 @@ impl LinuxTunStatus {
Err(LinuxTunError::Closed)
} else {
Ok(LinuxTunStatus {
- events: vec![TunEvent::Up(1500)],
+ events: vec![
+ #[cfg(feature = "start_up")]
+ TunEvent::Up(1500),
+ ],
index: get_ifindex(&name),
fd,
name,
diff --git a/src/wireguard/peer.rs b/src/wireguard/peer.rs
index e02d2e0..1af4df3 100644
--- a/src/wireguard/peer.rs
+++ b/src/wireguard/peer.rs
@@ -4,8 +4,8 @@ use super::timers::{Events, Timers};
use super::tun::Tun;
use super::udp::UDP;
-use super::wireguard::WireGuard;
use super::constants::REKEY_TIMEOUT;
+use super::wireguard::WireGuard;
use super::workers::HandshakeJob;
use std::fmt;
@@ -60,21 +60,31 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
* The function is ratelimited.
*/
pub fn packet_send_handshake_initiation(&self) {
- // the function is rate limited
+ log::trace!("{} : packet_send_handshake_initiation", self);
+ // the function is rate limited
{
let mut lhs = self.last_handshake_sent.lock();
if lhs.elapsed() < REKEY_TIMEOUT {
+ log::trace!("{} : packet_send_handshake_initiation, rate-limited!", self);
return;
}
*lhs = Instant::now();
}
// create a new handshake job for the peer
-
if !self.handshake_queued.swap(true, Ordering::SeqCst) {
self.wg.pending.fetch_add(1, Ordering::SeqCst);
self.wg.queue.send(HandshakeJob::New(self.pk));
+ log::trace!(
+ "{} : packet_send_handshake_initiation, handshake queued",
+ self
+ );
+ } else {
+ log::trace!(
+ "{} : packet_send_handshake_initiation, handshake already queued",
+ self
+ );
}
}
@@ -89,6 +99,12 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
}
}
+impl<T: Tun, B: UDP> fmt::Display for PeerInner<T, B> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "peer(id = {})", self.id)
+ }
+}
+
impl<T: Tun, B: UDP> fmt::Display for Peer<T, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "peer(id = {})", self.id)
diff --git a/src/wireguard/router/peer.rs b/src/wireguard/router/peer.rs
index 23a3e62..b8110f0 100644
--- a/src/wireguard/router/peer.rs
+++ b/src/wireguard/router/peer.rs
@@ -232,7 +232,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
match staged.pop_front() {
Some(msg) => {
sent = true;
- self.send_raw(msg);
+ self.send_raw(msg, false);
}
None => break sent,
}
@@ -240,10 +240,11 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
}
// Treat the msg as the payload of a transport message
- // Unlike device.send, peer.send_raw does not buffer messages when a key is not available.
- fn send_raw(&self, msg: Vec<u8>) -> bool {
+ //
+ // Returns true if the message was queued for transmission.
+ fn send_raw(&self, msg: Vec<u8>, stage: bool) -> bool {
log::debug!("peer.send_raw");
- match self.send_job(msg, false) {
+ match self.send_job(msg, stage) {
Some(job) => {
self.device.queue_outbound.send(job);
debug!("send_raw: got obtained send_job");
@@ -300,7 +301,11 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
}
pub fn send_job(&self, msg: Vec<u8>, stage: bool) -> Option<Job<Self, Outbound>> {
- debug!("peer.send_job");
+ debug!(
+ "peer.send_job, msg.len() = {}, stage = {}",
+ msg.len(),
+ stage
+ );
debug_assert!(
msg.len() >= mem::size_of::<TransportHeader>(),
"received message with size: {:}",
@@ -333,6 +338,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
// 1. Stage packet for later transmission
// 2. Request new key
if keypair.is_none() && stage {
+ log::trace!("packet staged");
self.staged_packets.lock().push_back(msg);
C::need_key(&self.opaque);
return None;
@@ -491,7 +497,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerHandle<E,
pub fn send_keepalive(&self) -> bool {
debug!("peer.send_keepalive");
- self.peer.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX])
+ self.peer.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX], true)
}
/// Map a subnet to the peer
diff --git a/src/wireguard/timers.rs b/src/wireguard/timers.rs
index b8c6d99..6b852bb 100644
--- a/src/wireguard/timers.rs
+++ b/src/wireguard/timers.rs
@@ -80,7 +80,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
if timers.keepalive_interval > 0 {
timers
.send_persistent_keepalive
- .start(Duration::from_secs(timers.keepalive_interval));
+ .start(Duration::from_secs(0));
}
}
@@ -108,6 +108,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
* - handshake
*/
pub fn timers_any_authenticated_packet_sent(&self) {
+ log::trace!("timers_any_authenticated_packet_sent");
let timers = self.timers();
if timers.enabled {
timers.send_keepalive.stop()
@@ -120,6 +121,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
* - handshake
*/
pub fn timers_any_authenticated_packet_received(&self) {
+ log::trace!("timers_any_authenticated_packet_received");
let timers = self.timers();
if timers.enabled {
timers.new_handshake.stop();
@@ -128,6 +130,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
/* Should be called after a handshake initiation message is sent. */
pub fn timers_handshake_initiated(&self) {
+ log::trace!("timers_handshake_initiated");
let timers = self.timers();
if timers.enabled {
timers.send_keepalive.stop();
@@ -139,6 +142,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
* or when getting key confirmation via the first data message.
*/
pub fn timers_handshake_complete(&self) {
+ log::trace!("timers_handshake_complete");
let timers = self.timers();
if timers.enabled {
timers.retransmit_handshake.stop();
@@ -154,6 +158,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
* handshake response or after receiving a handshake response.
*/
pub fn timers_session_derived(&self) {
+ log::trace!("timers_session_derived");
let timers = self.timers();
if timers.enabled {
timers.zero_key_material.reset(REJECT_AFTER_TIME * 3);
@@ -164,6 +169,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
* keepalive, data, or handshake is sent, or after one is received.
*/
pub fn timers_any_authenticated_packet_traversal(&self) {
+ log::trace!("timers_any_authenticated_packet_traversal");
let timers = self.timers();
if timers.enabled && timers.keepalive_interval > 0 {
// push persistent_keepalive into the future
@@ -174,6 +180,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
}
fn timers_set_retransmit_handshake(&self) {
+ log::trace!("timers_set_retransmit_handshake");
let timers = self.timers();
if timers.enabled {
timers.retransmit_handshake.reset(REKEY_TIMEOUT);
@@ -205,11 +212,11 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
// stop the keepalive timer with the old interval
timers.send_persistent_keepalive.stop();
- // restart the persistent_keepalive timer with the new interval
+ // cause immediate expiry of persistent_keepalive timer
if secs > 0 && timers.enabled {
timers
.send_persistent_keepalive
- .start(Duration::from_secs(secs));
+ .reset(Duration::from_secs(0));
}
}
@@ -233,6 +240,8 @@ impl Timers {
retransmit_handshake: {
let peer = peer.clone();
runner.timer(move || {
+ log::trace!("{} : timer fired (retransmit_handshake)", peer);
+
// ignore if timers are disabled
let timers = peer.timers();
if !timers.enabled {
@@ -269,6 +278,8 @@ impl Timers {
send_keepalive: {
let peer = peer.clone();
runner.timer(move || {
+ log::trace!("{} : timer fired (send_keepalive)", peer);
+
// ignore if timers are disabled
let timers = peer.timers();
if !timers.enabled {
@@ -284,7 +295,8 @@ impl Timers {
new_handshake: {
let peer = peer.clone();
runner.timer(move || {
- debug!(
+ log::trace!("{} : timer fired (new_handshake)", peer);
+ log::debug!(
"Retrying handshake with {} because we stopped hearing back after {} seconds",
peer,
(KEEPALIVE_TIMEOUT + REKEY_TIMEOUT).as_secs()
@@ -296,16 +308,19 @@ impl Timers {
zero_key_material: {
let peer = peer.clone();
runner.timer(move || {
+ log::trace!("{} : timer fired (zero_key_material)", peer);
peer.router.zero_keys();
})
},
send_persistent_keepalive: {
let peer = peer.clone();
runner.timer(move || {
+ log::trace!("{} : timer fired (send_persistent_keepalive)", peer);
let timers = peer.timers();
if timers.enabled && timers.keepalive_interval > 0 {
- peer.router.send_keepalive();
timers.send_keepalive.stop();
+ let queued = peer.router.send_keepalive();
+ log::trace!("{} : keepalive queued {}", peer, queued);
timers
.send_persistent_keepalive
.start(Duration::from_secs(timers.keepalive_interval));
@@ -331,8 +346,7 @@ impl Timers {
}
}
-/* Instance of the router callbacks */
-
+/* instance of the router callbacks */
pub struct Events<T, B>(PhantomData<(T, B)>);
impl<T: Tun, B: UDP> Callbacks for Events<T, B> {
@@ -343,6 +357,8 @@ impl<T: Tun, B: UDP> Callbacks for Events<T, B> {
*/
#[inline(always)]
fn send(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc<KeyPair>, counter: u64) {
+ log::trace!("{} : EVENT(send)", peer);
+
// update timers and stats
peer.timers_any_authenticated_packet_traversal();
@@ -373,6 +389,8 @@ impl<T: Tun, B: UDP> Callbacks for Events<T, B> {
*/
#[inline(always)]
fn recv(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc<KeyPair>) {
+ log::trace!("{} : EVENT(recv)", peer);
+
// update timers and stats
peer.timers_any_authenticated_packet_traversal();
@@ -407,11 +425,13 @@ impl<T: Tun, B: UDP> Callbacks for Events<T, B> {
*/
#[inline(always)]
fn need_key(peer: &Self::Opaque) {
+ log::trace!("{} : EVENT(need_key)", peer);
peer.packet_send_queued_handshake_initiation(false);
}
#[inline(always)]
fn key_confirmed(peer: &Self::Opaque) {
+ log::trace!("{} : EVENT(key_confirmed)", peer);
peer.timers_handshake_complete();
}
}
diff --git a/src/wireguard/workers.rs b/src/wireguard/workers.rs
index 62d531d..e1d3899 100644
--- a/src/wireguard/workers.rs
+++ b/src/wireguard/workers.rs
@@ -20,7 +20,7 @@ use super::udp::UDP;
// constants
use super::constants::{
DURATION_UNDER_LOAD, MAX_QUEUED_INCOMING_HANDSHAKES, MESSAGE_PADDING_MULTIPLE,
- THRESHOLD_UNDER_LOAD, TIME_HORIZON,
+ THRESHOLD_UNDER_LOAD,
};
use super::handshake::MAX_HANDSHAKE_MSG_SIZE;
use super::handshake::{TYPE_COOKIE_REPLY, TYPE_INITIATION, TYPE_RESPONSE};
@@ -102,8 +102,6 @@ pub fn tun_worker<T: Tun, B: UDP>(wg: &WireGuard<T, B>, reader: T::Reader) {
}
pub fn udp_worker<T: Tun, B: UDP>(wg: &WireGuard<T, B>, reader: B::Reader) {
- let mut last_under_load = Instant::now() - TIME_HORIZON;
-
loop {
// create vector big enough for any message given current MTU
let mtu = wg.mtu.load(Ordering::Relaxed);
@@ -160,26 +158,26 @@ pub fn handshake_worker<T: Tun, B: UDP>(
// process elements from the handshake queue
for job in rx {
// check if under load
+ let mut under_load = false;
let job: HandshakeJob<B::Endpoint> = job;
let pending = wg.pending.fetch_sub(1, Ordering::SeqCst);
- let mut under_load = false;
-
debug_assert!(pending < MAX_QUEUED_INCOMING_HANDSHAKES + (1 << 16));
// immediate go under load if too many handshakes pending
if pending > THRESHOLD_UNDER_LOAD {
+ log::trace!("{} : handshake worker, under load (above threshold)", wg);
*wg.last_under_load.lock() = Instant::now();
under_load = true;
}
- // remain under load for a while
+ // remain under load for DURATION_UNDER_LOAD
if !under_load {
let elapsed = wg.last_under_load.lock().elapsed();
- if elapsed > DURATION_UNDER_LOAD {
+ if DURATION_UNDER_LOAD >= elapsed {
+ log::trace!("{} : handshake worker, under load (recent)", wg);
under_load = true;
}
}
- log::trace!("{} : handshake worker, under_load = {}", wg, under_load);
// de-multiplex staged handshake jobs and handshake messages
match job {