diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-10 21:42:21 +0200 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-10 21:42:21 +0200 |
commit | 02d9bdcc96c955b654a45d3629b1ee515902078d (patch) | |
tree | d0231989ccca424d26f0dcded640acac079aa9de /src/router/workers.rs | |
parent | Begin work on full router interaction unittest (diff) | |
download | wireguard-rs-02d9bdcc96c955b654a45d3629b1ee515902078d.tar.xz wireguard-rs-02d9bdcc96c955b654a45d3629b1ee515902078d.zip |
Full inbound/outbound router test
Diffstat (limited to '')
-rw-r--r-- | src/router/workers.rs | 72 |
1 files changed, 54 insertions, 18 deletions
diff --git a/src/router/workers.rs b/src/router/workers.rs index 85cf22a..b038a20 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -13,13 +13,14 @@ use std::sync::atomic::Ordering; use zerocopy::{AsBytes, LayoutVerified}; use super::device::{DecryptionState, DeviceInner}; -use super::messages::TransportHeader; +use super::messages::{TransportHeader, TYPE_TRANSPORT}; use super::peer::PeerInner; use super::types::Callbacks; +use super::super::types::{Bind, Tun}; use super::ip::*; -use super::super::types::{Bind, Tun}; +const SIZE_TAG: usize = 16; #[derive(PartialEq, Debug)] pub enum Operation { @@ -105,32 +106,37 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>( return; } }; + debug!("inbound worker: obtained job"); // wait for job to complete let _ = rx .map(|buf| { + debug!("inbound worker: job complete"); if buf.okay { // cast transport header let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { Some(v) => v, None => { + debug!("inbound worker: failed to parse message"); return; } }; debug_assert!( packet.len() >= CHACHA20_POLY1305.tag_len(), - "this should be checked earlier in the pipeline" + "this should be checked earlier in the pipeline (decryption should fail)" ); // check for replay if !state.protector.lock().update(header.f_counter.get()) { + debug!("inbound worker: replay detected"); return; } // check for confirms key if !state.confirmed.swap(true, Ordering::SeqCst) { + debug!("inbound worker: message confirms key"); peer.confirm_key(&state.keypair); } @@ -138,7 +144,8 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>( *peer.endpoint.lock() = Some(endpoint); // calculate length of IP packet + padding - let length = packet.len() - CHACHA20_POLY1305.nonce_len(); + let length = packet.len() - SIZE_TAG; + debug!("inbound worker: plaintext length = {}", length); // check if should be written to TUN let mut sent = false; @@ -155,10 +162,14 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>( } } } + } else { + debug!("inbound worker: received keepalive") } // trigger callback - (device.call_recv)(&peer.opaque, length == 0, sent); + (device.call_recv)(&peer.opaque, buf.msg.len(), length == 0, sent); + } else { + debug!("inbound worker: authentication failure") } }) .wait(); @@ -178,10 +189,12 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>( return; } }; + debug!("outbound worker: obtained job"); // wait for job to complete let _ = rx .map(|buf| { + debug!("outbound worker: job complete"); if buf.okay { // write to UDP bind let xmit = if let Some(dst) = peer.endpoint.lock().as_ref() { @@ -199,6 +212,7 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>( // trigger callback (device.call_send)( &peer.opaque, + buf.msg.len(), buf.msg.len() > CHACHA20_POLY1305.nonce_len() + mem::size_of::<TransportHeader>(), xmit, @@ -218,17 +232,26 @@ pub fn worker_parallel(receiver: Receiver<JobParallel>) { } Ok(val) => val, }; + debug!("parallel worker: obtained job"); + + // make space for tag (TODO: consider moving this out) + if buf.op == Operation::Encryption { + buf.msg.extend([0u8; SIZE_TAG].iter()); + } // cast and check size of packet - let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) = - match LayoutVerified::new_from_prefix(&buf.msg[..]) { + let (mut header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = + match LayoutVerified::new_from_prefix(&mut buf.msg[..]) { Some(v) => v, - None => continue, + None => { + debug_assert!( + false, + "parallel worker: failed to parse message (insufficient size)" + ); + continue; + } }; - - if packet.len() < CHACHA20_POLY1305.nonce_len() { - continue; - } + debug_assert!(packet.len() >= CHACHA20_POLY1305.tag_len()); // do the weird ring AEAD dance let key = LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap()); @@ -241,18 +264,27 @@ pub fn worker_parallel(receiver: Receiver<JobParallel>) { match buf.op { Operation::Encryption => { - debug!("worker, process encryption"); + debug!("parallel worker: process encryption"); - // note: extends the vector to accommodate the tag - key.seal_in_place_append_tag(nonce, Aad::empty(), &mut buf.msg) + // set the type field + header.f_type.set(TYPE_TRANSPORT); + + // encrypt content of transport message in-place + let end = packet.len() - SIZE_TAG; + let tag = key + .seal_in_place_separate_tag(nonce, Aad::empty(), &mut packet[..end]) .unwrap(); + + // append tag + packet[end..].copy_from_slice(tag.as_ref()); + buf.okay = true; } Operation::Decryption => { - debug!("worker, process decryption"); + debug!("parallel worker: process decryption"); // opening failure is signaled by fault state - buf.okay = match key.open_in_place(nonce, Aad::empty(), &mut buf.msg) { + buf.okay = match key.open_in_place(nonce, Aad::empty(), packet) { Ok(_) => true, Err(_) => false, }; @@ -260,6 +292,10 @@ pub fn worker_parallel(receiver: Receiver<JobParallel>) { } // pass ownership to consumer - let _ = tx.send(buf); + let okay = tx.send(buf); + debug!( + "parallel worker: passing ownership to sequential worker: {}", + okay.is_ok() + ); } } |