aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/router/receive.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2020-02-16 18:12:43 +0100
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2020-02-16 18:12:43 +0100
commit106c5e8b5c865c8396f824f4f5aa14d1bf0952b1 (patch)
tree68101553c62d301921b84776a9e18fc627c7a731 /src/wireguard/router/receive.rs
parentWork on reducing context switches (diff)
downloadwireguard-rs-router.tar.xz
wireguard-rs-router.zip
Work on router optimizationsrouter
Diffstat (limited to 'src/wireguard/router/receive.rs')
-rw-r--r--src/wireguard/router/receive.rs184
1 files changed, 94 insertions, 90 deletions
diff --git a/src/wireguard/router/receive.rs b/src/wireguard/router/receive.rs
index 53890e3..c5fe3da 100644
--- a/src/wireguard/router/receive.rs
+++ b/src/wireguard/router/receive.rs
@@ -1,21 +1,18 @@
-use super::queue::{Job, Queue};
-use super::KeyPair;
+use super::device::DecryptionState;
+use super::messages::TransportHeader;
+use super::queue::{ParallelJob, Queue, SequentialJob};
use super::types::Callbacks;
-use super::peer::Peer;
use super::{REJECT_AFTER_MESSAGES, SIZE_TAG};
-use super::messages::{TransportHeader, TYPE_TRANSPORT};
-use super::device::DecryptionState;
use super::super::{tun, udp, Endpoint};
+use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
-use std::mem;
use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
-use zerocopy::{AsBytes, LayoutVerified};
use spin::Mutex;
-
+use zerocopy::{AsBytes, LayoutVerified};
struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
ready: AtomicBool,
@@ -23,49 +20,49 @@ struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
state: Arc<DecryptionState<E, C, T, B>>, // decryption state (keys and replay protector)
}
-pub struct ReceiveJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
- inner: Arc<Inner<E, C, T, B>>,
+pub struct ReceiveJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
+ Arc<Inner<E, C, T, B>>,
+);
+
+impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Clone
+ for ReceiveJob<E, C, T, B>
+{
+ fn clone(&self) -> ReceiveJob<E, C, T, B> {
+ ReceiveJob(self.0.clone())
+ }
}
-impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ReceiveJob<E, C, T, B> {
- fn new(buffer: Vec<u8>, state: Arc<DecryptionState<E, C, T, B>>, endpoint: E) -> Option<ReceiveJob<E, C, T, B>> {
- // create job
- let inner = Arc::new(Inner{
+impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ReceiveJob<E, C, T, B> {
+ pub fn new(
+ buffer: Vec<u8>,
+ state: Arc<DecryptionState<E, C, T, B>>,
+ endpoint: E,
+ ) -> ReceiveJob<E, C, T, B> {
+ ReceiveJob(Arc::new(Inner {
ready: AtomicBool::new(false),
buffer: Mutex::new((Some(endpoint), buffer)),
- state
- });
-
- // attempt to add to queue
- if state.peer.inbound.push(ReceiveJob{ inner: inner.clone()}) {
- Some(ReceiveJob{inner})
- } else {
- None
- }
-
+ state,
+ }))
}
}
-impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for ReceiveJob<E, C, T, B> {
+impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob
+ for ReceiveJob<E, C, T, B>
+{
fn queue(&self) -> &Queue<Self> {
- &self.inner.state.peer.inbound
- }
-
- fn is_ready(&self) -> bool {
- self.inner.ready.load(Ordering::Acquire)
+ &self.0.state.peer.inbound
}
fn parallel_work(&self) {
// TODO: refactor
// decrypt
{
- let job = &self.inner;
+ let job = &self.0;
let peer = &job.state.peer;
let mut msg = job.buffer.lock();
-
- let failed = || {
- // cast to header followed by payload
- let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
+
+ // cast to header followed by payload
+ let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
match LayoutVerified::new_from_prefix(&mut msg.1[..]) {
Some(v) => v,
None => {
@@ -74,73 +71,81 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for Rece
}
};
- // authenticate and decrypt payload
- {
- // create nonce object
- let mut nonce = [0u8; 12];
- debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len());
- nonce[4..].copy_from_slice(header.f_counter.as_bytes());
- let nonce = Nonce::assume_unique_for_key(nonce);
-
- // do the weird ring AEAD dance
- let key = LessSafeKey::new(
- UnboundKey::new(&CHACHA20_POLY1305, &job.state.keypair.recv.key[..]).unwrap(),
- );
-
- // attempt to open (and authenticate) the body
- match key.open_in_place(nonce, Aad::empty(), packet) {
- Ok(_) => (),
- Err(_) => {
- // fault and return early
- log::trace!("inbound worker: authentication failure");
- msg.1.truncate(0);
- return;
- }
+ // authenticate and decrypt payload
+ {
+ // create nonce object
+ let mut nonce = [0u8; 12];
+ debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len());
+ nonce[4..].copy_from_slice(header.f_counter.as_bytes());
+ let nonce = Nonce::assume_unique_for_key(nonce);
+
+ // do the weird ring AEAD dance
+ let key = LessSafeKey::new(
+ UnboundKey::new(&CHACHA20_POLY1305, &job.state.keypair.recv.key[..]).unwrap(),
+ );
+
+ // attempt to open (and authenticate) the body
+ match key.open_in_place(nonce, Aad::empty(), packet) {
+ Ok(_) => (),
+ Err(_) => {
+ // fault and return early
+ log::trace!("inbound worker: authentication failure");
+ msg.1.truncate(0);
+ return;
}
}
+ }
- // check that counter not after reject
- if header.f_counter.get() >= REJECT_AFTER_MESSAGES {
- msg.1.truncate(0);
- return;
- }
-
- // cryptokey route and strip padding
- let inner_len = {
- let length = packet.len() - SIZE_TAG;
- if length > 0 {
- peer.device.table.check_route(&peer, &packet[..length])
- } else {
- Some(0)
- }
- };
+ // check that counter not after reject
+ if header.f_counter.get() >= REJECT_AFTER_MESSAGES {
+ msg.1.truncate(0);
+ return;
+ }
- // truncate to remove tag
- match inner_len {
- None => {
- log::trace!("inbound worker: cryptokey routing failed");
- msg.1.truncate(0);
- }
- Some(len) => {
- log::trace!(
- "inbound worker: good route, length = {} {}",
- len,
- if len == 0 { "(keepalive)" } else { "" }
- );
- msg.1.truncate(mem::size_of::<TransportHeader>() + len);
- }
+ // cryptokey route and strip padding
+ let inner_len = {
+ let length = packet.len() - SIZE_TAG;
+ if length > 0 {
+ peer.device.table.check_route(&peer, &packet[..length])
+ } else {
+ Some(0)
}
};
+
+ // truncate to remove tag
+ match inner_len {
+ None => {
+ log::trace!("inbound worker: cryptokey routing failed");
+ msg.1.truncate(0);
+ }
+ Some(len) => {
+ log::trace!(
+ "inbound worker: good route, length = {} {}",
+ len,
+ if len == 0 { "(keepalive)" } else { "" }
+ );
+ msg.1.truncate(mem::size_of::<TransportHeader>() + len);
+ }
+ }
}
// mark ready
- self.inner.ready.store(true, Ordering::Release);
+ self.0.ready.store(true, Ordering::Release);
+ }
+}
+
+impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob
+ for ReceiveJob<E, C, T, B>
+{
+ fn is_ready(&self) -> bool {
+ self.0.ready.load(Ordering::Acquire)
}
fn sequential_work(self) {
- let job = &self.inner;
+ let job = &self.0;
let peer = &job.state.peer;
let mut msg = job.buffer.lock();
+ let endpoint = msg.0.take();
// cast transport header
let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) =
@@ -165,7 +170,7 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for Rece
}
// update endpoint
- *peer.endpoint.lock() = msg.0.take();
+ *peer.endpoint.lock() = endpoint;
// check if should be written to TUN
let mut sent = false;
@@ -184,5 +189,4 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for Rece
// trigger callback
C::recv(&peer.opaque, msg.1.len(), sent, &job.state.keypair);
}
-
}