summaryrefslogtreecommitdiffstats
path: root/src/wireguard/router
diff options
context:
space:
mode:
Diffstat (limited to 'src/wireguard/router')
-rw-r--r--src/wireguard/router/mod.rs5
-rw-r--r--src/wireguard/router/peer.rs32
-rw-r--r--src/wireguard/router/queue.rs76
-rw-r--r--src/wireguard/router/receive.rs188
-rw-r--r--src/wireguard/router/send.rs140
-rw-r--r--src/wireguard/router/worker.rs13
-rw-r--r--src/wireguard/router/workers.rs3
7 files changed, 439 insertions, 18 deletions
diff --git a/src/wireguard/router/mod.rs b/src/wireguard/router/mod.rs
index 8238d32..ec5cc63 100644
--- a/src/wireguard/router/mod.rs
+++ b/src/wireguard/router/mod.rs
@@ -11,6 +11,11 @@ mod route;
mod runq;
mod types;
+mod queue;
+mod receive;
+mod send;
+mod worker;
+
#[cfg(test)]
mod tests;
diff --git a/src/wireguard/router/peer.rs b/src/wireguard/router/peer.rs
index 23a3e62..7312bc7 100644
--- a/src/wireguard/router/peer.rs
+++ b/src/wireguard/router/peer.rs
@@ -25,7 +25,10 @@ use super::SIZE_MESSAGE_PREFIX;
// worker pool related
use super::inbound::Inbound;
use super::outbound::Outbound;
-use super::pool::{InorderQueue, Job};
+use super::queue::Queue;
+
+use super::send::SendJob;
+use super::receive::ReceiveJob;
pub struct KeyWheel {
next: Option<Arc<KeyPair>>, // next key state (unconfirmed)
@@ -37,8 +40,8 @@ pub struct KeyWheel {
pub struct PeerInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
pub device: Device<E, C, T, B>,
pub opaque: C::Opaque,
- pub outbound: InorderQueue<Peer<E, C, T, B>, Outbound>,
- pub inbound: InorderQueue<Peer<E, C, T, B>, Inbound<E, C, T, B>>,
+ pub outbound: Queue<SendJob<E, C, T, B>>,
+ pub inbound: Queue<ReceiveJob<E, C, T, B>>,
pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_QUEUED_PACKETS], Wrapping>>,
pub keys: Mutex<KeyWheel>,
pub ekey: Mutex<Option<EncryptionState>>,
@@ -288,22 +291,11 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
self.send_staged();
}
- pub fn recv_job(
- &self,
- src: E,
- dec: Arc<DecryptionState<E, C, T, B>>,
- msg: Vec<u8>,
- ) -> Option<Job<Self, Inbound<E, C, T, B>>> {
- let job = Job::new(self.clone(), Inbound::new(msg, dec, src));
- self.inbound.send(job.clone());
- Some(job)
- }
-
- pub fn send_job(&self, msg: Vec<u8>, stage: bool) -> Option<Job<Self, Outbound>> {
+ pub fn send_job(&self, msg: Vec<u8>, stage: bool) -> Option<SendJob<E, C, T, B>> {
debug!("peer.send_job");
debug_assert!(
msg.len() >= mem::size_of::<TransportHeader>(),
- "received message with size: {:}",
+ "received TUN message with size: {:}",
msg.len()
);
@@ -323,6 +315,14 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
debug!("encryption state available, nonce = {}", state.nonce);
let counter = state.nonce;
state.nonce += 1;
+
+ SendJob::new(
+ msg,
+                            counter,
+ state.keypair.clone(),
+ self.clone()
+ );
+
Some((state.keypair.clone(), counter))
}
}
diff --git a/src/wireguard/router/queue.rs b/src/wireguard/router/queue.rs
new file mode 100644
index 0000000..045fd51
--- /dev/null
+++ b/src/wireguard/router/queue.rs
@@ -0,0 +1,76 @@
+use arraydeque::ArrayDeque;
+use spin::Mutex;
+
+use std::mem;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+const QUEUE_SIZE: usize = 1024;
+
+pub trait Job: Sized {
+ fn queue(&self) -> &Queue<Self>;
+
+ fn is_ready(&self) -> bool;
+
+ fn parallel_work(&self);
+
+ fn sequential_work(self);
+}
+
+
+pub struct Queue<J: Job> {
+ contenders: AtomicUsize,
+ queue: Mutex<ArrayDeque<[J; QUEUE_SIZE]>>,
+}
+
+impl<J: Job> Queue<J> {
+ pub fn new() -> Queue<J> {
+ Queue {
+ contenders: AtomicUsize::new(0),
+ queue: Mutex::new(ArrayDeque::new()),
+ }
+ }
+
+ pub fn push(&self, job: J) -> bool {
+ self.queue.lock().push_back(job).is_ok()
+ }
+
+ pub fn consume(&self) {
+ // check if we are the first contender
+ let pos = self.contenders.fetch_add(1, Ordering::Acquire);
+        assert!(pos < usize::max_value(), "contenders overflow");
+        // not the first contender: the active consumer will observe our increment
+        if pos > 0 { return; }
+
+ // enter the critical section
+ let mut contenders = 1; // myself
+ while contenders > 0 {
+ // handle every ready element
+ loop {
+ let mut queue = self.queue.lock();
+
+ // check if front job is ready
+ match queue.front() {
+ None => break,
+ Some(job) => {
+ if job.is_ready() {
+ ()
+ } else {
+ break;
+ }
+ }
+ };
+
+ // take the job out of the queue
+ let job = queue.pop_front().unwrap();
+ debug_assert!(job.is_ready());
+ mem::drop(queue);
+
+ // process element
+ job.sequential_work();
+ }
+
+ // decrease contenders
+ contenders = self.contenders.fetch_sub(contenders, Ordering::Acquire) - contenders;
+ }
+ }
+}
diff --git a/src/wireguard/router/receive.rs b/src/wireguard/router/receive.rs
new file mode 100644
index 0000000..53890e3
--- /dev/null
+++ b/src/wireguard/router/receive.rs
@@ -0,0 +1,188 @@
+use super::queue::{Job, Queue};
+use super::KeyPair;
+use super::types::Callbacks;
+use super::peer::Peer;
+use super::{REJECT_AFTER_MESSAGES, SIZE_TAG};
+use super::messages::{TransportHeader, TYPE_TRANSPORT};
+use super::device::DecryptionState;
+
+use super::super::{tun, udp, Endpoint};
+
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::mem;
+
+use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
+use zerocopy::{AsBytes, LayoutVerified};
+use spin::Mutex;
+
+
+struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
+ ready: AtomicBool,
+ buffer: Mutex<(Option<E>, Vec<u8>)>, // endpoint & ciphertext buffer
+ state: Arc<DecryptionState<E, C, T, B>>, // decryption state (keys and replay protector)
+}
+
+pub struct ReceiveJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
+ inner: Arc<Inner<E, C, T, B>>,
+}
+
+impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ReceiveJob<E, C, T, B> {
+ fn new(buffer: Vec<u8>, state: Arc<DecryptionState<E, C, T, B>>, endpoint: E) -> Option<ReceiveJob<E, C, T, B>> {
+ // create job
+ let inner = Arc::new(Inner{
+ ready: AtomicBool::new(false),
+ buffer: Mutex::new((Some(endpoint), buffer)),
+ state
+ });
+
+ // attempt to add to queue
+        if inner.state.peer.inbound.push(ReceiveJob{ inner: inner.clone()}) {
+ Some(ReceiveJob{inner})
+ } else {
+ None
+ }
+
+ }
+}
+
+impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for ReceiveJob<E, C, T, B> {
+ fn queue(&self) -> &Queue<Self> {
+ &self.inner.state.peer.inbound
+ }
+
+ fn is_ready(&self) -> bool {
+ self.inner.ready.load(Ordering::Acquire)
+ }
+
+ fn parallel_work(&self) {
+ // TODO: refactor
+ // decrypt
+ {
+ let job = &self.inner;
+ let peer = &job.state.peer;
+ let mut msg = job.buffer.lock();
+
+            (|| {
+                // cast to header followed by payload
+                let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
+                    match LayoutVerified::new_from_prefix(&mut msg.1[..]) {
+                        Some(v) => v,
+                        None => {
+                            log::debug!("inbound worker: failed to parse message");
+                            return;
+                        }
+                    };
+
+                // authenticate and decrypt payload
+                {
+                    // create nonce object
+                    let mut nonce = [0u8; 12];
+                    debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len());
+                    nonce[4..].copy_from_slice(header.f_counter.as_bytes());
+                    let nonce = Nonce::assume_unique_for_key(nonce);
+
+                    // do the weird ring AEAD dance
+                    let key = LessSafeKey::new(
+                        UnboundKey::new(&CHACHA20_POLY1305, &job.state.keypair.recv.key[..]).unwrap(),
+                    );
+
+                    // attempt to open (and authenticate) the body
+                    match key.open_in_place(nonce, Aad::empty(), packet) {
+                        Ok(_) => (),
+                        Err(_) => {
+                            // fault and return early
+                            log::trace!("inbound worker: authentication failure");
+                            msg.1.truncate(0);
+                            return;
+                        }
+                    }
+                }
+
+                // check that counter not after reject
+                if header.f_counter.get() >= REJECT_AFTER_MESSAGES {
+                    msg.1.truncate(0);
+                    return;
+                }
+
+                // cryptokey route and strip padding
+                let inner_len = {
+                    let length = packet.len() - SIZE_TAG;
+                    if length > 0 {
+                        peer.device.table.check_route(&peer, &packet[..length])
+                    } else {
+                        Some(0)
+                    }
+                };
+
+                // truncate to remove tag
+                match inner_len {
+                    None => {
+                        log::trace!("inbound worker: cryptokey routing failed");
+                        msg.1.truncate(0);
+                    }
+                    Some(len) => {
+                        log::trace!(
+                            "inbound worker: good route, length = {} {}",
+                            len,
+                            if len == 0 { "(keepalive)" } else { "" }
+                        );
+                        msg.1.truncate(mem::size_of::<TransportHeader>() + len);
+                    }
+                }
+            })();
+ }
+
+ // mark ready
+ self.inner.ready.store(true, Ordering::Release);
+ }
+
+ fn sequential_work(self) {
+ let job = &self.inner;
+ let peer = &job.state.peer;
+ let mut msg = job.buffer.lock();
+
+ // cast transport header
+ let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) =
+ match LayoutVerified::new_from_prefix(&msg.1[..]) {
+ Some(v) => v,
+ None => {
+ // also covers authentication failure
+ return;
+ }
+ };
+
+ // check for replay
+ if !job.state.protector.lock().update(header.f_counter.get()) {
+ log::debug!("inbound worker: replay detected");
+ return;
+ }
+
+ // check for confirms key
+ if !job.state.confirmed.swap(true, Ordering::SeqCst) {
+ log::debug!("inbound worker: message confirms key");
+ peer.confirm_key(&job.state.keypair);
+ }
+
+ // update endpoint
+ *peer.endpoint.lock() = msg.0.take();
+
+ // check if should be written to TUN
+ let mut sent = false;
+ if packet.len() > 0 {
+ sent = match peer.device.inbound.write(&packet[..]) {
+ Err(e) => {
+ log::debug!("failed to write inbound packet to TUN: {:?}", e);
+ false
+ }
+ Ok(_) => true,
+ }
+ } else {
+ log::debug!("inbound worker: received keepalive")
+ }
+
+ // trigger callback
+ C::recv(&peer.opaque, msg.1.len(), sent, &job.state.keypair);
+ }
+
+}
diff --git a/src/wireguard/router/send.rs b/src/wireguard/router/send.rs
new file mode 100644
index 0000000..2bd4abd
--- /dev/null
+++ b/src/wireguard/router/send.rs
@@ -0,0 +1,140 @@
+use super::queue::{Job, Queue};
+use super::KeyPair;
+use super::types::Callbacks;
+use super::peer::Peer;
+use super::{REJECT_AFTER_MESSAGES, SIZE_TAG};
+use super::messages::{TransportHeader, TYPE_TRANSPORT};
+
+use super::super::{tun, udp, Endpoint};
+
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+
+use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
+use zerocopy::{AsBytes, LayoutVerified};
+use spin::Mutex;
+
+struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
+ ready: AtomicBool,
+ buffer: Mutex<Vec<u8>>,
+ counter: u64,
+ keypair: Arc<KeyPair>,
+ peer: Peer<E, C, T, B>,
+}
+
+pub struct SendJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
+ inner: Arc<Inner<E, C, T, B>>,
+}
+
+impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SendJob<E, C, T, B> {
+ pub fn new(
+ buffer: Vec<u8>,
+ counter: u64,
+ keypair: Arc<KeyPair>,
+ peer: Peer<E, C, T, B>
+ ) -> Option<SendJob<E, C, T, B>> {
+ // create job
+ let inner = Arc::new(Inner{
+ buffer: Mutex::new(buffer),
+ counter,
+ keypair,
+ peer,
+ ready: AtomicBool::new(false)
+ });
+
+ // attempt to add to queue
+        if inner.peer.outbound.push(SendJob{ inner: inner.clone()}) {
+ Some(SendJob{inner})
+ } else {
+ None
+ }
+ }
+}
+
+impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for SendJob<E, C, T, B> {
+ fn queue(&self) -> &Queue<Self> {
+ &self.inner.peer.outbound
+ }
+
+ fn is_ready(&self) -> bool {
+ self.inner.ready.load(Ordering::Acquire)
+ }
+
+ fn parallel_work(&self) {
+ debug_assert_eq!(
+ self.is_ready(),
+ false,
+ "doing parallel work on completed job"
+ );
+ log::trace!("processing parallel send job");
+
+ // encrypt body
+ {
+ // make space for the tag
+ let job = &*self.inner;
+ let mut msg = job.buffer.lock();
+ msg.extend([0u8; SIZE_TAG].iter());
+
+ // cast to header (should never fail)
+ let (mut header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
+ LayoutVerified::new_from_prefix(&mut msg[..])
+ .expect("earlier code should ensure that there is ample space");
+
+ // set header fields
+ debug_assert!(
+ job.counter < REJECT_AFTER_MESSAGES,
+ "should be checked when assigning counters"
+ );
+ header.f_type.set(TYPE_TRANSPORT);
+ header.f_receiver.set(job.keypair.send.id);
+ header.f_counter.set(job.counter);
+
+ // create a nonce object
+ let mut nonce = [0u8; 12];
+ debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len());
+ nonce[4..].copy_from_slice(header.f_counter.as_bytes());
+ let nonce = Nonce::assume_unique_for_key(nonce);
+
+ // do the weird ring AEAD dance
+ let key = LessSafeKey::new(
+ UnboundKey::new(&CHACHA20_POLY1305, &job.keypair.send.key[..]).unwrap(),
+ );
+
+ // encrypt contents of transport message in-place
+ let end = packet.len() - SIZE_TAG;
+ let tag = key
+ .seal_in_place_separate_tag(nonce, Aad::empty(), &mut packet[..end])
+ .unwrap();
+
+ // append tag
+ packet[end..].copy_from_slice(tag.as_ref());
+ }
+
+ // mark ready
+ self.inner.ready.store(true, Ordering::Release);
+ }
+
+ fn sequential_work(self) {
+ debug_assert_eq!(
+ self.is_ready(),
+ true,
+        // keep the panic message on a single line (no embedded newline/indent)
+        "doing sequential work on an incomplete job"
+ );
+ log::trace!("processing sequential send job");
+
+ // send to peer
+ let job = &self.inner;
+ let msg = job.buffer.lock();
+ let xmit = job.peer.send(&msg[..]).is_ok();
+
+ // trigger callback (for timers)
+ C::send(
+ &job.peer.opaque,
+ msg.len(),
+ xmit,
+ &job.keypair,
+ job.counter,
+ );
+ }
+}
diff --git a/src/wireguard/router/worker.rs b/src/wireguard/router/worker.rs
new file mode 100644
index 0000000..d95050e
--- /dev/null
+++ b/src/wireguard/router/worker.rs
@@ -0,0 +1,13 @@
+use super::Device;
+
+use super::super::{tun, udp, Endpoint};
+use super::types::Callbacks;
+
+use super::receive::ReceiveJob;
+use super::send::SendJob;
+
+fn worker<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
+ device: Device<E, C, T, B>,
+) {
+ // fetch job
+}
diff --git a/src/wireguard/router/workers.rs b/src/wireguard/router/workers.rs
index 43464a0..8ddc136 100644
--- a/src/wireguard/router/workers.rs
+++ b/src/wireguard/router/workers.rs
@@ -132,8 +132,7 @@ pub fn worker_inbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<
}
}
-/* TODO: Replace with run-queue
- */
+
pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
peer: Arc<PeerInner<E, C, T, B>>,
receiver: Receiver<JobOutbound>,