diff options
Diffstat (limited to 'src/wireguard/router')
-rw-r--r-- | src/wireguard/router/mod.rs | 5 | ||||
-rw-r--r-- | src/wireguard/router/peer.rs | 32 | ||||
-rw-r--r-- | src/wireguard/router/queue.rs | 76 | ||||
-rw-r--r-- | src/wireguard/router/receive.rs | 188 | ||||
-rw-r--r-- | src/wireguard/router/send.rs | 140 | ||||
-rw-r--r-- | src/wireguard/router/worker.rs | 13 | ||||
-rw-r--r-- | src/wireguard/router/workers.rs | 3 |
7 files changed, 439 insertions, 18 deletions
diff --git a/src/wireguard/router/mod.rs b/src/wireguard/router/mod.rs index 8238d32..ec5cc63 100644 --- a/src/wireguard/router/mod.rs +++ b/src/wireguard/router/mod.rs @@ -11,6 +11,11 @@ mod route; mod runq; mod types; +mod queue; +mod receive; +mod send; +mod worker; + #[cfg(test)] mod tests; diff --git a/src/wireguard/router/peer.rs b/src/wireguard/router/peer.rs index 23a3e62..7312bc7 100644 --- a/src/wireguard/router/peer.rs +++ b/src/wireguard/router/peer.rs @@ -25,7 +25,10 @@ use super::SIZE_MESSAGE_PREFIX; // worker pool related use super::inbound::Inbound; use super::outbound::Outbound; -use super::pool::{InorderQueue, Job}; +use super::queue::Queue; + +use super::send::SendJob; +use super::receive::ReceiveJob; pub struct KeyWheel { next: Option<Arc<KeyPair>>, // next key state (unconfirmed) @@ -37,8 +40,8 @@ pub struct KeyWheel { pub struct PeerInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> { pub device: Device<E, C, T, B>, pub opaque: C::Opaque, - pub outbound: InorderQueue<Peer<E, C, T, B>, Outbound>, - pub inbound: InorderQueue<Peer<E, C, T, B>, Inbound<E, C, T, B>>, + pub outbound: Queue<SendJob<E, C, T, B>>, + pub inbound: Queue<ReceiveJob<E, C, T, B>>, pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_QUEUED_PACKETS], Wrapping>>, pub keys: Mutex<KeyWheel>, pub ekey: Mutex<Option<EncryptionState>>, @@ -288,22 +291,11 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T, self.send_staged(); } - pub fn recv_job( - &self, - src: E, - dec: Arc<DecryptionState<E, C, T, B>>, - msg: Vec<u8>, - ) -> Option<Job<Self, Inbound<E, C, T, B>>> { - let job = Job::new(self.clone(), Inbound::new(msg, dec, src)); - self.inbound.send(job.clone()); - Some(job) - } - - pub fn send_job(&self, msg: Vec<u8>, stage: bool) -> Option<Job<Self, Outbound>> { + pub fn send_job(&self, msg: Vec<u8>, stage: bool) -> Option<SendJob<E, C, T, B>> { debug!("peer.send_job"); debug_assert!( msg.len() >= mem::size_of::<TransportHeader>(), - "received message with size: {:}", + "received TUN message with size: {:}", msg.len() ); @@ -323,6 +315,14 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T, debug!("encryption state available, nonce = {}", state.nonce); let counter = state.nonce; state.nonce += 1; + + SendJob::new( + msg, + state.nonce, + state.keypair.clone(), + self.clone() + ); + Some((state.keypair.clone(), counter)) } } diff --git a/src/wireguard/router/queue.rs b/src/wireguard/router/queue.rs new file mode 100644 index 0000000..045fd51 --- /dev/null +++ b/src/wireguard/router/queue.rs @@ -0,0 +1,76 @@ +use arraydeque::ArrayDeque; +use spin::Mutex; + +use std::mem; +use std::sync::atomic::{AtomicUsize, Ordering}; + +const QUEUE_SIZE: usize = 1024; + +pub trait Job: Sized { + fn queue(&self) -> &Queue<Self>; + + fn is_ready(&self) -> bool; + + fn parallel_work(&self); + + fn sequential_work(self); +} + + +pub struct Queue<J: Job> { + contenders: AtomicUsize, + queue: Mutex<ArrayDeque<[J; QUEUE_SIZE]>>, +} + +impl<J: Job> Queue<J> { + pub fn new() -> Queue<J> { + Queue { + contenders: AtomicUsize::new(0), + queue: Mutex::new(ArrayDeque::new()), + } + } + + pub fn push(&self, job: J) -> bool { + self.queue.lock().push_back(job).is_ok() + } + + pub fn consume(&self) { + // check if we are the first contender + let pos = self.contenders.fetch_add(1, Ordering::Acquire); + if pos > 0 { + assert!(pos < usize::max_value(), "contenders overflow"); + } + + // enter the critical section + let mut contenders = 1; // myself + while contenders > 0 { + // handle every ready element + loop { + let mut queue = self.queue.lock(); + + // check if front job is ready + match queue.front() { + None => break, + Some(job) => { + if job.is_ready() { + () + } else { + break; + } + } + }; + + // take the job out of the queue + let job = queue.pop_front().unwrap(); + debug_assert!(job.is_ready()); + mem::drop(queue); + + // process element + job.sequential_work(); + } + + // decrease contenders + contenders = self.contenders.fetch_sub(contenders, Ordering::Acquire) - contenders; + } + } +} diff --git a/src/wireguard/router/receive.rs b/src/wireguard/router/receive.rs new file mode 100644 index 0000000..53890e3 --- /dev/null +++ b/src/wireguard/router/receive.rs @@ -0,0 +1,188 @@ +use super::queue::{Job, Queue}; +use super::KeyPair; +use super::types::Callbacks; +use super::peer::Peer; +use super::{REJECT_AFTER_MESSAGES, SIZE_TAG}; +use super::messages::{TransportHeader, TYPE_TRANSPORT}; +use super::device::DecryptionState; + +use super::super::{tun, udp, Endpoint}; + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::mem; + +use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; +use zerocopy::{AsBytes, LayoutVerified}; +use spin::Mutex; + + +struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> { + ready: AtomicBool, + buffer: Mutex<(Option<E>, Vec<u8>)>, // endpoint & ciphertext buffer + state: Arc<DecryptionState<E, C, T, B>>, // decryption state (keys and replay protector) +} + +pub struct ReceiveJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> { + inner: Arc<Inner<E, C, T, B>>, +} + +impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ReceiveJob<E, C, T, B> { + fn new(buffer: Vec<u8>, state: Arc<DecryptionState<E, C, T, B>>, endpoint: E) -> Option<ReceiveJob<E, C, T, B>> { + // create job + let inner = Arc::new(Inner{ + ready: AtomicBool::new(false), + buffer: Mutex::new((Some(endpoint), buffer)), + state + }); + + // attempt to add to queue + if state.peer.inbound.push(ReceiveJob{ inner: inner.clone()}) { + Some(ReceiveJob{inner}) + } else { + None + } + + } +} + +impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for ReceiveJob<E, C, T, B> { + fn queue(&self) -> &Queue<Self> { + &self.inner.state.peer.inbound + } + + fn is_ready(&self) -> bool { + self.inner.ready.load(Ordering::Acquire) + } + + fn parallel_work(&self) { + // TODO: refactor + // decrypt + { + let job = &self.inner; + let peer = &job.state.peer; + let mut msg = job.buffer.lock(); + + let failed = || { + // cast to header followed by payload + let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = + match LayoutVerified::new_from_prefix(&mut msg.1[..]) { + Some(v) => v, + None => { + log::debug!("inbound worker: failed to parse message"); + return; + } + }; + + // authenticate and decrypt payload + { + // create nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); + + // do the weird ring AEAD dance + let key = LessSafeKey::new( + UnboundKey::new(&CHACHA20_POLY1305, &job.state.keypair.recv.key[..]).unwrap(), + ); + + // attempt to open (and authenticate) the body + match key.open_in_place(nonce, Aad::empty(), packet) { + Ok(_) => (), + Err(_) => { + // fault and return early + log::trace!("inbound worker: authentication failure"); + msg.1.truncate(0); + return; + } + } + } + + // check that counter not after reject + if header.f_counter.get() >= REJECT_AFTER_MESSAGES { + msg.1.truncate(0); + return; + } + + // cryptokey route and strip padding + let inner_len = { + let length = packet.len() - SIZE_TAG; + if length > 0 { + peer.device.table.check_route(&peer, &packet[..length]) + } else { + Some(0) + } + }; + + // truncate to remove tag + match inner_len { + None => { + log::trace!("inbound worker: cryptokey routing failed"); + msg.1.truncate(0); + } + Some(len) => { + log::trace!( + "inbound worker: good route, length = {} {}", + len, + if len == 0 { "(keepalive)" } else { "" } + ); + msg.1.truncate(mem::size_of::<TransportHeader>() + len); + } + } + }; + } + + // mark ready + self.inner.ready.store(true, Ordering::Release); + } + + fn sequential_work(self) { + let job = &self.inner; + let peer = &job.state.peer; + let mut msg = job.buffer.lock(); + + // cast transport header + let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) = + match LayoutVerified::new_from_prefix(&msg.1[..]) { + Some(v) => v, + None => { + // also covers authentication failure + return; + } + }; + + // check for replay + if !job.state.protector.lock().update(header.f_counter.get()) { + log::debug!("inbound worker: replay detected"); + return; + } + + // check for confirms key + if !job.state.confirmed.swap(true, Ordering::SeqCst) { + log::debug!("inbound worker: message confirms key"); + peer.confirm_key(&job.state.keypair); + } + + // update endpoint + *peer.endpoint.lock() = msg.0.take(); + + // check if should be written to TUN + let mut sent = false; + if packet.len() > 0 { + sent = match peer.device.inbound.write(&packet[..]) { + Err(e) => { + log::debug!("failed to write inbound packet to TUN: {:?}", e); + false + } + Ok(_) => true, + } + } else { + log::debug!("inbound worker: received keepalive") + } + + // trigger callback + C::recv(&peer.opaque, msg.1.len(), sent, &job.state.keypair); + } + +} diff --git a/src/wireguard/router/send.rs b/src/wireguard/router/send.rs new file mode 100644 index 0000000..2bd4abd --- /dev/null +++ b/src/wireguard/router/send.rs @@ -0,0 +1,140 @@ +use super::queue::{Job, Queue}; +use super::KeyPair; +use super::types::Callbacks; +use super::peer::Peer; +use super::{REJECT_AFTER_MESSAGES, SIZE_TAG}; +use super::messages::{TransportHeader, TYPE_TRANSPORT}; + +use super::super::{tun, udp, Endpoint}; + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; +use zerocopy::{AsBytes, LayoutVerified}; +use spin::Mutex; + +struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> { + ready: AtomicBool, + buffer: Mutex<Vec<u8>>, + counter: u64, + keypair: Arc<KeyPair>, + peer: Peer<E, C, T, B>, +} + +pub struct SendJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> { + inner: Arc<Inner<E, C, T, B>>, +} + +impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SendJob<E, C, T, B> { + pub fn new( + buffer: Vec<u8>, + counter: u64, + keypair: Arc<KeyPair>, + peer: Peer<E, C, T, B> + ) -> Option<SendJob<E, C, T, B>> { + // create job + let inner = Arc::new(Inner{ + buffer: Mutex::new(buffer), + counter, + keypair, + peer, + ready: AtomicBool::new(false) + }); + + // attempt to add to queue + if peer.outbound.push(SendJob{ inner: inner.clone()}) { + Some(SendJob{inner}) + } else { + None + } + } +} + +impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for SendJob<E, C, T, B> { + fn queue(&self) -> &Queue<Self> { + &self.inner.peer.outbound + } + + fn is_ready(&self) -> bool { + self.inner.ready.load(Ordering::Acquire) + } + + fn parallel_work(&self) { + debug_assert_eq!( + self.is_ready(), + false, + "doing parallel work on completed job" + ); + log::trace!("processing parallel send job"); + + // encrypt body + { + // make space for the tag + let job = &*self.inner; + let mut msg = job.buffer.lock(); + msg.extend([0u8; SIZE_TAG].iter()); + + // cast to header (should never fail) + let (mut header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = + LayoutVerified::new_from_prefix(&mut msg[..]) + .expect("earlier code should ensure that there is ample space"); + + // set header fields + debug_assert!( + job.counter < REJECT_AFTER_MESSAGES, + "should be checked when assigning counters" + ); + header.f_type.set(TYPE_TRANSPORT); + header.f_receiver.set(job.keypair.send.id); + header.f_counter.set(job.counter); + + // create a nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); + + // do the weird ring AEAD dance + let key = LessSafeKey::new( + UnboundKey::new(&CHACHA20_POLY1305, &job.keypair.send.key[..]).unwrap(), + ); + + // encrypt contents of transport message in-place + let end = packet.len() - SIZE_TAG; + let tag = key + .seal_in_place_separate_tag(nonce, Aad::empty(), &mut packet[..end]) + .unwrap(); + + // append tag + packet[end..].copy_from_slice(tag.as_ref()); + } + + // mark ready + self.inner.ready.store(true, Ordering::Release); + } + + fn sequential_work(self) { + debug_assert_eq!( + self.is_ready(), + true, + "doing sequential work + on an incomplete job" + ); + log::trace!("processing sequential send job"); + + // send to peer + let job = &self.inner; + let msg = job.buffer.lock(); + let xmit = job.peer.send(&msg[..]).is_ok(); + + // trigger callback (for timers) + C::send( + &job.peer.opaque, + msg.len(), + xmit, + &job.keypair, + job.counter, + ); + } +} diff --git a/src/wireguard/router/worker.rs b/src/wireguard/router/worker.rs new file mode 100644 index 0000000..d95050e --- /dev/null +++ b/src/wireguard/router/worker.rs @@ -0,0 +1,13 @@ +use super::Device; + +use super::super::{tun, udp, Endpoint}; +use super::types::Callbacks; + +use super::receive::ReceieveJob; +use super::send::SendJob; + +fn worker<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( + device: Device<E, C, T, B>, +) { + // fetch job +} diff --git a/src/wireguard/router/workers.rs b/src/wireguard/router/workers.rs index 43464a0..8ddc136 100644 --- a/src/wireguard/router/workers.rs +++ b/src/wireguard/router/workers.rs @@ -132,8 +132,7 @@ pub fn worker_inbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer< } } -/* TODO: Replace with run-queue - */ + pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( peer: Arc<PeerInner<E, C, T, B>>, receiver: Receiver<JobOutbound>, |