diff options
Diffstat (limited to 'src/wireguard')
-rw-r--r-- | src/wireguard/constants.rs | 2 | ||||
-rw-r--r-- | src/wireguard/peer.rs | 6 | ||||
-rw-r--r-- | src/wireguard/queue.rs | 2 | ||||
-rw-r--r-- | src/wireguard/router/constants.rs | 4 | ||||
-rw-r--r-- | src/wireguard/router/inbound.rs | 4 | ||||
-rw-r--r-- | src/wireguard/router/outbound.rs | 34 | ||||
-rw-r--r-- | src/wireguard/router/pool.rs | 47 | ||||
-rw-r--r-- | src/wireguard/router/runq.rs | 27 | ||||
-rw-r--r-- | src/wireguard/wireguard.rs | 8 |
9 files changed, 92 insertions, 42 deletions
diff --git a/src/wireguard/constants.rs b/src/wireguard/constants.rs index c53c559..97ce6b1 100644 --- a/src/wireguard/constants.rs +++ b/src/wireguard/constants.rs @@ -23,4 +23,4 @@ pub const MESSAGE_PADDING_MULTIPLE: usize = 16; * used in places to avoid Option<Instant> by instead using a long "expired" Instant: * (Instant::now() - TIME_HORIZON) */ -pub const TIME_HORIZON: Duration = Duration::from_secs(3600 * 24); +pub const TIME_HORIZON: Duration = Duration::from_secs(60 * 60 * 24); diff --git a/src/wireguard/peer.rs b/src/wireguard/peer.rs index 448db96..85e340f 100644 --- a/src/wireguard/peer.rs +++ b/src/wireguard/peer.rs @@ -28,9 +28,9 @@ pub struct PeerInner<T: Tun, B: UDP> { pub wg: Arc<WireguardInner<T, B>>, // handshake state - pub walltime_last_handshake: Mutex<Option<SystemTime>>, - pub last_handshake_sent: Mutex<Instant>, // instant for last handshake - pub handshake_queued: AtomicBool, // is a handshake job currently queued for the peer? + pub walltime_last_handshake: Mutex<Option<SystemTime>>, // walltime for last handshake (for UAPI status) + pub last_handshake_sent: Mutex<Instant>, // instant for last handshake + pub handshake_queued: AtomicBool, // is a handshake job currently queued for the peer? // stats and configuration pub pk: PublicKey, // public key, DISCUSS: avoid this. 
TODO: remove diff --git a/src/wireguard/queue.rs b/src/wireguard/queue.rs index a0fcf03..f484320 100644 --- a/src/wireguard/queue.rs +++ b/src/wireguard/queue.rs @@ -21,7 +21,7 @@ impl<T> ParallelQueue<T> { /// /// # Arguments /// - /// - `queues`: number of readers/writers + /// - `queues`: number of readers /// - `capacity`: capacity of each internal queue /// pub fn new(queues: usize, capacity: usize) -> (Self, Vec<Receiver<T>>) { diff --git a/src/wireguard/router/constants.rs b/src/wireguard/router/constants.rs index 0ca824a..6129fd7 100644 --- a/src/wireguard/router/constants.rs +++ b/src/wireguard/router/constants.rs @@ -4,4 +4,6 @@ pub const MAX_STAGED_PACKETS: usize = 128; // performance constants -pub const WORKER_QUEUE_SIZE: usize = MAX_STAGED_PACKETS; +pub const PARALLEL_QUEUE_SIZE: usize = MAX_STAGED_PACKETS; +pub const INORDER_QUEUE_SIZE: usize = PARALLEL_QUEUE_SIZE; +pub const MAX_INORDER_CONSUME: usize = INORDER_QUEUE_SIZE; diff --git a/src/wireguard/router/inbound.rs b/src/wireguard/router/inbound.rs index 5a27c95..db6d3f3 100644 --- a/src/wireguard/router/inbound.rs +++ b/src/wireguard/router/inbound.rs @@ -1,3 +1,4 @@ +use super::constants::MAX_INORDER_CONSUME; use super::device::DecryptionState; use super::device::Device; use super::messages::TransportHeader; @@ -185,6 +186,7 @@ pub fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( // handle message from the peers inbound queue device.run_inbound.run(|peer| { - peer.inbound.handle(|body| work(&peer, body)); + peer.inbound + .handle(|body| work(&peer, body), MAX_INORDER_CONSUME) }); } diff --git a/src/wireguard/router/outbound.rs b/src/wireguard/router/outbound.rs index 9ecffd8..a555ecb 100644 --- a/src/wireguard/router/outbound.rs +++ b/src/wireguard/router/outbound.rs @@ -1,3 +1,4 @@ +use super::constants::MAX_INORDER_CONSUME; use super::device::Device; use super::messages::{TransportHeader, TYPE_TRANSPORT}; use super::peer::Peer; @@ -88,20 +89,23 @@ pub fn 
sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( device: Device<E, C, T, B>, ) { device.run_outbound.run(|peer| { - peer.outbound.handle(|body| { - log::trace!("worker, sequential section, obtained job"); - - // send to peer - let xmit = peer.send(&body.msg[..]).is_ok(); - - // trigger callback - C::send( - &peer.opaque, - body.msg.len(), - xmit, - &body.keypair, - body.counter, - ); - }); + peer.outbound.handle( + |body| { + log::trace!("worker, sequential section, obtained job"); + + // send to peer + let xmit = peer.send(&body.msg[..]).is_ok(); + + // trigger callback + C::send( + &peer.opaque, + body.msg.len(), + xmit, + &body.keypair, + body.counter, + ); + }, + MAX_INORDER_CONSUME, + ) }); } diff --git a/src/wireguard/router/pool.rs b/src/wireguard/router/pool.rs index 07a9bfa..686c788 100644 --- a/src/wireguard/router/pool.rs +++ b/src/wireguard/router/pool.rs @@ -4,10 +4,9 @@ use std::mem; use std::sync::mpsc::Receiver; use std::sync::Arc; +use super::constants::INORDER_QUEUE_SIZE; use super::runq::{RunQueue, ToKey}; -const INORDER_QUEUE_SIZE: usize = 64; - pub struct InnerJob<P, B> { // peer (used by worker to schedule/handle inorder queue), // when the peer is None, the job is complete @@ -52,28 +51,50 @@ pub struct InorderQueue<P, B> { } impl<P, B> InorderQueue<P, B> { - pub fn send(&self, job: Job<P, B>) -> bool { - self.queue.lock().push_back(job).is_ok() - } - pub fn new() -> InorderQueue<P, B> { InorderQueue { queue: Mutex::new(ArrayDeque::new()), } } + /// Add a new job to the in-order queue + /// + /// # Arguments + /// + /// - `job`: The job added to the back of the queue + /// + /// # Returns + /// + /// True if the element was added, + /// false to indicate that the queue is full. 
+ pub fn send(&self, job: Job<P, B>) -> bool { + self.queue.lock().push_back(job).is_ok() + } + + /// Consume completed jobs from the in-order queue + /// + /// # Arguments + /// + /// - `f`: function to apply to the body of each job. + /// - `limit`: maximum number of jobs to handle before returning + /// + /// # Returns + /// + /// A boolean indicating if the limit was reached: + /// true indicating that the limit was reached, + /// while false implies that the queue is empty or an uncompleted job was reached. #[inline(always)] - pub fn handle<F: Fn(&mut B)>(&self, f: F) { + pub fn handle<F: Fn(&mut B)>(&self, f: F, mut limit: usize) -> bool { // take the mutex let mut queue = self.queue.lock(); - loop { + while limit > 0 { // attempt to extract front element let front = queue.pop_front(); let elem = match front { Some(elem) => elem, _ => { - return; + return false; } }; @@ -90,13 +111,17 @@ impl<P, B> InorderQueue<P, B> { // job not complete yet, return job to front if ret { queue.push_front(elem).unwrap(); - return; + return false; } + limit -= 1; } + + // did not complete all jobs + true } } -/// Allows easy construction of a semi-parallel worker. +/// Allows easy construction of a parallel worker. /// Applicable for both decryption and encryption workers. #[inline(always)] pub fn worker_parallel< diff --git a/src/wireguard/router/runq.rs b/src/wireguard/router/runq.rs index 936a53c..44e11a1 100644 --- a/src/wireguard/router/runq.rs +++ b/src/wireguard/router/runq.rs @@ -58,7 +58,21 @@ impl<T: ToKey> RunQueue<T> { } } - pub fn run<F: Fn(&T) -> ()>(&self, f: F) { + /// Run (consume from) the run queue using the provided function. + /// The function should return whether the given element should be rescheduled. 
+ /// + /// # Arguments + /// + /// - `f` : function to apply to every element + /// + /// # Note + /// + /// The function f may be called again even when the element was not inserted back into the + /// queue since the last application and no rescheduling was requested. + /// + /// This happens when the function handles all work for T, + /// but T is added to the run queue while the function is running. + pub fn run<F: Fn(&T) -> bool>(&self, f: F) { let mut inner = self.inner.lock().unwrap(); loop { // fetch next element @@ -86,10 +100,16 @@ impl<T: ToKey> RunQueue<T> { mem::drop(inner); // drop guard // handle element - f(&elem); + let rerun = f(&elem); - // retake lock and check if should be added back to queue + // if the function requested a re-run add the element to the back of the queue inner = self.inner.lock().unwrap(); + if rerun { + inner.queue.push_back(elem); + continue; + } + + // otherwise check if new requests have come in since we ran the function match inner.members.entry(key) { Entry::Occupied(occ) => { if *occ.get() == old_n { @@ -111,7 +131,6 @@ #[cfg(test)] mod tests { use super::*; - use std::sync::Arc; use std::thread; use std::time::Duration; diff --git a/src/wireguard/wireguard.rs b/src/wireguard/wireguard.rs index d0c0e53..2cd6ce4 100644 --- a/src/wireguard/wireguard.rs +++ b/src/wireguard/wireguard.rs @@ -343,8 +343,7 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> { // create vector big enough for any message given current MTU let mtu = wg.mtu.load(Ordering::Relaxed); let size = mtu + handshake::MAX_HANDSHAKE_MSG_SIZE; - let mut msg: Vec<u8> = Vec::with_capacity(size); - msg.resize(size, 0); + let mut msg: Vec<u8> = vec![0; size]; // read UDP packet into vector let (size, src) = match reader.read(&mut msg) { @@ -413,8 +412,7 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> { // create vector big enough for any transport message (based on MTU) let mtu = wg.mtu.load(Ordering::Relaxed); let size = mtu +
router::SIZE_MESSAGE_PREFIX + 1; - let mut msg: Vec<u8> = Vec::with_capacity(size + router::CAPACITY_MESSAGE_POSTFIX); - msg.resize(size, 0); + let mut msg: Vec<u8> = vec![0; size + router::CAPACITY_MESSAGE_POSTFIX]; // read a new IP packet let payload = match reader.read(&mut msg[..], router::SIZE_MESSAGE_PREFIX) { @@ -426,7 +424,7 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> { }; debug!("TUN worker, IP packet of {} bytes (MTU = {})", payload, mtu); - // TODO: start device down + // check if device is down if mtu == 0 { continue; } |