diff options
Diffstat (limited to 'src/wireguard/router/workers.rs')
-rw-r--r-- | src/wireguard/router/workers.rs | 184 |
1 files changed, 103 insertions, 81 deletions
diff --git a/src/wireguard/router/workers.rs b/src/wireguard/router/workers.rs index 70334c1..3d85188 100644 --- a/src/wireguard/router/workers.rs +++ b/src/wireguard/router/workers.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use futures::sync::oneshot; use futures::*; -use log::debug; +use log::{debug, trace}; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; @@ -16,34 +16,40 @@ use super::messages::{TransportHeader, TYPE_TRANSPORT}; use super::peer::PeerInner; use super::route::check_route; use super::types::Callbacks; +use super::REJECT_AFTER_MESSAGES; +use super::super::types::KeyPair; use super::super::{bind, tun, Endpoint}; pub const SIZE_TAG: usize = 16; -#[derive(PartialEq, Debug)] -pub enum Operation { - Encryption, - Decryption, +#[derive(Debug)] +pub struct JobEncryption { + pub msg: Vec<u8>, + pub keypair: Arc<KeyPair>, + pub counter: u64, } -pub struct JobBuffer { - pub msg: Vec<u8>, // message buffer (nonce and receiver id set) - pub key: [u8; 32], // chacha20poly1305 key - pub okay: bool, // state of the job - pub op: Operation, // should be buffer be encrypted / decrypted? +#[derive(Debug)] +pub struct JobDecryption { + pub msg: Vec<u8>, + pub keypair: Arc<KeyPair>, } -pub type JobParallel = (oneshot::Sender<JobBuffer>, JobBuffer); +#[derive(Debug)] +pub enum JobParallel { + Encryption(oneshot::Sender<JobEncryption>, JobEncryption), + Decryption(oneshot::Sender<Option<JobDecryption>>, JobDecryption), +} #[allow(type_alias_bounds)] pub type JobInbound<E, C, T, B: bind::Writer<E>> = ( Arc<DecryptionState<E, C, T, B>>, E, - oneshot::Receiver<JobBuffer>, + oneshot::Receiver<Option<JobDecryption>>, ); -pub type JobOutbound = oneshot::Receiver<JobBuffer>; +pub type JobOutbound = oneshot::Receiver<JobEncryption>; pub fn worker_inbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>>( device: Arc<DeviceInner<E, C, T, B>>, // related device @@ -64,7 +70,7 @@ pub fn worker_inbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer let _ = rx .map(|buf| { debug!("inbound worker: job complete"); - if buf.okay { + if let Some(buf) = buf { // cast transport header let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { @@ -134,6 +140,10 @@ pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Write peer: Arc<PeerInner<E, C, T, B>>, // related peer receiver: Receiver<JobOutbound>, ) { + fn keep_key_fresh(keypair: &KeyPair, counter: u64) -> bool { + false + } + loop { // fetch job let rx = match receiver.recv() { @@ -148,27 +158,30 @@ pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Write let _ = rx .map(|buf| { debug!("outbound worker: job complete"); - if buf.okay { - // write to UDP bind - let xmit = if let Some(dst) = peer.endpoint.lock().as_ref() { - let send: &Option<B> = &*device.outbound.read(); - if let Some(writer) = send.as_ref() { - match writer.write(&buf.msg[..], dst) { - Err(e) => { - debug!("failed to send outbound packet: {:?}", e); - false - } - Ok(_) => true, + // write to UDP bind + let xmit = if let Some(dst) = peer.endpoint.lock().as_ref() { + let send: &Option<B> = &*device.outbound.read(); + if let Some(writer) = send.as_ref() { + match writer.write(&buf.msg[..], dst) { + Err(e) => { + debug!("failed to send outbound packet: {:?}", e); + false } - } else { - false + Ok(_) => true, } } else { false - }; + } + } else { + false + }; - // trigger callback - C::send(&peer.opaque, buf.msg.len(), xmit); + // trigger callback + C::send(&peer.opaque, buf.msg.len(), xmit); + + // keep_key_fresh semantics + if keep_key_fresh(&buf.keypair, buf.counter) { + C::need_key(&peer.opaque); } }) .wait(); @@ -178,76 +191,85 @@ pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Write pub fn worker_parallel(receiver: Receiver<JobParallel>) { loop { // fetch next job - let (tx, mut buf) = match receiver.recv() { + let job = match receiver.recv() { Err(_) => { return; } Ok(val) => val, }; - debug!("parallel worker: obtained job"); - - // make space for tag (TODO: consider moving this out) - if buf.op == Operation::Encryption { - buf.msg.extend([0u8; SIZE_TAG].iter()); - } + trace!("parallel worker: obtained job"); - // cast and check size of packet - let (mut header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = - match LayoutVerified::new_from_prefix(&mut buf.msg[..]) { - Some(v) => v, - None => { - debug_assert!( - false, - "parallel worker: failed to parse message (insufficient size)" - ); - continue; - } - }; - debug_assert!(packet.len() >= CHACHA20_POLY1305.tag_len()); + // handle job + match job { + JobParallel::Encryption(tx, mut job) => { + job.msg.extend([0u8; SIZE_TAG].iter()); - // do the weird ring AEAD dance - let key = LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap()); + // cast to header (should never fail) + let (mut header, body): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = + LayoutVerified::new_from_prefix(&mut job.msg[..]) + .expect("earlier code should ensure that there is ample space"); - // create a nonce object - let mut nonce = [0u8; 12]; - debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); - nonce[4..].copy_from_slice(header.f_counter.as_bytes()); - let nonce = Nonce::assume_unique_for_key(nonce); + // set header fields + header.f_type.set(TYPE_TRANSPORT); + header.f_receiver.set(job.keypair.send.id); + header.f_counter.set(job.counter); - match buf.op { - Operation::Encryption => { - debug!("parallel worker: process encryption"); + // create a nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); - // set the type field - header.f_type.set(TYPE_TRANSPORT); + // do the weird ring AEAD dance + let key = LessSafeKey::new( + UnboundKey::new(&CHACHA20_POLY1305, &job.keypair.send.key[..]).unwrap(), + ); // encrypt content of transport message in-place - let end = packet.len() - SIZE_TAG; + let end = body.len() - SIZE_TAG; let tag = key - .seal_in_place_separate_tag(nonce, Aad::empty(), &mut packet[..end]) + .seal_in_place_separate_tag(nonce, Aad::empty(), &mut body[..end]) .unwrap(); // append tag - packet[end..].copy_from_slice(tag.as_ref()); + body[end..].copy_from_slice(tag.as_ref()); - buf.okay = true; + // pass ownership + let _ = tx.send(job); } - Operation::Decryption => { - debug!("parallel worker: process decryption"); - - // opening failure is signaled by fault state - buf.okay = match key.open_in_place(nonce, Aad::empty(), packet) { - Ok(_) => true, - Err(_) => false, - }; + JobParallel::Decryption(tx, mut job) => { + // cast to header (could fail) + let layout: Option<(LayoutVerified<&mut [u8], TransportHeader>, &mut [u8])> = + LayoutVerified::new_from_prefix(&mut job.msg[..]); + + let _ = tx.send(match layout { + Some((header, body)) => { + debug_assert_eq!(header.f_type.get(), TYPE_TRANSPORT); + if header.f_counter.get() >= REJECT_AFTER_MESSAGES { + None + } else { + // create a nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); + + // do the weird ring AEAD dance + let key = LessSafeKey::new( + UnboundKey::new(&CHACHA20_POLY1305, &job.keypair.recv.key[..]) + .unwrap(), + ); + + // attempt to open (and authenticate) the body + match key.open_in_place(nonce, Aad::empty(), body) { + Ok(_) => Some(job), + Err(_) => None, + } + } + } + None => None, + }); } } - - // pass ownership to consumer - let okay = tx.send(buf); - debug!( - "parallel worker: passing ownership to sequential worker: {}", - okay.is_ok() - ); } } |