From 74e576a9c21b0de451e0588428fbbb99b24eb074 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Fri, 6 Dec 2019 21:45:21 +0100 Subject: Fixed inbound job bug (add to sequential queue) --- src/wireguard/router/device.rs | 63 +++------- src/wireguard/router/inbound.rs | 22 ++-- src/wireguard/router/mod.rs | 1 + src/wireguard/router/outbound.rs | 4 + src/wireguard/router/peer.rs | 4 +- src/wireguard/router/pool.rs | 1 + src/wireguard/router/queue.rs | 46 +++++++ src/wireguard/router/route.rs | 13 +- src/wireguard/router/tests.rs | 253 +++++++++++++++++++++++---------------- src/wireguard/timers.rs | 89 ++++++++------ 10 files changed, 289 insertions(+), 207 deletions(-) create mode 100644 src/wireguard/router/queue.rs (limited to 'src') diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs index 88eeae1..e405446 100644 --- a/src/wireguard/router/device.rs +++ b/src/wireguard/router/device.rs @@ -1,8 +1,6 @@ use std::collections::HashMap; use std::ops::Deref; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::mpsc::sync_channel; -use std::sync::mpsc::{Receiver, SyncSender}; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::thread; use std::time::Instant; @@ -25,47 +23,7 @@ use super::SIZE_MESSAGE_PREFIX; use super::route::RoutingTable; use super::super::{tun, udp, Endpoint, KeyPair}; - -pub struct ParallelQueue { - next: AtomicUsize, // next round-robin index - queues: Vec>>, // work queues (1 per thread) -} - -impl ParallelQueue { - fn new(queues: usize) -> (Vec>, Self) { - let mut rxs = vec![]; - let mut txs = vec![]; - - for _ in 0..queues { - let (tx, rx) = sync_channel(128); - txs.push(Mutex::new(tx)); - rxs.push(rx); - } - - ( - rxs, - ParallelQueue { - next: AtomicUsize::new(0), - queues: txs, - }, - ) - } - - pub fn send(&self, v: T) { - let len = self.queues.len(); - let idx = self.next.fetch_add(1, Ordering::SeqCst); - let que = self.queues[idx % len].lock(); - que.send(v).unwrap(); - } - - pub fn close(&self) { - for i in 0..self.queues.len() { - let (tx, _) = sync_channel(0); - let queue = &self.queues[i]; - *queue.lock() = tx; - } - } -} +use super::queue::ParallelQueue; pub struct DeviceInner> { // inbound writer (TUN) @@ -171,16 +129,25 @@ impl> DeviceHandle< // start worker threads let mut threads = Vec::with_capacity(num_workers); + for _ in 0..num_workers { let rx = inrx.pop().unwrap(); - threads.push(thread::spawn(move || inbound::worker(rx))); + threads.push(thread::spawn(move || { + log::debug!("inbound router worker started"); + inbound::worker(rx) + })); } for _ in 0..num_workers { let rx = outrx.pop().unwrap(); - threads.push(thread::spawn(move || outbound::worker(rx))); + threads.push(thread::spawn(move || { + log::debug!("outbound router worker started"); + outbound::worker(rx) + })); } + debug_assert_eq!(threads.len(), num_workers * 2); + // return exported device handle DeviceHandle { state: Device { @@ -274,7 +241,7 @@ impl> DeviceHandle< ); log::trace!( - "Router, handle transport message: (receiver = {}, counter = {})", + "handle transport message: (receiver = {}, counter = {})", header.f_receiver, header.f_counter ); @@ -287,9 +254,9 @@ impl> DeviceHandle< // schedule for decryption and TUN write if let Some(job) = dec.peer.recv_job(src, dec.clone(), msg) { + log::trace!("schedule decryption of transport message"); self.state.inbound_queue.send(job); } - Ok(()) } diff --git a/src/wireguard/router/inbound.rs b/src/wireguard/router/inbound.rs index d4ad307..3d47bb7 100644 --- a/src/wireguard/router/inbound.rs +++ b/src/wireguard/router/inbound.rs @@ -42,6 +42,8 @@ fn parallel>( peer: &Peer, body: &mut Inbound, ) { + log::trace!("worker, parallel section, obtained job"); + // cast to header followed by payload let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = match LayoutVerified::new_from_prefix(&mut body.msg[..]) { @@ -70,6 +72,7 @@ fn parallel>( Ok(_) => (), Err(_) => { // fault and return early + log::trace!("inbound worker: authentication failure"); body.failed = true; return; } @@ -89,9 +92,15 @@ fn parallel>( // truncate to remove tag match inner_len { None => { + log::trace!("inbound worker: cryptokey routing failed"); body.failed = true; } Some(len) => { + log::trace!( + "inbound worker: good route, length = {} {}", + len, + if len == 0 { "(keepalive)" } else { "" } + ); body.msg.truncate(mem::size_of::() + len); } } @@ -102,8 +111,11 @@ fn sequential>( peer: &Peer, body: &mut Inbound, ) { + log::trace!("worker, sequential section, obtained job"); + // decryption failed, return early if body.failed { + log::trace!("job faulted, remove from queue and ignore"); return; } @@ -116,10 +128,6 @@ fn sequential>( return; } }; - debug_assert!( - packet.len() >= CHACHA20_POLY1305.tag_len(), - "this should be checked earlier in the pipeline (decryption should fail)" - ); // check for replay if !body.state.protector.lock().update(header.f_counter.get()) { @@ -136,13 +144,9 @@ fn sequential>( // update endpoint *peer.endpoint.lock() = body.endpoint.take(); - // calculate length of IP packet + padding - let length = packet.len() - SIZE_TAG; - log::debug!("inbound worker: plaintext length = {}", length); - // check if should be written to TUN let mut sent = false; - if length > 0 { + if packet.len() > 0 { sent = match peer.device.inbound.write(&packet[..]) { Err(e) => { log::debug!("failed to write inbound packet to TUN: {:?}", e); diff --git a/src/wireguard/router/mod.rs b/src/wireguard/router/mod.rs index 3243b88..bccb0a9 100644 --- a/src/wireguard/router/mod.rs +++ b/src/wireguard/router/mod.rs @@ -7,6 +7,7 @@ mod messages; mod outbound; mod peer; mod pool; +mod queue; mod route; mod types; diff --git a/src/wireguard/router/outbound.rs b/src/wireguard/router/outbound.rs index 30b7c2c..d08637b 100644 --- a/src/wireguard/router/outbound.rs +++ b/src/wireguard/router/outbound.rs @@ -35,6 +35,8 @@ fn parallel>( _peer: &Peer, body: &mut Outbound, ) { + log::trace!("worker, parallel section, obtained job"); + // make space for the tag body.msg.extend([0u8; SIZE_TAG].iter()); @@ -77,6 +79,8 @@ fn sequential>( peer: &Peer, body: &mut Outbound, ) { + log::trace!("worker, sequential section, obtained job"); + // send to peer let xmit = peer.send(&body.msg[..]).is_ok(); diff --git a/src/wireguard/router/peer.rs b/src/wireguard/router/peer.rs index 192d4e2..40442a8 100644 --- a/src/wireguard/router/peer.rs +++ b/src/wireguard/router/peer.rs @@ -276,7 +276,9 @@ impl> Peer>, msg: Vec, ) -> Option>> { - Some(Job::new(self.clone(), Inbound::new(msg, dec, src))) + let job = Job::new(self.clone(), Inbound::new(msg, dec, src)); + self.inbound.send(job.clone()); + Some(job) } pub fn send_job(&self, msg: Vec, stage: bool) -> Option> { diff --git a/src/wireguard/router/pool.rs b/src/wireguard/router/pool.rs index 12956c1..9c72372 100644 --- a/src/wireguard/router/pool.rs +++ b/src/wireguard/router/pool.rs @@ -106,6 +106,7 @@ pub fn worker_template< work_sequential: S, // perform sequential work on peer queue: Q, // resolve a peer to an inorder queue ) { + log::trace!("router worker started"); loop { // handle new job let peer = { diff --git a/src/wireguard/router/queue.rs b/src/wireguard/router/queue.rs new file mode 100644 index 0000000..5d0165c --- /dev/null +++ b/src/wireguard/router/queue.rs @@ -0,0 +1,46 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::sync_channel; +use std::sync::mpsc::{Receiver, SyncSender}; + +use spin::Mutex; + +pub struct ParallelQueue { + next: AtomicUsize, // next round-robin index + queues: Vec>>, // work queues (1 per thread) +} + +impl ParallelQueue { + pub fn new(queues: usize) -> (Vec>, Self) { + let mut rxs = vec![]; + let mut txs = vec![]; + + for _ in 0..queues { + let (tx, rx) = sync_channel(128); + txs.push(Mutex::new(tx)); + rxs.push(rx); + } + + ( + rxs, + ParallelQueue { + next: AtomicUsize::new(0), + queues: txs, + }, + ) + } + + pub fn send(&self, v: T) { + let len = self.queues.len(); + let idx = self.next.fetch_add(1, Ordering::SeqCst); + let que = self.queues[idx % len].lock(); + que.send(v).unwrap(); + } + + pub fn close(&self) { + for i in 0..self.queues.len() { + let (tx, _) = sync_channel(0); + let queue = &self.queues[i]; + *queue.lock() = tx; + } + } +} diff --git a/src/wireguard/router/route.rs b/src/wireguard/router/route.rs index 40dc36b..56ad32f 100644 --- a/src/wireguard/router/route.rs +++ b/src/wireguard/router/route.rs @@ -81,7 +81,7 @@ impl RoutingTable { LayoutVerified::new_from_prefix(packet)?; log::trace!( - "Router, get route for IPv4 destination: {:?}", + "router, get route for IPv4 destination: {:?}", Ipv4Addr::from(header.f_destination) ); @@ -97,7 +97,7 @@ impl RoutingTable { LayoutVerified::new_from_prefix(packet)?; log::trace!( - "Router, get route for IPv6 destination: {:?}", + "router, get route for IPv6 destination: {:?}", Ipv6Addr::from(header.f_destination) ); @@ -107,7 +107,10 @@ impl RoutingTable { .longest_match(Ipv6Addr::from(header.f_destination)) .and_then(|(_, _, p)| Some(p.clone())) } - _ => None, + v => { + log::trace!("router, invalid IP version {}", v); + None + }, } } @@ -120,7 +123,7 @@ impl RoutingTable { LayoutVerified::new_from_prefix(packet)?; log::trace!( - "Router, check route for IPv4 source: {:?}", + "router, check route for IPv4 source: {:?}", Ipv4Addr::from(header.f_source) ); @@ -142,7 +145,7 @@ impl RoutingTable { LayoutVerified::new_from_prefix(packet)?; log::trace!( - "Router, check route for IPv6 source: {:?}", + "router, check route for IPv6 source: {:?}", Ipv6Addr::from(header.f_source) ); diff --git a/src/wireguard/router/tests.rs b/src/wireguard/router/tests.rs index d96dc90..1f500c0 100644 --- a/src/wireguard/router/tests.rs +++ b/src/wireguard/router/tests.rs @@ -9,7 +9,7 @@ use num_cpus; use super::super::dummy; use super::super::dummy_keypair; -use super::super::tests::make_packet_dst; +use super::super::tests::make_packet; use super::super::udp::*; use super::KeyPair; use super::SIZE_MESSAGE_PREFIX; @@ -105,15 +105,15 @@ mod tests { // wait for scheduling fn wait() { - thread::sleep(Duration::from_millis(50)); + thread::sleep(Duration::from_millis(15)); } fn init() { let _ = env_logger::builder().is_test(true).try_init(); } - fn make_packet_dst_padded(size: usize, dst: IpAddr, id: u64) -> Vec { - let p = make_packet_dst(size, dst, id); + fn make_packet_padded(size: usize, src: IpAddr, dst: IpAddr, id: u64) -> Vec { + let p = make_packet(size, src, dst, id); let mut o = vec![0; p.len() + SIZE_MESSAGE_PREFIX]; o[SIZE_MESSAGE_PREFIX..SIZE_MESSAGE_PREFIX + p.len()].copy_from_slice(&p[..]); o @@ -149,15 +149,21 @@ mod tests { peer.add_keypair(dummy_keypair(true)); // add subnet to peer - let (mask, len, ip) = ("192.168.1.0", 24, "192.168.1.20"); + let (mask, len, dst) = ("192.168.1.0", 24, "192.168.1.20"); let mask: IpAddr = mask.parse().unwrap(); - let ip1: IpAddr = ip.parse().unwrap(); peer.add_allowed_ip(mask, len); + // create "IP packet" + let dst = dst.parse().unwrap(); + let src = match dst { + IpAddr::V4(_) => "127.0.0.1".parse().unwrap(), + IpAddr::V6(_) => "::1".parse().unwrap() + }; + let msg = make_packet_padded(1024, src, dst, 0); + // every iteration sends 10 GB b.iter(|| { opaque.store(0, Ordering::SeqCst); - let msg = make_packet_dst_padded(1024, ip1, 0); while opaque.load(Ordering::Acquire) < 10 * 1024 * 1024 { router.send(msg.to_vec()).unwrap(); } @@ -197,7 +203,8 @@ mod tests { ), ]; - for (num, (mask, len, ip, okay)) in tests.iter().enumerate() { + for (num, (mask, len, dst, okay)) in tests.iter().enumerate() { + println!("Check: {} {} {}/{}", dst, if *okay { "\\in" } else { "\\notin" }, mask, len); for set_key in vec![true, false] { debug!("index = {}, set_key = {}", num, set_key); @@ -213,7 +220,12 @@ mod tests { peer.add_allowed_ip(mask, *len); // create "IP packet" - let msg = make_packet_dst_padded(1024, ip.parse().unwrap(), 0); + let dst = dst.parse().unwrap(); + let src = match dst { + IpAddr::V4(_) => "127.0.0.1".parse().unwrap(), + IpAddr::V6(_) => "::1".parse().unwrap() + }; + let msg = make_packet_padded(1024, src, dst, 0); // cryptkey route the IP packet let res = router.send(msg); @@ -269,17 +281,14 @@ mod tests { let tests = [ ( - false, // confirm with keepalive ("192.168.1.0", 24, "192.168.1.20", true), ("172.133.133.133", 32, "172.133.133.133", true), ), ( - true, // confirm with staged packet ("192.168.1.0", 24, "192.168.1.20", true), ("172.133.133.133", 32, "172.133.133.133", true), ), ( - false, // confirm with keepalive ( "2001:db8::ff00:42:8000", 113, @@ -294,7 +303,6 @@ mod tests { ), ), ( - false, // confirm with staged packet ( "2001:db8::ff00:42:8000", 113, @@ -310,117 +318,152 @@ mod tests { ), ]; - for (stage, p1, p2) in tests.iter() { - let ((bind_reader1, bind_writer1), (bind_reader2, bind_writer2)) = - dummy::PairBind::pair(); + for stage in vec![true, false] { + for (p1, p2) in tests.iter() { + let ((bind_reader1, bind_writer1), (bind_reader2, bind_writer2)) = + dummy::PairBind::pair(); - // create matching device - let (_fake, _, tun_writer1, _) = dummy::TunTest::create(false); - let (_fake, _, tun_writer2, _) = dummy::TunTest::create(false); + // create matching device + let (_fake, _, tun_writer1, _) = dummy::TunTest::create(false); + let (_fake, _, tun_writer2, _) = dummy::TunTest::create(false); - let router1: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer1); - router1.set_outbound_writer(bind_writer1); + let router1: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer1); + router1.set_outbound_writer(bind_writer1); - let router2: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer2); - router2.set_outbound_writer(bind_writer2); + let router2: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer2); + router2.set_outbound_writer(bind_writer2); - // prepare opaque values for tracing callbacks + // prepare opaque values for tracing callbacks - let opaq1 = Opaque::new(); - let opaq2 = Opaque::new(); + let opaque1 = Opaque::new(); + let opaque2 = Opaque::new(); - // create peers with matching keypairs and assign subnets + // create peers with matching keypairs and assign subnets - let (mask, len, _ip, _okay) = p1; - let peer1 = router1.new_peer(opaq1.clone()); - let mask: IpAddr = mask.parse().unwrap(); - peer1.add_allowed_ip(mask, *len); - peer1.add_keypair(dummy_keypair(false)); + let peer1 = router1.new_peer(opaque1.clone()); + let peer2 = router2.new_peer(opaque2.clone()); - let (mask, len, _ip, _okay) = p2; - let peer2 = router2.new_peer(opaq2.clone()); - let mask: IpAddr = mask.parse().unwrap(); - peer2.add_allowed_ip(mask, *len); - peer2.set_endpoint(dummy::UnitEndpoint::new()); + { + let (mask, len, _ip, _okay) = p1; + let mask: IpAddr = mask.parse().unwrap(); + peer1.add_allowed_ip(mask, *len); + peer1.add_keypair(dummy_keypair(false)); + } - if *stage { - // stage a packet which can be used for confirmation (in place of a keepalive) - let (_mask, _len, ip, _okay) = p2; - let msg = make_packet_dst_padded(1024, ip.parse().unwrap(), 0); - router2.send(msg).expect("failed to sent staged packet"); + { + let (mask, len, _ip, _okay) = p2; + let mask: IpAddr = mask.parse().unwrap(); + peer2.add_allowed_ip(mask, *len); + peer2.set_endpoint(dummy::UnitEndpoint::new()); + } - wait(); - assert!(opaq2.recv().is_none()); - assert!( - opaq2.send().is_none(), - "sending should fail as not key is set" - ); - assert!( - opaq2.need_key().is_some(), - "a new key should be requested since a packet was attempted transmitted" - ); - assert!(opaq2.is_empty(), "callbacks should only run once"); - } + if stage { + println!("confirm using staged packet"); + + // create IP packet + let (_mask, _len, ip1, _okay) = p1; + let (_mask, _len, ip2, _okay) = p2; + let msg = make_packet_padded( + 1024, + ip1.parse().unwrap(), // src + ip2.parse().unwrap(), // dst + 0, + ); - // this should cause a key-confirmation packet (keepalive or staged packet) - // this also causes peer1 to learn the "endpoint" for peer2 - assert!(peer1.get_endpoint().is_none()); - peer2.add_keypair(dummy_keypair(true)); - - wait(); - assert!(opaq2.send().is_some()); - assert!(opaq2.is_empty(), "events on peer2 should be 'send'"); - assert!(opaq1.is_empty(), "nothing should happened on peer1"); - - // read confirming message received by the other end ("across the internet") - let mut buf = vec![0u8; 2048]; - let (len, from) = bind_reader1.read(&mut buf).unwrap(); - buf.truncate(len); - router1.recv(from, buf).unwrap(); - - wait(); - assert!(opaq1.recv().is_some()); - assert!(opaq1.key_confirmed().is_some()); - assert!( - opaq1.is_empty(), - "events on peer1 should be 'recv' and 'key_confirmed'" - ); - assert!(peer1.get_endpoint().is_some()); - assert!(opaq2.is_empty(), "nothing should happened on peer2"); - - // now that peer1 has an endpoint - // route packets : peer1 -> peer2 - - for id in 0..10 { - assert!( - opaq1.is_empty(), - "we should have asserted a value for every callback on peer1" - ); - assert!( - opaq2.is_empty(), - "we should have asserted a value for every callback on peer2" - ); + // stage packet for sending + router2.send(msg).expect("failed to sent staged packet"); + wait(); + + // validate events + assert!(opaque2.recv().is_none()); + assert!( + opaque2.send().is_none(), + "sending should fail as not key is set" + ); + assert!( + opaque2.need_key().is_some(), + "a new key should be requested since a packet was attempted transmitted" + ); + assert!(opaque2.is_empty(), "callbacks should only run once"); + } - // pass IP packet to router - let (_mask, _len, ip, _okay) = p1; - let msg = make_packet_dst_padded(1024, ip.parse().unwrap(), id); - router1.send(msg).unwrap(); + // this should cause a key-confirmation packet (keepalive or staged packet) + // this also causes peer1 to learn the "endpoint" for peer2 + assert!(peer1.get_endpoint().is_none()); + peer2.add_keypair(dummy_keypair(true)); wait(); - assert!(opaq1.send().is_some()); - assert!(opaq1.recv().is_none()); - assert!(opaq1.need_key().is_none()); + assert!(opaque2.send().is_some()); + assert!(opaque2.is_empty(), "events on peer2 should be 'send'"); + assert!(opaque1.is_empty(), "nothing should happened on peer1"); - // receive ("across the internet") on the other end + // read confirming message received by the other end ("across the internet") let mut buf = vec![0u8; 2048]; - let (len, from) = bind_reader2.read(&mut buf).unwrap(); + let (len, from) = bind_reader1.read(&mut buf).unwrap(); buf.truncate(len); - router2.recv(from, buf).unwrap(); + router1.recv(from, buf).unwrap(); wait(); - assert!(opaq2.send().is_none()); - assert!(opaq2.recv().is_some()); - assert!(opaq2.need_key().is_none()); + assert!(opaque1.recv().is_some()); + assert!(opaque1.key_confirmed().is_some()); + assert!( + opaque1.is_empty(), + "events on peer1 should be 'recv' and 'key_confirmed'" + ); + assert!(peer1.get_endpoint().is_some()); + assert!(opaque2.is_empty(), "nothing should happened on peer2"); + + // now that peer1 has an endpoint + // route packets : peer1 -> peer2 + + for id in 1..11 { + println!("round: {}", id); + assert!( + opaque1.is_empty(), + "we should have asserted a value for every callback on peer1" + ); + assert!( + opaque2.is_empty(), + "we should have asserted a value for every callback on peer2" + ); + + // pass IP packet to router + let (_mask, _len, ip1, _okay) = p1; + let (_mask, _len, ip2, _okay) = p2; + let msg = + make_packet_padded( + 1024, + ip2.parse().unwrap(), // src + ip1.parse().unwrap(), // dst + id + ); + router1.send(msg).unwrap(); + + wait(); + assert!(opaque1.send().is_some(), "encryption should succeed"); + assert!( + opaque1.recv().is_none(), + "receiving callback should not be called" + ); + assert!(opaque1.need_key().is_none()); + + // receive ("across the internet") on the other end + let mut buf = vec![0u8; 2048]; + let (len, from) = bind_reader2.read(&mut buf).unwrap(); + buf.truncate(len); + router2.recv(from, buf).unwrap(); + + wait(); + assert!( + opaque2.send().is_none(), + "sending callback should not be called" + ); + assert!( + opaque2.recv().is_some(), + "decryption and routing should succeed" + ); + assert!(opaque2.need_key().is_none()); + } } } } diff --git a/src/wireguard/timers.rs b/src/wireguard/timers.rs index e1aabad..f292afd 100644 --- a/src/wireguard/timers.rs +++ b/src/wireguard/timers.rs @@ -3,14 +3,14 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; -use log::debug; use hjul::{Runner, Timer}; +use log::debug; use super::constants::*; use super::router::{message_data_len, Callbacks}; -use super::{Peer, PeerInner}; -use super::{udp, tun}; use super::types::KeyPair; +use super::{tun, udp}; +use super::{Peer, PeerInner}; pub struct Timers { // only updated during configuration @@ -36,7 +36,6 @@ impl Timers { } impl PeerInner { - pub fn get_keepalive_interval(&self) -> u64 { self.timers().keepalive_interval } @@ -57,17 +56,19 @@ impl PeerInner { timers.send_persistent_keepalive.stop(); timers.zero_key_material.stop(); timers.new_handshake.stop(); - + // reset all timer state timers.handshake_attempts.store(0, Ordering::SeqCst); - timers.sent_lastminute_handshake.store(false, Ordering::SeqCst); + timers + .sent_lastminute_handshake + .store(false, Ordering::SeqCst); timers.need_another_keepalive.store(false, Ordering::SeqCst); } pub fn start_timers(&self) { // take a write lock preventing simultaneous "stop_timers" call let mut timers = self.timers_mut(); - + // set flag to reenable timer events if timers.enabled { return; @@ -76,18 +77,20 @@ impl PeerInner { // start send_persistent_keepalive if timers.keepalive_interval > 0 { - timers.send_persistent_keepalive.start( - Duration::from_secs(timers.keepalive_interval) - ); + timers + .send_persistent_keepalive + .start(Duration::from_secs(timers.keepalive_interval)); } } /* should be called after an authenticated data packet is sent */ pub fn timers_data_sent(&self) { - let timers = self.timers(); - if timers.enabled { - timers.new_handshake.start(KEEPALIVE_TIMEOUT + REKEY_TIMEOUT); - } + let timers = self.timers(); + if timers.enabled { + timers + .new_handshake + .start(KEEPALIVE_TIMEOUT + REKEY_TIMEOUT); + } } /* should be called after an authenticated data packet is received */ @@ -139,7 +142,9 @@ impl PeerInner { if timers.enabled { timers.retransmit_handshake.stop(); timers.handshake_attempts.store(0, Ordering::SeqCst); - timers.sent_lastminute_handshake.store(false, Ordering::SeqCst); + timers + .sent_lastminute_handshake + .store(false, Ordering::SeqCst); *self.walltime_last_handshake.lock() = Some(SystemTime::now()); } } @@ -161,9 +166,9 @@ impl PeerInner { let timers = self.timers(); if timers.enabled && timers.keepalive_interval > 0 { // push persistent_keepalive into the future - timers.send_persistent_keepalive.reset(Duration::from_secs( - timers.keepalive_interval - )); + timers + .send_persistent_keepalive + .reset(Duration::from_secs(timers.keepalive_interval)); } } @@ -179,7 +184,6 @@ impl PeerInner { if timers.enabled { timers.retransmit_handshake.reset(REKEY_TIMEOUT); } - } /* Called after a handshake worker sends a handshake initiation to the peer @@ -195,7 +199,7 @@ impl PeerInner { *self.last_handshake_sent.lock() = Instant::now(); self.timers_any_authenticated_packet_traversal(); self.timers_any_authenticated_packet_sent(); - } + } pub fn set_persistent_keepalive_interval(&self, secs: u64) { let mut timers = self.timers_mut(); @@ -205,10 +209,12 @@ impl PeerInner { // stop the keepalive timer with the old interval timers.send_persistent_keepalive.stop(); - + // restart the persistent_keepalive timer with the new interval if secs > 0 && timers.enabled { - timers.send_persistent_keepalive.start(Duration::from_secs(secs)); + timers + .send_persistent_keepalive + .start(Duration::from_secs(secs)); } } @@ -220,7 +226,6 @@ impl PeerInner { } } - impl Timers { pub fn new(runner: &Runner, running: bool, peer: Peer) -> Timers where @@ -242,9 +247,12 @@ impl Timers { if !timers.enabled { return; } - + // check if handshake attempts remaining - let attempts = peer.timers().handshake_attempts.fetch_add(1, Ordering::SeqCst); + let attempts = peer + .timers() + .handshake_attempts + .fetch_add(1, Ordering::SeqCst); if attempts > MAX_TIMER_HANDSHAKES { debug!( "Handshake for peer {} did not complete after {} attempts, giving up", @@ -257,8 +265,8 @@ impl Timers { } else { debug!( "Handshake for {} did not complete after {} seconds, retrying (try {})", - peer, - REKEY_TIMEOUT.as_secs(), + peer, + REKEY_TIMEOUT.as_secs(), attempts ); timers.retransmit_handshake.reset(REKEY_TIMEOUT); @@ -287,7 +295,7 @@ impl Timers { runner.timer(move || { debug!( "Retrying handshake with {} because we stopped hearing back after {} seconds", - peer, + peer, (KEEPALIVE_TIMEOUT + REKEY_TIMEOUT).as_secs() ); peer.router.clear_src(); @@ -307,9 +315,9 @@ impl Timers { if timers.enabled && timers.keepalive_interval > 0 { peer.router.send_keepalive(); timers.send_keepalive.stop(); - timers.send_persistent_keepalive.start(Duration::from_secs( - timers.keepalive_interval - )); + timers + .send_persistent_keepalive + .start(Duration::from_secs(timers.keepalive_interval)); } }) }, @@ -318,7 +326,7 @@ impl Timers { pub fn dummy(runner: &Runner) -> Timers { Timers { - enabled: false, + enabled: false, keepalive_interval: 0, need_another_keepalive: AtomicBool::new(false), sent_lastminute_handshake: AtomicBool::new(false), @@ -344,9 +352,8 @@ impl Callbacks for Events { */ #[inline(always)] fn send(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc, counter: u64) { - // update timers and stats - + peer.timers_any_authenticated_packet_traversal(); peer.timers_any_authenticated_packet_sent(); peer.tx_bytes.fetch_add(size as u64, Ordering::Relaxed); @@ -375,7 +382,6 @@ impl Callbacks for Events { */ #[inline(always)] fn recv(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc) { - // update timers and stats peer.timers_any_authenticated_packet_traversal(); @@ -386,13 +392,18 @@ impl Callbacks for Events { } // keep_key_fresh - + #[inline(always)] fn keep_key_fresh(keypair: &Arc) -> bool { - Instant::now() - keypair.birth > REJECT_AFTER_TIME - KEEPALIVE_TIMEOUT - REKEY_TIMEOUT + Instant::now() - keypair.birth > REJECT_AFTER_TIME - KEEPALIVE_TIMEOUT - REKEY_TIMEOUT } - if keep_key_fresh(keypair) && !peer.timers().sent_lastminute_handshake.swap(true, Ordering::Acquire) { + if keep_key_fresh(keypair) + && !peer + .timers() + .sent_lastminute_handshake + .swap(true, Ordering::Acquire) + { peer.packet_send_queued_handshake_initiation(false); } } @@ -405,7 +416,7 @@ impl Callbacks for Events { */ #[inline(always)] fn need_key(peer: &Self::Opaque) { - peer.packet_send_queued_handshake_initiation(false); + peer.packet_send_queued_handshake_initiation(false); } #[inline(always)] -- cgit v1.2.3-59-g8ed1b