diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2020-02-16 18:12:43 +0100 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2020-02-16 18:12:43 +0100 |
commit | 106c5e8b5c865c8396f824f4f5aa14d1bf0952b1 (patch) | |
tree | 68101553c62d301921b84776a9e18fc627c7a731 /src/wireguard/router/receive.rs | |
parent | Work on reducing context switches (diff) | |
download | wireguard-rs-router.tar.xz wireguard-rs-router.zip |
Work on router optimizationsrouter
Diffstat (limited to 'src/wireguard/router/receive.rs')
-rw-r--r-- | src/wireguard/router/receive.rs | 184 |
1 files changed, 94 insertions, 90 deletions
diff --git a/src/wireguard/router/receive.rs b/src/wireguard/router/receive.rs index 53890e3..c5fe3da 100644 --- a/src/wireguard/router/receive.rs +++ b/src/wireguard/router/receive.rs @@ -1,21 +1,18 @@ -use super::queue::{Job, Queue}; -use super::KeyPair; +use super::device::DecryptionState; +use super::messages::TransportHeader; +use super::queue::{ParallelJob, Queue, SequentialJob}; use super::types::Callbacks; -use super::peer::Peer; use super::{REJECT_AFTER_MESSAGES, SIZE_TAG}; -use super::messages::{TransportHeader, TYPE_TRANSPORT}; -use super::device::DecryptionState; use super::super::{tun, udp, Endpoint}; +use std::mem; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::mem; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; -use zerocopy::{AsBytes, LayoutVerified}; use spin::Mutex; - +use zerocopy::{AsBytes, LayoutVerified}; struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> { ready: AtomicBool, @@ -23,49 +20,49 @@ struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> { state: Arc<DecryptionState<E, C, T, B>>, // decryption state (keys and replay protector) } -pub struct ReceiveJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> { - inner: Arc<Inner<E, C, T, B>>, +pub struct ReceiveJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( + Arc<Inner<E, C, T, B>>, +); + +impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Clone + for ReceiveJob<E, C, T, B> +{ + fn clone(&self) -> ReceiveJob<E, C, T, B> { + ReceiveJob(self.0.clone()) + } } -impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ReceiveJob<E, C, T, B> { - fn new(buffer: Vec<u8>, state: Arc<DecryptionState<E, C, T, B>>, endpoint: E) -> Option<ReceiveJob<E, C, T, B>> { - // create job - let inner = Arc::new(Inner{ +impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ReceiveJob<E, C, T, B> { + pub fn new( + buffer: Vec<u8>, + state: Arc<DecryptionState<E, C, T, B>>, + endpoint: E, + ) -> ReceiveJob<E, C, T, B> { + ReceiveJob(Arc::new(Inner { ready: AtomicBool::new(false), buffer: Mutex::new((Some(endpoint), buffer)), - state - }); - - // attempt to add to queue - if state.peer.inbound.push(ReceiveJob{ inner: inner.clone()}) { - Some(ReceiveJob{inner}) - } else { - None - } - + state, + })) } } -impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for ReceiveJob<E, C, T, B> { +impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob + for ReceiveJob<E, C, T, B> +{ fn queue(&self) -> &Queue<Self> { - &self.inner.state.peer.inbound - } - - fn is_ready(&self) -> bool { - self.inner.ready.load(Ordering::Acquire) + &self.0.state.peer.inbound } fn parallel_work(&self) { // TODO: refactor // decrypt { - let job = &self.inner; + let job = &self.0; let peer = &job.state.peer; let mut msg = job.buffer.lock(); - - let failed = || { - // cast to header followed by payload - let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = + + // cast to header followed by payload + let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = match LayoutVerified::new_from_prefix(&mut msg.1[..]) { Some(v) => v, None => { @@ -74,73 +71,81 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for Rece } }; - // authenticate and decrypt payload - { - // create nonce object - let mut nonce = [0u8; 12]; - debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); - nonce[4..].copy_from_slice(header.f_counter.as_bytes()); - let nonce = Nonce::assume_unique_for_key(nonce); - - // do the weird ring AEAD dance - let key = LessSafeKey::new( - UnboundKey::new(&CHACHA20_POLY1305, &job.state.keypair.recv.key[..]).unwrap(), - ); - - // attempt to open (and authenticate) the body - match key.open_in_place(nonce, Aad::empty(), packet) { - Ok(_) => (), - Err(_) => { - // fault and return early - log::trace!("inbound worker: authentication failure"); - msg.1.truncate(0); - return; - } + // authenticate and decrypt payload + { + // create nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); + + // do the weird ring AEAD dance + let key = LessSafeKey::new( + UnboundKey::new(&CHACHA20_POLY1305, &job.state.keypair.recv.key[..]).unwrap(), + ); + + // attempt to open (and authenticate) the body + match key.open_in_place(nonce, Aad::empty(), packet) { + Ok(_) => (), + Err(_) => { + // fault and return early + log::trace!("inbound worker: authentication failure"); + msg.1.truncate(0); + return; } } + } - // check that counter not after reject - if header.f_counter.get() >= REJECT_AFTER_MESSAGES { - msg.1.truncate(0); - return; - } - - // cryptokey route and strip padding - let inner_len = { - let length = packet.len() - SIZE_TAG; - if length > 0 { - peer.device.table.check_route(&peer, &packet[..length]) - } else { - Some(0) - } - }; + // check that counter not after reject + if header.f_counter.get() >= REJECT_AFTER_MESSAGES { + msg.1.truncate(0); + return; + } - // truncate to remove tag - match inner_len { - None => { - log::trace!("inbound worker: cryptokey routing failed"); - msg.1.truncate(0); - } - Some(len) => { - log::trace!( - "inbound worker: good route, length = {} {}", - len, - if len == 0 { "(keepalive)" } else { "" } - ); - msg.1.truncate(mem::size_of::<TransportHeader>() + len); - } + // cryptokey route and strip padding + let inner_len = { + let length = packet.len() - SIZE_TAG; + if length > 0 { + peer.device.table.check_route(&peer, &packet[..length]) + } else { + Some(0) } }; + + // truncate to remove tag + match inner_len { + None => { + log::trace!("inbound worker: cryptokey routing failed"); + msg.1.truncate(0); + } + Some(len) => { + log::trace!( + "inbound worker: good route, length = {} {}", + len, + if len == 0 { "(keepalive)" } else { "" } + ); + msg.1.truncate(mem::size_of::<TransportHeader>() + len); + } + } } // mark ready - self.inner.ready.store(true, Ordering::Release); + self.0.ready.store(true, Ordering::Release); + } +} + +impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob + for ReceiveJob<E, C, T, B> +{ + fn is_ready(&self) -> bool { + self.0.ready.load(Ordering::Acquire) } fn sequential_work(self) { - let job = &self.inner; + let job = &self.0; let peer = &job.state.peer; let mut msg = job.buffer.lock(); + let endpoint = msg.0.take(); // cast transport header let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) = @@ -165,7 +170,7 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for Rece } // update endpoint - *peer.endpoint.lock() = msg.0.take(); + *peer.endpoint.lock() = endpoint; // check if should be written to TUN let mut sent = false; @@ -184,5 +189,4 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for Rece // trigger callback C::recv(&peer.opaque, msg.1.len(), sent, &job.state.keypair); } - } |