From 106c5e8b5c865c8396f824f4f5aa14d1bf0952b1 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sun, 16 Feb 2020 18:12:43 +0100 Subject: Work on router optimizations --- src/wireguard/router/peer.rs | 192 ++++++++++++++++++------------------------- 1 file changed, 79 insertions(+), 113 deletions(-) (limited to 'src/wireguard/router/peer.rs') diff --git a/src/wireguard/router/peer.rs b/src/wireguard/router/peer.rs index 7312bc7..710cf32 100644 --- a/src/wireguard/router/peer.rs +++ b/src/wireguard/router/peer.rs @@ -1,13 +1,3 @@ -use std::mem; -use std::net::{IpAddr, SocketAddr}; -use std::ops::Deref; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; - -use arraydeque::{ArrayDeque, Wrapping}; -use log::debug; -use spin::Mutex; - use super::super::constants::*; use super::super::{tun, udp, Endpoint, KeyPair}; @@ -15,20 +5,25 @@ use super::anti_replay::AntiReplay; use super::device::DecryptionState; use super::device::Device; use super::device::EncryptionState; -use super::messages::TransportHeader; use super::constants::*; -use super::runq::ToKey; use super::types::{Callbacks, RouterError}; use super::SIZE_MESSAGE_PREFIX; -// worker pool related -use super::inbound::Inbound; -use super::outbound::Outbound; use super::queue::Queue; - -use super::send::SendJob; use super::receive::ReceiveJob; +use super::send::SendJob; +use super::worker::JobUnion; + +use std::mem; +use std::net::{IpAddr, SocketAddr}; +use std::ops::Deref; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; + +use arraydeque::{ArrayDeque, Wrapping}; +use log::debug; +use spin::Mutex; pub struct KeyWheel { next: Option>, // next key state (unconfirmed) @@ -44,7 +39,7 @@ pub struct PeerInner>, pub staged_packets: Mutex; MAX_QUEUED_PACKETS], Wrapping>>, pub keys: Mutex, - pub ekey: Mutex>, + pub enc_key: Mutex>, pub endpoint: Mutex>, } @@ -69,13 +64,6 @@ impl> PartialEq for } } -impl> ToKey for Peer { - type Key = usize; - fn to_key(&self) -> usize { - Arc::downgrade(&self.inner).into_raw() as usize - } -} - impl> Eq for Peer {} /* A peer is transparently dereferenced to the inner type @@ -157,7 +145,7 @@ impl> Drop for Peer keys.current = None; keys.previous = None; - *peer.ekey.lock() = None; + *peer.enc_key.lock() = None; *peer.endpoint.lock() = None; debug!("peer dropped & removed from device"); @@ -175,9 +163,9 @@ pub fn new_peer>( inner: Arc::new(PeerInner { opaque, device, - inbound: InorderQueue::new(), - outbound: InorderQueue::new(), - ekey: spin::Mutex::new(None), + inbound: Queue::new(), + outbound: Queue::new(), + enc_key: spin::Mutex::new(None), endpoint: spin::Mutex::new(None), keys: spin::Mutex::new(KeyWheel { next: None, @@ -203,7 +191,7 @@ impl> PeerInner Result<(), RouterError> { + pub fn send_raw(&self, msg: &[u8]) -> Result<(), RouterError> { debug!("peer.send"); // send to endpoint (if known) @@ -226,6 +214,57 @@ impl> PeerInner> Peer { + /// Encrypt and send a message to the peer + /// + /// Arguments: + /// + /// - `msg` : A padded vector holding the message (allows in-place construction of the transport header) + /// - `stage`: Should the message be staged if no key is available + /// + pub(super) fn send(&self, msg: Vec, stage: bool) { + // check if key available + let (job, need_key) = { + let mut enc_key = self.enc_key.lock(); + match enc_key.as_mut() { + None => { + if stage { + self.staged_packets.lock().push_back(msg); + }; + (None, true) + } + Some(mut state) => { + // avoid integer overflow in nonce + if state.nonce >= REJECT_AFTER_MESSAGES - 1 { + *enc_key = None; + if stage { + self.staged_packets.lock().push_back(msg); + } + (None, true) + } else { + debug!("encryption state available, nonce = {}", state.nonce); + let job = + SendJob::new(msg, state.nonce, state.keypair.clone(), self.clone()); + if self.outbound.push(job.clone()) { + state.nonce += 1; + (Some(job), false) + } else { + (None, false) + } + } + } + } + }; + + if need_key { + debug_assert!(job.is_none()); + C::need_key(&self.opaque); + }; + + if let Some(job) = job { + self.device.work.send(JobUnion::Outbound(job)) + } + } + // Transmit all staged packets fn send_staged(&self) -> bool { debug!("peer.send_staged"); @@ -235,28 +274,14 @@ impl> Peer { sent = true; - self.send_raw(msg); + self.send(msg, false); } None => break sent, } } } - // Treat the msg as the payload of a transport message - // Unlike device.send, peer.send_raw does not buffer messages when a key is not available. - fn send_raw(&self, msg: Vec) -> bool { - log::debug!("peer.send_raw"); - match self.send_job(msg, false) { - Some(job) => { - self.device.queue_outbound.send(job); - debug!("send_raw: got obtained send_job"); - true - } - None => false, - } - } - - pub fn confirm_key(&self, keypair: &Arc) { + pub(super) fn confirm_key(&self, keypair: &Arc) { debug!("peer.confirm_key"); { // take lock and check keypair = keys.next @@ -284,68 +309,12 @@ impl> Peer, stage: bool) -> Option> { - debug!("peer.send_job"); - debug_assert!( - msg.len() >= mem::size_of::(), - "received TUN message with size: {:}", - msg.len() - ); - - // check if has key - let (keypair, counter) = { - let keypair = { - // TODO: consider using atomic ptr for ekey state - let mut ekey = self.ekey.lock(); - match ekey.as_mut() { - None => None, - Some(mut state) => { - // avoid integer overflow in nonce - if state.nonce >= REJECT_AFTER_MESSAGES - 1 { - *ekey = None; - None - } else { - debug!("encryption state available, nonce = {}", state.nonce); - let counter = state.nonce; - state.nonce += 1; - - SendJob::new( - msg, - state.nonce, - state.keypair.clone(), - self.clone() - ); - - Some((state.keypair.clone(), counter)) - } - } - } - }; - - // If not suitable key was found: - // 1. Stage packet for later transmission - // 2. Request new key - if keypair.is_none() && stage { - self.staged_packets.lock().push_back(msg); - C::need_key(&self.opaque); - return None; - }; - - keypair - }?; - - // add job to in-order queue and return sender to device for inclusion in worker pool - let job = Job::new(self.clone(), Outbound::new(msg, keypair, counter)); - self.outbound.send(job.clone()); - Some(job) - } } impl> PeerHandle { @@ -397,7 +366,7 @@ impl> PeerHandle> PeerHandle> PeerHandle> PeerHandle bool { + pub fn send_keepalive(&self) { debug!("peer.send_keepalive"); - self.peer.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX]) + self.peer.send(vec![0u8; SIZE_MESSAGE_PREFIX], false) } /// Map a subnet to the peer -- cgit v1.2.3-59-g8ed1b