From b62eeb89ab8a90f005dd48776e38dd33f0f3fb9e Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sun, 16 Feb 2020 15:50:32 +0100 Subject: Work on reducing context switches --- src/wireguard/router/mod.rs | 5 ++ src/wireguard/router/peer.rs | 32 +++---- src/wireguard/router/queue.rs | 76 ++++++++++++++++ src/wireguard/router/receive.rs | 188 ++++++++++++++++++++++++++++++++++++++++ src/wireguard/router/send.rs | 140 ++++++++++++++++++++++++++++++ src/wireguard/router/worker.rs | 13 +++ src/wireguard/router/workers.rs | 3 +- 7 files changed, 439 insertions(+), 18 deletions(-) create mode 100644 src/wireguard/router/queue.rs create mode 100644 src/wireguard/router/receive.rs create mode 100644 src/wireguard/router/send.rs create mode 100644 src/wireguard/router/worker.rs diff --git a/src/wireguard/router/mod.rs b/src/wireguard/router/mod.rs index 8238d32..ec5cc63 100644 --- a/src/wireguard/router/mod.rs +++ b/src/wireguard/router/mod.rs @@ -11,6 +11,11 @@ mod route; mod runq; mod types; +mod queue; +mod receive; +mod send; +mod worker; + #[cfg(test)] mod tests; diff --git a/src/wireguard/router/peer.rs b/src/wireguard/router/peer.rs index 23a3e62..7312bc7 100644 --- a/src/wireguard/router/peer.rs +++ b/src/wireguard/router/peer.rs @@ -25,7 +25,10 @@ use super::SIZE_MESSAGE_PREFIX; // worker pool related use super::inbound::Inbound; use super::outbound::Outbound; -use super::pool::{InorderQueue, Job}; +use super::queue::Queue; + +use super::send::SendJob; +use super::receive::ReceiveJob; pub struct KeyWheel { next: Option>, // next key state (unconfirmed) @@ -37,8 +40,8 @@ pub struct KeyWheel { pub struct PeerInner> { pub device: Device, pub opaque: C::Opaque, - pub outbound: InorderQueue, Outbound>, - pub inbound: InorderQueue, Inbound>, + pub outbound: Queue>, + pub inbound: Queue>, pub staged_packets: Mutex; MAX_QUEUED_PACKETS], Wrapping>>, pub keys: Mutex, pub ekey: Mutex>, @@ -288,22 +291,11 @@ impl> Peer>, - msg: Vec, - ) -> Option>> { - let job = 
Job::new(self.clone(), Inbound::new(msg, dec, src)); - self.inbound.send(job.clone()); - Some(job) - } - - pub fn send_job(&self, msg: Vec, stage: bool) -> Option> { + pub fn send_job(&self, msg: Vec, stage: bool) -> Option> { debug!("peer.send_job"); debug_assert!( msg.len() >= mem::size_of::(), - "received message with size: {:}", + "received TUN message with size: {:}", msg.len() ); @@ -323,6 +315,14 @@ impl> Peer &Queue; + + fn is_ready(&self) -> bool; + + fn parallel_work(&self); + + fn sequential_work(self); +} + + +pub struct Queue { + contenders: AtomicUsize, + queue: Mutex>, +} + +impl Queue { + pub fn new() -> Queue { + Queue { + contenders: AtomicUsize::new(0), + queue: Mutex::new(ArrayDeque::new()), + } + } + + pub fn push(&self, job: J) -> bool { + self.queue.lock().push_back(job).is_ok() + } + + pub fn consume(&self) { + // check if we are the first contender + let pos = self.contenders.fetch_add(1, Ordering::Acquire); + if pos > 0 { + assert!(pos < usize::max_value(), "contenders overflow"); + } + + // enter the critical section + let mut contenders = 1; // myself + while contenders > 0 { + // handle every ready element + loop { + let mut queue = self.queue.lock(); + + // check if front job is ready + match queue.front() { + None => break, + Some(job) => { + if job.is_ready() { + () + } else { + break; + } + } + }; + + // take the job out of the queue + let job = queue.pop_front().unwrap(); + debug_assert!(job.is_ready()); + mem::drop(queue); + + // process element + job.sequential_work(); + } + + // decrease contenders + contenders = self.contenders.fetch_sub(contenders, Ordering::Acquire) - contenders; + } + } +} diff --git a/src/wireguard/router/receive.rs b/src/wireguard/router/receive.rs new file mode 100644 index 0000000..53890e3 --- /dev/null +++ b/src/wireguard/router/receive.rs @@ -0,0 +1,188 @@ +use super::queue::{Job, Queue}; +use super::KeyPair; +use super::types::Callbacks; +use super::peer::Peer; +use super::{REJECT_AFTER_MESSAGES, 
SIZE_TAG}; +use super::messages::{TransportHeader, TYPE_TRANSPORT}; +use super::device::DecryptionState; + +use super::super::{tun, udp, Endpoint}; + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::mem; + +use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; +use zerocopy::{AsBytes, LayoutVerified}; +use spin::Mutex; + + +struct Inner> { + ready: AtomicBool, + buffer: Mutex<(Option, Vec)>, // endpoint & ciphertext buffer + state: Arc>, // decryption state (keys and replay protector) +} + +pub struct ReceiveJob> { + inner: Arc>, +} + +impl > ReceiveJob { + fn new(buffer: Vec, state: Arc>, endpoint: E) -> Option> { + // create job + let inner = Arc::new(Inner{ + ready: AtomicBool::new(false), + buffer: Mutex::new((Some(endpoint), buffer)), + state + }); + + // attempt to add to queue + if state.peer.inbound.push(ReceiveJob{ inner: inner.clone()}) { + Some(ReceiveJob{inner}) + } else { + None + } + + } +} + +impl > Job for ReceiveJob { + fn queue(&self) -> &Queue { + &self.inner.state.peer.inbound + } + + fn is_ready(&self) -> bool { + self.inner.ready.load(Ordering::Acquire) + } + + fn parallel_work(&self) { + // TODO: refactor + // decrypt + { + let job = &self.inner; + let peer = &job.state.peer; + let mut msg = job.buffer.lock(); + + let failed = || { + // cast to header followed by payload + let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = + match LayoutVerified::new_from_prefix(&mut msg.1[..]) { + Some(v) => v, + None => { + log::debug!("inbound worker: failed to parse message"); + return; + } + }; + + // authenticate and decrypt payload + { + // create nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); + + // do the weird ring AEAD dance + let key = LessSafeKey::new( + UnboundKey::new(&CHACHA20_POLY1305, 
&job.state.keypair.recv.key[..]).unwrap(), + ); + + // attempt to open (and authenticate) the body + match key.open_in_place(nonce, Aad::empty(), packet) { + Ok(_) => (), + Err(_) => { + // fault and return early + log::trace!("inbound worker: authentication failure"); + msg.1.truncate(0); + return; + } + } + } + + // check that counter not after reject + if header.f_counter.get() >= REJECT_AFTER_MESSAGES { + msg.1.truncate(0); + return; + } + + // cryptokey route and strip padding + let inner_len = { + let length = packet.len() - SIZE_TAG; + if length > 0 { + peer.device.table.check_route(&peer, &packet[..length]) + } else { + Some(0) + } + }; + + // truncate to remove tag + match inner_len { + None => { + log::trace!("inbound worker: cryptokey routing failed"); + msg.1.truncate(0); + } + Some(len) => { + log::trace!( + "inbound worker: good route, length = {} {}", + len, + if len == 0 { "(keepalive)" } else { "" } + ); + msg.1.truncate(mem::size_of::() + len); + } + } + }; + } + + // mark ready + self.inner.ready.store(true, Ordering::Release); + } + + fn sequential_work(self) { + let job = &self.inner; + let peer = &job.state.peer; + let mut msg = job.buffer.lock(); + + // cast transport header + let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) = + match LayoutVerified::new_from_prefix(&msg.1[..]) { + Some(v) => v, + None => { + // also covers authentication failure + return; + } + }; + + // check for replay + if !job.state.protector.lock().update(header.f_counter.get()) { + log::debug!("inbound worker: replay detected"); + return; + } + + // check for confirms key + if !job.state.confirmed.swap(true, Ordering::SeqCst) { + log::debug!("inbound worker: message confirms key"); + peer.confirm_key(&job.state.keypair); + } + + // update endpoint + *peer.endpoint.lock() = msg.0.take(); + + // check if should be written to TUN + let mut sent = false; + if packet.len() > 0 { + sent = match peer.device.inbound.write(&packet[..]) { + Err(e) => { + 
log::debug!("failed to write inbound packet to TUN: {:?}", e); + false + } + Ok(_) => true, + } + } else { + log::debug!("inbound worker: received keepalive") + } + + // trigger callback + C::recv(&peer.opaque, msg.1.len(), sent, &job.state.keypair); + } + +} diff --git a/src/wireguard/router/send.rs b/src/wireguard/router/send.rs new file mode 100644 index 0000000..2bd4abd --- /dev/null +++ b/src/wireguard/router/send.rs @@ -0,0 +1,140 @@ +use super::queue::{Job, Queue}; +use super::KeyPair; +use super::types::Callbacks; +use super::peer::Peer; +use super::{REJECT_AFTER_MESSAGES, SIZE_TAG}; +use super::messages::{TransportHeader, TYPE_TRANSPORT}; + +use super::super::{tun, udp, Endpoint}; + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; +use zerocopy::{AsBytes, LayoutVerified}; +use spin::Mutex; + +struct Inner> { + ready: AtomicBool, + buffer: Mutex>, + counter: u64, + keypair: Arc, + peer: Peer, +} + +pub struct SendJob> { + inner: Arc>, +} + +impl > SendJob { + pub fn new( + buffer: Vec, + counter: u64, + keypair: Arc, + peer: Peer + ) -> Option> { + // create job + let inner = Arc::new(Inner{ + buffer: Mutex::new(buffer), + counter, + keypair, + peer, + ready: AtomicBool::new(false) + }); + + // attempt to add to queue + if peer.outbound.push(SendJob{ inner: inner.clone()}) { + Some(SendJob{inner}) + } else { + None + } + } +} + +impl > Job for SendJob { + fn queue(&self) -> &Queue { + &self.inner.peer.outbound + } + + fn is_ready(&self) -> bool { + self.inner.ready.load(Ordering::Acquire) + } + + fn parallel_work(&self) { + debug_assert_eq!( + self.is_ready(), + false, + "doing parallel work on completed job" + ); + log::trace!("processing parallel send job"); + + // encrypt body + { + // make space for the tag + let job = &*self.inner; + let mut msg = job.buffer.lock(); + msg.extend([0u8; SIZE_TAG].iter()); + + // cast to header (should never fail) + let (mut 
header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = LayoutVerified::new_from_prefix(&mut msg[..]) .expect("earlier code should ensure that there is ample space"); // set header fields debug_assert!( job.counter < REJECT_AFTER_MESSAGES, "should be checked when assigning counters" ); header.f_type.set(TYPE_TRANSPORT); header.f_receiver.set(job.keypair.send.id); header.f_counter.set(job.counter); // create a nonce object let mut nonce = [0u8; 12]; debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); nonce[4..].copy_from_slice(header.f_counter.as_bytes()); let nonce = Nonce::assume_unique_for_key(nonce); // do the weird ring AEAD dance let key = LessSafeKey::new( UnboundKey::new(&CHACHA20_POLY1305, &job.keypair.send.key[..]).unwrap(), ); // encrypt contents of transport message in-place let end = packet.len() - SIZE_TAG; let tag = key .seal_in_place_separate_tag(nonce, Aad::empty(), &mut packet[..end]) .unwrap(); // append tag packet[end..].copy_from_slice(tag.as_ref()); } // mark ready self.inner.ready.store(true, Ordering::Release); } fn sequential_work(self) { debug_assert_eq!( self.is_ready(), true, "doing sequential work on an incomplete job" ); log::trace!("processing sequential send job"); // send to peer let job = &self.inner; let msg = job.buffer.lock(); let xmit = job.peer.send(&msg[..]).is_ok(); // trigger callback (for timers) C::send( &job.peer.opaque, msg.len(), xmit, &job.keypair, job.counter, ); } } diff --git a/src/wireguard/router/worker.rs b/src/wireguard/router/worker.rs new file mode 100644 index 0000000..d95050e --- /dev/null +++ b/src/wireguard/router/worker.rs @@ -0,0 +1,13 @@ +use super::Device; + +use super::super::{tun, udp, Endpoint}; +use super::types::Callbacks; + +use super::receive::ReceiveJob; +use super::send::SendJob; + +fn worker>( + device: Device, +) { + // fetch job +} diff --git 
a/src/wireguard/router/workers.rs b/src/wireguard/router/workers.rs index 43464a0..8ddc136 100644 --- a/src/wireguard/router/workers.rs +++ b/src/wireguard/router/workers.rs @@ -132,8 +132,7 @@ pub fn worker_inbound>( peer: Arc>, receiver: Receiver, -- cgit v1.2.3-59-g8ed1b