From db02609334482cf391dbf665559ca60654ed4398 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Thu, 20 Feb 2020 13:21:37 +0100 Subject: More comprehensive unit tests for router --- src/wireguard/router/device.rs | 28 +- src/wireguard/router/ip.rs | 23 ++ src/wireguard/router/mod.rs | 1 + src/wireguard/router/peer.rs | 32 +- src/wireguard/router/receive.rs | 123 +++--- src/wireguard/router/route.rs | 71 ++-- src/wireguard/router/send.rs | 89 +++-- src/wireguard/router/tests.rs | 814 ++++++++++++++++++++++------------------ src/wireguard/router/worker.rs | 7 +- 9 files changed, 625 insertions(+), 563 deletions(-) (limited to 'src/wireguard/router') diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs index 9d78178..8bfa261 100644 --- a/src/wireguard/router/device.rs +++ b/src/wireguard/router/device.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use std::thread; use std::time::Instant; -use log::debug; +use log; use spin::{Mutex, RwLock}; use zerocopy::LayoutVerified; @@ -91,20 +91,17 @@ impl> Drop for DeviceHandle { fn drop(&mut self) { - debug!("router: dropping device"); + log::debug!("router: dropping device"); // close worker queue self.state.work.close(); // join all worker threads - while match self.handles.pop() { - Some(handle) => { - handle.thread().unpark(); - handle.join().unwrap(); - true - } - _ => false, - } {} + while let Some(handle) = self.handles.pop() { + handle.thread().unpark(); + handle.join().unwrap(); + } + log::debug!("router: joined with all workers from pool"); } } @@ -124,8 +121,13 @@ impl> DeviceHandle< // start worker threads let mut threads = Vec::with_capacity(num_workers); while let Some(rx) = consumers.pop() { - threads.push(thread::spawn(move || worker(rx))); + println!("spawn"); + threads.push(thread::spawn(move || { + println!("spawned"); + worker(rx); + })); } + debug_assert!(num_workers > 0, "zero worker threads"); debug_assert_eq!(threads.len(), num_workers); // return exported device handle @@ -135,14 +137,14 @@ impl> DeviceHandle< } } - pub fn send_raw(&self, msg : &[u8], dst: &mut E) -> Result<(), B::Error> { + pub fn send_raw(&self, msg: &[u8], dst: &mut E) -> Result<(), B::Error> { let bind = self.state.outbound.read(); if bind.0 { if let Some(bind) = bind.1.as_ref() { return bind.write(msg, dst); } } - return Ok(()) + return Ok(()); } /// Brings the router down. diff --git a/src/wireguard/router/ip.rs b/src/wireguard/router/ip.rs index e66144f..532c512 100644 --- a/src/wireguard/router/ip.rs +++ b/src/wireguard/router/ip.rs @@ -1,5 +1,8 @@ +use std::mem; + use byteorder::BigEndian; use zerocopy::byteorder::U16; +use zerocopy::LayoutVerified; use zerocopy::{AsBytes, FromBytes}; pub const VERSION_IP4: u8 = 4; @@ -24,3 +27,23 @@ pub struct IPv6Header { pub f_source: [u8; 16], pub f_destination: [u8; 16], } + +#[inline(always)] +pub fn inner_length(packet: &[u8]) -> Option { + match packet.get(0)? >> 4 { + VERSION_IP4 => { + let (header, _): (LayoutVerified<&[u8], IPv4Header>, _) = + LayoutVerified::new_from_prefix(packet)?; + + Some(header.f_total_len.get() as usize) + } + VERSION_IP6 => { + // check length and cast to IPv6 header + let (header, _): (LayoutVerified<&[u8], IPv6Header>, _) = + LayoutVerified::new_from_prefix(packet)?; + + Some(header.f_len.get() as usize + mem::size_of::()) + } + _ => None, + } +} diff --git a/src/wireguard/router/mod.rs b/src/wireguard/router/mod.rs index 699c621..19e037f 100644 --- a/src/wireguard/router/mod.rs +++ b/src/wireguard/router/mod.rs @@ -24,6 +24,7 @@ use super::types::*; pub const SIZE_TAG: usize = 16; pub const SIZE_MESSAGE_PREFIX: usize = mem::size_of::(); +pub const SIZE_KEEPALIVE: usize = mem::size_of::() + SIZE_TAG; pub const CAPACITY_MESSAGE_POSTFIX: usize = SIZE_TAG; pub const fn message_data_len(payload: usize) -> usize { diff --git a/src/wireguard/router/peer.rs b/src/wireguard/router/peer.rs index a20908e..689674d 100644 --- a/src/wireguard/router/peer.rs +++ b/src/wireguard/router/peer.rs @@ -22,7 +22,7 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; use arraydeque::{ArrayDeque, Wrapping}; -use log::debug; +use log; use spin::Mutex; pub struct KeyWheel { @@ -148,7 +148,7 @@ impl> Drop for Peer *peer.enc_key.lock() = None; *peer.endpoint.lock() = None; - debug!("peer dropped & removed from device"); + log::debug!("peer dropped & removed from device"); } } @@ -192,8 +192,6 @@ impl> PeerInner Result<(), RouterError> { - debug!("peer.send"); - // send to endpoint (if known) match self.endpoint.lock().as_mut() { Some(endpoint) => { @@ -227,6 +225,7 @@ impl> Peer { + log::debug!("no key encryption key available"); if stage { self.staged_packets.lock().push_back(msg); }; @@ -235,13 +234,14 @@ impl> Peer { // avoid integer overflow in nonce if state.nonce >= REJECT_AFTER_MESSAGES - 1 { + log::debug!("encryption key expired"); *enc_key = None; if stage { self.staged_packets.lock().push_back(msg); } (None, true) } else { - debug!("encryption state available, nonce = {}", state.nonce); + log::debug!("encryption state available, nonce = {}", state.nonce); let job = SendJob::new(msg, state.nonce, state.keypair.clone(), self.clone()); if self.outbound.push(job.clone()) { @@ -256,18 +256,20 @@ impl> Peer bool { - debug!("peer.send_staged"); + log::trace!("peer.send_staged"); let mut sent = false; let mut staged = self.staged_packets.lock(); loop { @@ -282,7 +284,7 @@ impl> Peer) { - debug!("peer.confirm_key"); + log::trace!("peer.confirm_key"); { // take lock and check keypair = keys.next let mut keys = self.keys.lock(); @@ -329,7 +331,7 @@ impl> PeerHandle> PeerHandle Option { - debug!("peer.get_endpoint"); + log::trace!("peer.get_endpoint"); self.peer.endpoint.lock().as_ref().map(|e| e.into_address()) } /// Zero all key-material related to the peer pub fn zero_keys(&self) { - debug!("peer.zero_keys"); + log::trace!("peer.zero_keys"); let mut release: Vec = Vec::with_capacity(3); let mut keys = self.peer.keys.lock(); @@ -416,7 +418,7 @@ impl> PeerHandle> PeerHandle> PeerHandle> { - ready: AtomicBool, - buffer: Mutex<(Option, Vec)>, // endpoint & ciphertext buffer + ready: AtomicBool, // job status + buffer: Mutex<(Option, Vec)>, // endpoint & ciphertext buffer state: Arc>, // decryption state (keys and replay protector) } @@ -53,26 +53,41 @@ impl> ParallelJob &self.0.state.peer.inbound } + /* The parallel section of an incoming job: + * + * - Decryption. + * - Crypto-key routing lookup. + * + * Note: We truncate the message buffer to 0 bytes in case of authentication failure + * or crypto-key routing failure (attempted impersonation). + * + * Note: We cannot do replay protection in the parallel job, + * since this can cause dropping of packets (leaving the window) due to scheduling. + */ fn parallel_work(&self) { - // TODO: refactor + debug_assert_eq!( + self.is_ready(), + false, + "doing parallel work on completed job" + ); + log::trace!("processing parallel receive job"); + // decrypt { + // closure for locking let job = &self.0; let peer = &job.state.peer; let mut msg = job.buffer.lock(); - // cast to header followed by payload - let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = - match LayoutVerified::new_from_prefix(&mut msg.1[..]) { - Some(v) => v, - None => { - log::debug!("inbound worker: failed to parse message"); - return; - } - }; - - // authenticate and decrypt payload - { + // process buffer + let ok = (|| { + // cast to header followed by payload + let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = + match LayoutVerified::new_from_prefix(&mut msg.1[..]) { + Some(v) => v, + None => return false, + }; + // create nonce object let mut nonce = [0u8; 12]; debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); @@ -87,47 +102,24 @@ impl> ParallelJob // attempt to open (and authenticate) the body match key.open_in_place(nonce, Aad::empty(), packet) { Ok(_) => (), - Err(_) => { - // fault and return early - log::trace!("inbound worker: authentication failure"); - msg.1.truncate(0); - return; - } + Err(_) => return false, } - } - - // check that counter not after reject - if header.f_counter.get() >= REJECT_AFTER_MESSAGES { - msg.1.truncate(0); - return; - } - // cryptokey route and strip padding - let inner_len = { - let length = packet.len() - SIZE_TAG; - if length > 0 { - peer.device.table.check_route(&peer, &packet[..length]) - } else { - Some(0) + // check that counter not after reject + if header.f_counter.get() >= REJECT_AFTER_MESSAGES { + return false; } - }; - // truncate to remove tag - match inner_len { - None => { - log::trace!("inbound worker: cryptokey routing failed"); - msg.1.truncate(0); - } - Some(len) => { - log::trace!( - "inbound worker: good route, length = {} {}", - len, - if len == 0 { "(keepalive)" } else { "" } - ); - msg.1.truncate(mem::size_of::() + len); - } + // check crypto-key router + packet.len() == SIZE_KEEPALIVE || peer.device.table.check_route(&peer, &packet) + })(); + + // remove message in case of failure: + // to indicate failure and avoid later accidental use of unauthenticated data. + if !ok { + msg.1.truncate(0); } - } + }; // mark ready self.0.ready.store(true, Ordering::Release); @@ -142,6 +134,13 @@ impl> SequentialJob } fn sequential_work(self) { + debug_assert_eq!( + self.is_ready(), + true, + "doing sequential work on an incomplete job" + ); + log::trace!("processing sequential receive job"); + let job = &self.0; let peer = &job.state.peer; let mut msg = job.buffer.lock(); @@ -152,7 +151,7 @@ impl> SequentialJob match LayoutVerified::new_from_prefix(&msg.1[..]) { Some(v) => v, None => { - // also covers authentication failure + // also covers authentication failure (will fail to parse header) return; } }; @@ -173,20 +172,16 @@ impl> SequentialJob *peer.endpoint.lock() = endpoint; // check if should be written to TUN - let mut sent = false; - if packet.len() > 0 { - sent = match peer.device.inbound.write(&packet[..]) { - Err(e) => { + // (keep-alive and malformed packets will have no inner length) + if let Some(inner) = inner_length(packet) { + if inner >= packet.len() { + let _ = peer.device.inbound.write(&packet[..inner]).map_err(|e| { log::debug!("failed to write inbound packet to TUN: {:?}", e); - false - } - Ok(_) => true, + }); } - } else { - log::debug!("inbound worker: received keepalive") } // trigger callback - C::recv(&peer.opaque, msg.1.len(), sent, &job.state.keypair); + C::recv(&peer.opaque, msg.1.len(), true, &job.state.keypair); } } diff --git a/src/wireguard/router/route.rs b/src/wireguard/router/route.rs index 7256232..3680157 100644 --- a/src/wireguard/router/route.rs +++ b/src/wireguard/router/route.rs @@ -1,13 +1,11 @@ use super::ip::*; -use zerocopy::LayoutVerified; - -use std::mem; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use spin::RwLock; use treebitmap::address::Address; use treebitmap::IpLookupTable; +use zerocopy::LayoutVerified; /* Functions for obtaining and validating "cryptokey" routes */ @@ -115,53 +113,26 @@ impl RoutingTable { } #[inline(always)] - pub fn check_route(&self, peer: &T, packet: &[u8]) -> Option { - match packet.get(0)? >> 4 { - VERSION_IP4 => { - // check length and cast to IPv4 header - let (header, _): (LayoutVerified<&[u8], IPv4Header>, _) = - LayoutVerified::new_from_prefix(packet)?; - - log::trace!( - "router, check route for IPv4 source: {:?}", - Ipv4Addr::from(header.f_source) - ); - - // check IPv4 source address - self.ipv4 - .read() - .longest_match(Ipv4Addr::from(header.f_source)) - .and_then(|(_, _, p)| { - if p == peer { - Some(header.f_total_len.get() as usize) - } else { - None - } - }) - } - VERSION_IP6 => { - // check length and cast to IPv6 header - let (header, _): (LayoutVerified<&[u8], IPv6Header>, _) = - LayoutVerified::new_from_prefix(packet)?; - - log::trace!( - "router, check route for IPv6 source: {:?}", - Ipv6Addr::from(header.f_source) - ); - - // check IPv6 source address - self.ipv6 - .read() - .longest_match(Ipv6Addr::from(header.f_source)) - .and_then(|(_, _, p)| { - if p == peer { - Some(header.f_len.get() as usize + mem::size_of::()) - } else { - None - } - }) - } - _ => None, + pub fn check_route(&self, peer: &T, packet: &[u8]) -> bool { + match packet.get(0).map(|v| v >> 4) { + Some(VERSION_IP4) => LayoutVerified::new_from_prefix(packet) + .and_then(|(header, _): (LayoutVerified<&[u8], IPv4Header>, _)| { + self.ipv4 + .read() + .longest_match(Ipv4Addr::from(header.f_source)) + .map(|(_, _, p)| p == peer) + }) + .is_some(), + + Some(VERSION_IP6) => LayoutVerified::new_from_prefix(packet) + .and_then(|(header, _): (LayoutVerified<&[u8], IPv6Header>, _)| { + self.ipv6 + .read() + .longest_match(Ipv6Addr::from(header.f_source)) + .map(|(_, _, p)| p == peer) + }) + .is_some(), + _ => false, } } } diff --git a/src/wireguard/router/send.rs b/src/wireguard/router/send.rs index 8e41796..db6b079 100644 --- a/src/wireguard/router/send.rs +++ b/src/wireguard/router/send.rs @@ -1,9 +1,9 @@ -use super::queue::{SequentialJob, ParallelJob, Queue}; -use super::KeyPair; -use super::types::Callbacks; +use super::messages::{TransportHeader, TYPE_TRANSPORT}; use super::peer::Peer; +use super::queue::{ParallelJob, Queue, SequentialJob}; +use super::types::Callbacks; +use super::KeyPair; use super::{REJECT_AFTER_MESSAGES, SIZE_TAG}; -use super::messages::{TransportHeader, TYPE_TRANSPORT}; use super::super::{tun, udp, Endpoint}; @@ -11,8 +11,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; -use zerocopy::{AsBytes, LayoutVerified}; use spin::Mutex; +use zerocopy::{AsBytes, LayoutVerified}; struct Inner> { ready: AtomicBool, @@ -22,67 +22,36 @@ struct Inner> { peer: Peer, } -pub struct SendJob> ( - Arc> +pub struct SendJob>( + Arc>, ); -impl > Clone for SendJob { +impl> Clone for SendJob { fn clone(&self) -> SendJob { SendJob(self.0.clone()) } } -impl > SendJob { +impl> SendJob { pub fn new( buffer: Vec, counter: u64, keypair: Arc, - peer: Peer + peer: Peer, ) -> SendJob { - SendJob(Arc::new(Inner{ + SendJob(Arc::new(Inner { buffer: Mutex::new(buffer), counter, keypair, peer, - ready: AtomicBool::new(false) + ready: AtomicBool::new(false), })) } } -impl > SequentialJob for SendJob { - - fn is_ready(&self) -> bool { - self.0.ready.load(Ordering::Acquire) - } - - fn sequential_work(self) { - debug_assert_eq!( - self.is_ready(), - true, - "doing sequential work - on an incomplete job" - ); - log::trace!("processing sequential send job"); - - // send to peer - let job = &self.0; - let msg = job.buffer.lock(); - let xmit = job.peer.send_raw(&msg[..]).is_ok(); - - // trigger callback (for timers) - C::send( - &job.peer.opaque, - msg.len(), - xmit, - &job.keypair, - job.counter, - ); - } -} - - -impl > ParallelJob for SendJob { - +impl> ParallelJob + for SendJob +{ fn queue(&self) -> &Queue { &self.0.peer.outbound } @@ -140,4 +109,30 @@ impl > ParallelJob // mark ready self.0.ready.store(true, Ordering::Release); } -} \ No newline at end of file +} + +impl> SequentialJob + for SendJob +{ + fn is_ready(&self) -> bool { + self.0.ready.load(Ordering::Acquire) + } + + fn sequential_work(self) { + debug_assert_eq!( + self.is_ready(), + true, + "doing sequential work + on an incomplete job" + ); + log::trace!("processing sequential send job"); + + // send to peer + let job = &self.0; + let msg = job.buffer.lock(); + let xmit = job.peer.send_raw(&msg[..]).is_ok(); + + // trigger callback (for timers) + C::send(&job.peer.opaque, msg.len(), xmit, &job.keypair, job.counter); + } +} diff --git a/src/wireguard/router/tests.rs b/src/wireguard/router/tests.rs index 3d5c79b..3afa422 100644 --- a/src/wireguard/router/tests.rs +++ b/src/wireguard/router/tests.rs @@ -1,229 +1,264 @@ +use super::KeyPair; +use super::SIZE_MESSAGE_PREFIX; +use super::{Callbacks, Device}; + +use super::SIZE_KEEPALIVE; + +use super::super::dummy; +use super::super::dummy_keypair; +use super::super::tests::make_packet; + +use crate::platform::udp::Reader; + use std::net::IpAddr; +use std::ops::Deref; +use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::Arc; use std::sync::Mutex; -use std::thread; use std::time::Duration; +use env_logger; use num_cpus; - -use super::super::dummy; -use super::super::dummy_keypair; -use super::super::tests::make_packet; -use super::super::udp::*; -use super::KeyPair; -use super::SIZE_MESSAGE_PREFIX; -use super::{Callbacks, Device}; +use test::Bencher; extern crate test; -const SIZE_KEEPALIVE: usize = 32; - -#[cfg(test)] -mod tests { - use super::*; - use env_logger; - use log::debug; - use std::sync::atomic::AtomicUsize; - use test::Bencher; - - // type for tracking events inside the router module - struct Flags { - send: Mutex>, - recv: Mutex>, - need_key: Mutex>, - key_confirmed: Mutex>, - } +const SIZE_MSG: usize = 1024; - #[derive(Clone)] - struct Opaque(Arc); +const TIMEOUT: Duration = Duration::from_millis(1000); - struct TestCallbacks(); +struct EventTracker { + rx: Mutex>, + tx: Mutex>, +} - impl Opaque { - fn new() -> Opaque { - Opaque(Arc::new(Flags { - send: Mutex::new(vec![]), - recv: Mutex::new(vec![]), - need_key: Mutex::new(vec![]), - key_confirmed: Mutex::new(vec![]), - })) +impl EventTracker { + fn new() -> Self { + let (tx, rx) = channel(); + EventTracker { + rx: Mutex::new(rx), + tx: Mutex::new(tx), } + } - fn reset(&self) { - self.0.send.lock().unwrap().clear(); - self.0.recv.lock().unwrap().clear(); - self.0.need_key.lock().unwrap().clear(); - self.0.key_confirmed.lock().unwrap().clear(); - } + fn log(&self, e: E) { + self.tx.lock().unwrap().send(e).unwrap(); + } - fn send(&self) -> Option<(usize, bool)> { - self.0.send.lock().unwrap().pop() + fn wait(&self, timeout: Duration) -> Option { + match self.rx.lock().unwrap().recv_timeout(timeout) { + Ok(v) => Some(v), + Err(RecvTimeoutError::Timeout) => None, + Err(RecvTimeoutError::Disconnected) => panic!("Disconnect"), } + } - fn recv(&self) -> Option<(usize, bool)> { - self.0.recv.lock().unwrap().pop() - } + fn now(&self) -> Option { + self.wait(Duration::from_millis(0)) + } +} - fn need_key(&self) -> Option<()> { - self.0.need_key.lock().unwrap().pop() - } +// type for tracking events inside the router module +struct Inner { + send: EventTracker<(usize, bool)>, + recv: EventTracker<(usize, bool)>, + need_key: EventTracker<()>, + key_confirmed: EventTracker<()>, +} - fn key_confirmed(&self) -> Option<()> { - self.0.key_confirmed.lock().unwrap().pop() - } +#[derive(Clone)] +struct Opaque { + inner: Arc, +} - // has all events been accounted for by assertions? - fn is_empty(&self) -> bool { - let send = self.0.send.lock().unwrap(); - let recv = self.0.recv.lock().unwrap(); - let need_key = self.0.need_key.lock().unwrap(); - let key_confirmed = self.0.key_confirmed.lock().unwrap(); - send.is_empty() && recv.is_empty() && need_key.is_empty() & key_confirmed.is_empty() - } - } +impl Deref for Opaque { + type Target = Inner; - impl Callbacks for TestCallbacks { - type Opaque = Opaque; + fn deref(&self) -> &Self::Target { + &self.inner + } +} - fn send(t: &Self::Opaque, size: usize, sent: bool, _keypair: &Arc, _counter: u64) { - t.0.send.lock().unwrap().push((size, sent)) +struct TestCallbacks(); + +impl Opaque { + fn new() -> Opaque { + Opaque { + inner: Arc::new(Inner { + send: EventTracker::new(), + recv: EventTracker::new(), + need_key: EventTracker::new(), + key_confirmed: EventTracker::new(), + }), } + } +} - fn recv(t: &Self::Opaque, size: usize, sent: bool, _keypair: &Arc) { - t.0.recv.lock().unwrap().push((size, sent)) - } +macro_rules! no_events { + ($opq:expr) => { + assert_eq!($opq.send.now(), None, "unexpected send event"); + assert_eq!($opq.recv.now(), None, "unexpected recv event"); + assert_eq!($opq.need_key.now(), None, "unexpected need_key event"); + assert_eq!( + $opq.key_confirmed.now(), + None, + "unexpected key_confirmed event" + ); + }; +} - fn need_key(t: &Self::Opaque) { - t.0.need_key.lock().unwrap().push(()); - } +impl Callbacks for TestCallbacks { + type Opaque = Opaque; - fn key_confirmed(t: &Self::Opaque) { - t.0.key_confirmed.lock().unwrap().push(()); - } + fn send(t: &Self::Opaque, size: usize, sent: bool, _keypair: &Arc, _counter: u64) { + t.send.log((size, sent)) } - // wait for scheduling - fn wait() { - thread::sleep(Duration::from_millis(15)); + fn recv(t: &Self::Opaque, size: usize, sent: bool, _keypair: &Arc) { + t.recv.log((size, sent)) } - fn init() { - let _ = env_logger::builder().is_test(true).try_init(); + fn need_key(t: &Self::Opaque) { + t.need_key.log(()); } - fn make_packet_padded(size: usize, src: IpAddr, dst: IpAddr, id: u64) -> Vec { - let p = make_packet(size, src, dst, id); - let mut o = vec![0; p.len() + SIZE_MESSAGE_PREFIX]; - o[SIZE_MESSAGE_PREFIX..SIZE_MESSAGE_PREFIX + p.len()].copy_from_slice(&p[..]); - o + fn key_confirmed(t: &Self::Opaque) { + t.key_confirmed.log(()); } +} - #[bench] - fn bench_outbound(b: &mut Bencher) { - struct BencherCallbacks {} - impl Callbacks for BencherCallbacks { - type Opaque = Arc; - fn send( - t: &Self::Opaque, - size: usize, - _sent: bool, - _keypair: &Arc, - _counter: u64, - ) { - t.fetch_add(size, Ordering::SeqCst); - } - fn recv(_: &Self::Opaque, _size: usize, _sent: bool, _keypair: &Arc) {} - fn need_key(_: &Self::Opaque) {} - fn key_confirmed(_: &Self::Opaque) {} - } - - // create device - let (_fake, _reader, tun_writer, _mtu) = dummy::TunTest::create(false); - let router: Device<_, BencherCallbacks, dummy::TunWriter, dummy::VoidBind> = - Device::new(num_cpus::get(), tun_writer); - - // add new peer - let opaque = Arc::new(AtomicUsize::new(0)); - let peer = router.new_peer(opaque.clone()); - peer.add_keypair(dummy_keypair(true)); - - // add subnet to peer - let (mask, len, dst) = ("192.168.1.0", 24, "192.168.1.20"); - let mask: IpAddr = mask.parse().unwrap(); - peer.add_allowed_ip(mask, len); - - // create "IP packet" - let dst = dst.parse().unwrap(); - let src = match dst { - IpAddr::V4(_) => "127.0.0.1".parse().unwrap(), - IpAddr::V6(_) => "::1".parse().unwrap(), - }; - let msg = make_packet_padded(1024, src, dst, 0); - - // every iteration sends 10 GB - b.iter(|| { - opaque.store(0, Ordering::SeqCst); - while opaque.load(Ordering::Acquire) < 10 * 1024 * 1024 { - router.send(msg.to_vec()).unwrap(); - } - }); - } +fn init() { + let _ = env_logger::builder().is_test(true).try_init(); +} - #[test] - fn test_outbound() { - init(); +fn make_packet_padded(size: usize, src: IpAddr, dst: IpAddr, id: u64) -> Vec { + let p = make_packet(size, src, dst, id); + let mut o = vec![0; p.len() + SIZE_MESSAGE_PREFIX]; + o[SIZE_MESSAGE_PREFIX..SIZE_MESSAGE_PREFIX + p.len()].copy_from_slice(&p[..]); + o +} - // create device - let (_fake, _reader, tun_writer, _mtu) = dummy::TunTest::create(false); - let router: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer); - router.set_outbound_writer(dummy::VoidBind::new()); +#[bench] +fn bench_outbound(b: &mut Bencher) { + struct BencherCallbacks {} + impl Callbacks for BencherCallbacks { + type Opaque = Arc; + fn send( + t: &Self::Opaque, + size: usize, + _sent: bool, + _keypair: &Arc, + _counter: u64, + ) { + t.fetch_add(size, Ordering::SeqCst); + } + fn recv(_: &Self::Opaque, _size: usize, _sent: bool, _keypair: &Arc) {} + fn need_key(_: &Self::Opaque) {} + fn key_confirmed(_: &Self::Opaque) {} + } - let tests = vec![ - ("192.168.1.0", 24, "192.168.1.20", true), - ("172.133.133.133", 32, "172.133.133.133", true), - ("172.133.133.133", 32, "172.133.133.132", false), - ( - "2001:db8::ff00:42:0000", - 112, - "2001:db8::ff00:42:3242", - true, - ), - ( - "2001:db8::ff00:42:8000", - 113, - "2001:db8::ff00:42:0660", - false, - ), - ( - "2001:db8::ff00:42:8000", - 113, - "2001:db8::ff00:42:ffff", - true, - ), - ]; + // create device + let (_fake, _reader, tun_writer, _mtu) = dummy::TunTest::create(false); + let router: Device<_, BencherCallbacks, dummy::TunWriter, dummy::VoidBind> = + Device::new(num_cpus::get(), tun_writer); + + // add new peer + let opaque = Arc::new(AtomicUsize::new(0)); + let peer = router.new_peer(opaque.clone()); + peer.add_keypair(dummy_keypair(true)); + + // add subnet to peer + let (mask, len, dst) = ("192.168.1.0", 24, "192.168.1.20"); + let mask: IpAddr = mask.parse().unwrap(); + peer.add_allowed_ip(mask, len); + + // create "IP packet" + let dst = dst.parse().unwrap(); + let src = match dst { + IpAddr::V4(_) => "127.0.0.1".parse().unwrap(), + IpAddr::V6(_) => "::1".parse().unwrap(), + }; + let msg = make_packet_padded(1024, src, dst, 0); + + // every iteration sends 10 GB + b.iter(|| { + opaque.store(0, Ordering::SeqCst); + while opaque.load(Ordering::Acquire) < 10 * 1024 * 1024 { + router.send(msg.to_vec()).unwrap(); + } + }); +} - for (num, (mask, len, dst, okay)) in tests.iter().enumerate() { - println!( - "Check: {} {} {}/{}", - dst, - if *okay { "\\in" } else { "\\notin" }, - mask, - len - ); - for set_key in vec![true, false] { - debug!("index = {}, set_key = {}", num, set_key); +#[test] +fn test_outbound() { + init(); + + // create device + let (_fake, _reader, tun_writer, _mtu) = dummy::TunTest::create(false); + let router: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer); + router.set_outbound_writer(dummy::VoidBind::new()); + + let tests = vec![ + ("192.168.1.0", 24, "192.168.1.20", true), + ("172.133.133.133", 32, "172.133.133.133", true), + ("172.133.133.133", 32, "172.133.133.132", false), + ( + "2001:db8::ff00:42:0000", + 112, + "2001:db8::ff00:42:3242", + true, + ), + ( + "2001:db8::ff00:42:8000", + 113, + "2001:db8::ff00:42:0660", + false, + ), + ( + "2001:db8::ff00:42:8000", + 113, + "2001:db8::ff00:42:ffff", + true, + ), + ]; + + for (mask, len, dst, okay) in tests.iter() { + let len = *len; + let okay = *okay; + + println!( + "Check: {} {} {}/{}", + dst, + if okay { "\\in" } else { "\\notin" }, + mask, + len + ); + + for set_key in vec![true, false] { + for confirm_with_staged_packet in vec![true, false] { + let send_keepalive = (!confirm_with_staged_packet || !okay) && set_key; + let send_payload = okay && set_key; + let need_key = ((confirm_with_staged_packet && set_key) || !set_key) && okay; + + println!( + " confirm_with_staged_packet = {}, send_keepalive = {}, set_key = {}", + confirm_with_staged_packet, send_keepalive, set_key + ); // add new peer let opaque = Opaque::new(); let peer = router.new_peer(opaque.clone()); let mask: IpAddr = mask.parse().unwrap(); - if set_key { + + // confirm using keepalive + if set_key && (!confirm_with_staged_packet) { peer.add_keypair(dummy_keypair(true)); } // map subnet to peer - peer.add_allowed_ip(mask, *len); + peer.add_allowed_ip(mask, len); // create "IP packet" let dst = dst.parse().unwrap(); @@ -231,246 +266,279 @@ mod tests { IpAddr::V4(_) => "127.0.0.1".parse().unwrap(), IpAddr::V6(_) => "::1".parse().unwrap(), }; - let msg = make_packet_padded(1024, src, dst, 0); + let msg = make_packet_padded(SIZE_MSG, src, dst, 0); - // cryptkey route the IP packet + // crypto-key route the IP packet let res = router.send(msg); + assert_eq!( + res.is_ok(), + okay, + "crypto-routing / destination lookup failure" + ); - // allow some scheduling - wait(); + // confirm using staged packet + if set_key && confirm_with_staged_packet { + peer.add_keypair(dummy_keypair(true)); + } - if *okay { - // cryptkey routing succeeded - assert!(res.is_ok(), "crypt-key routing should succeed: {:?}", res); + // check for key-material request + if need_key { assert_eq!( - opaque.need_key().is_some(), - !set_key, + opaque.need_key.wait(TIMEOUT), + Some(()), "should have requested a new key, if no encryption state was set" ); + } + + // check for keepalive + if send_keepalive { assert_eq!( - opaque.send().is_some(), - set_key, - "transmission should have been attempted" - ); - assert!( - opaque.recv().is_none(), - "no messages should have been marked as received" - ); - } else { - // no such cryptkey route - assert!(res.is_err(), "crypt-key routing should fail"); - assert!( - opaque.need_key().is_none(), - "should not request a new-key if crypt-key routing failed" + opaque.send.wait(TIMEOUT), + Some((SIZE_KEEPALIVE, false)), + "keepalive should be sent before transport message" ); + } + + // check for encryption of payload + if send_payload { assert_eq!( - opaque.send(), - if set_key { - Some((SIZE_KEEPALIVE, false)) - } else { - None - }, - "transmission should only happen if key was set (keepalive)", - ); - assert!( - opaque.recv().is_none(), - "no messages should have been marked as received", - ); + opaque.send.wait(TIMEOUT), + Some((SIZE_KEEPALIVE + SIZE_MSG, false)), + "message buffer should be encrypted" + ) } + + // check that we handled all events + no_events!(opaque); } } - - println!("Test complete, drop device"); } +} - #[test] - fn test_bidirectional() { - init(); +#[test] +fn test_bidirectional() { + init(); - let tests = [ + let tests = [ + ( + ("192.168.1.0", 24, "192.168.1.20", true), + ("172.133.133.133", 32, "172.133.133.133", true), + ), + ( + ("192.168.1.0", 24, "192.168.1.20", true), + ("172.133.133.133", 32, "172.133.133.133", true), + ), + ( ( - ("192.168.1.0", 24, "192.168.1.20", true), - ("172.133.133.133", 32, "172.133.133.133", true), + "2001:db8::ff00:42:8000", + 113, + "2001:db8::ff00:42:ffff", + true, ), ( - ("192.168.1.0", 24, "192.168.1.20", true), - ("172.133.133.133", 32, "172.133.133.133", true), + "2001:db8::ff40:42:8000", + 113, + "2001:db8::ff40:42:ffff", + true, ), + ), + ( ( - ( - "2001:db8::ff00:42:8000", - 113, - "2001:db8::ff00:42:ffff", - true, - ), - ( - "2001:db8::ff40:42:8000", - 113, - "2001:db8::ff40:42:ffff", - true, - ), + "2001:db8::ff00:42:8000", + 113, + "2001:db8::ff00:42:ffff", + true, ), ( - ( - "2001:db8::ff00:42:8000", - 113, - "2001:db8::ff00:42:ffff", - true, - ), - ( - "2001:db8::ff40:42:8000", - 113, - "2001:db8::ff40:42:ffff", - true, - ), + "2001:db8::ff40:42:8000", + 113, + "2001:db8::ff40:42:ffff", + true, ), - ]; + ), + ]; - for stage in vec![true, false] { - for (p1, p2) in tests.iter() { - let ((bind_reader1, bind_writer1), (bind_reader2, bind_writer2)) = - dummy::PairBind::pair(); + for (p1, p2) in tests.iter() { + for confirm_with_staged_packet in vec![true, false] { + println!( + "peer1 = {:?}, peer2 = {:?}, confirm_with_staged_packet = {}", + p1, p2, confirm_with_staged_packet + ); - // create matching device - let (_fake, _, tun_writer1, _) = dummy::TunTest::create(false); - let (_fake, _, tun_writer2, _) = dummy::TunTest::create(false); + let ((bind_reader1, bind_writer1), (bind_reader2, bind_writer2)) = + dummy::PairBind::pair(); - let router1: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer1); - router1.set_outbound_writer(bind_writer1); + let confirm_packet_size = if confirm_with_staged_packet { + SIZE_KEEPALIVE + SIZE_MSG + } else { + SIZE_KEEPALIVE + }; - let router2: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer2); - router2.set_outbound_writer(bind_writer2); + // create matching device + let (_fake, _, tun_writer1, _) = dummy::TunTest::create(false); + let (_fake, _, tun_writer2, _) = dummy::TunTest::create(false); - // prepare opaque values for tracing callbacks + let router1: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer1); + router1.set_outbound_writer(bind_writer1); - let opaque1 = Opaque::new(); - let opaque2 = Opaque::new(); + let router2: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer2); + router2.set_outbound_writer(bind_writer2); - // create peers with matching keypairs and assign subnets + // prepare opaque values for tracing callbacks - let peer1 = router1.new_peer(opaque1.clone()); - let peer2 = router2.new_peer(opaque2.clone()); + let opaque1 = Opaque::new(); + let opaque2 = Opaque::new(); - { - let (mask, len, _ip, _okay) = p1; - let mask: IpAddr = mask.parse().unwrap(); - peer1.add_allowed_ip(mask, *len); - peer1.add_keypair(dummy_keypair(false)); - } + // create peers with matching keypairs and assign subnets - { - let (mask, len, _ip, _okay) = p2; - let mask: IpAddr = mask.parse().unwrap(); - peer2.add_allowed_ip(mask, *len); - peer2.set_endpoint(dummy::UnitEndpoint::new()); - } + let peer1 = router1.new_peer(opaque1.clone()); + let peer2 = router2.new_peer(opaque2.clone()); - if stage { - println!("confirm using staged packet"); - - // create IP packet - let (_mask, _len, ip1, _okay) = p1; - let (_mask, _len, ip2, _okay) = p2; - let msg = make_packet_padded( - 1024, - ip1.parse().unwrap(), // src - ip2.parse().unwrap(), // dst - 0, - ); + { + let (mask, len, _ip, _okay) = p1; + let mask: IpAddr = mask.parse().unwrap(); + peer1.add_allowed_ip(mask, *len); + peer1.add_keypair(dummy_keypair(false)); + } + + { + let (mask, len, _ip, _okay) = p2; + let mask: IpAddr = mask.parse().unwrap(); + peer2.add_allowed_ip(mask, *len); + peer2.set_endpoint(dummy::UnitEndpoint::new()); + } - // stage packet for sending - router2.send(msg).expect("failed to sent staged packet"); - wait(); + if confirm_with_staged_packet { + // create IP packet + let (_mask, _len, ip1, _okay) = p1; + let (_mask, _len, ip2, _okay) = p2; + let msg = make_packet_padded( + SIZE_MSG, + ip1.parse().unwrap(), // src + ip2.parse().unwrap(), // dst + 0, + ); - // validate events - assert!(opaque2.recv().is_none()); - assert!( - opaque2.send().is_none(), - "sending should fail as not key is set" - ); - assert!( - opaque2.need_key().is_some(), - "a new key should be requested since a packet was attempted transmitted" - ); - assert!(opaque2.is_empty(), "callbacks should only run once"); - } + // stage packet for sending + router2.send(msg).expect("failed to sent staged packet"); - // this should cause a key-confirmation packet (keepalive or staged packet) - // this also causes peer1 to learn the "endpoint" for peer2 - assert!(peer1.get_endpoint().is_none()); - peer2.add_keypair(dummy_keypair(true)); + // a new key should have been requested from the handshake machine + assert_eq!( + opaque2.need_key.wait(TIMEOUT), + Some(()), + "a new key should be requested since a packet was attempted transmitted" + ); - wait(); - assert!(opaque2.send().is_some()); - assert!(opaque2.is_empty(), "events on peer2 should be 'send'"); - assert!(opaque1.is_empty(), "nothing should happened on peer1"); + no_events!(opaque1); + no_events!(opaque2); + } - // read confirming message received by the other end ("across the internet") - let mut buf = vec![0u8; 2048]; - let (len, from) = bind_reader1.read(&mut buf).unwrap(); - buf.truncate(len); - router1.recv(from, buf).unwrap(); - - wait(); - assert!(opaque1.recv().is_some()); - assert!(opaque1.key_confirmed().is_some()); - assert!( - opaque1.is_empty(), - "events on peer1 should be 'recv' and 'key_confirmed'" + // add a keypair + assert_eq!(peer1.get_endpoint(), None, "no endpoint has yet been set"); + peer2.add_keypair(dummy_keypair(true)); + + // this should cause a key-confirmation packet (keepalive or staged packet) + assert_eq!( + opaque2.send.wait(TIMEOUT), + Some((confirm_packet_size, true)), + "expected successful transmission of a confirmation packet" + ); + + // no other events should fire + no_events!(opaque1); + no_events!(opaque2); + + // read confirming message received by the other end ("across the internet") + let mut buf = vec![0u8; SIZE_MSG * 2]; + let (len, from) = bind_reader1.read(&mut buf).unwrap(); + buf.truncate(len); + + assert_eq!( + len, + if confirm_with_staged_packet { + SIZE_MSG + SIZE_KEEPALIVE + } else { + SIZE_KEEPALIVE + }, + "unexpected size of confirmation message" + ); + + // pass to the router for processing + router1 + .recv(from, buf) + .expect("failed to receive confirmation message"); + + // check that a receive event is fired + assert_eq!( + opaque1.recv.wait(TIMEOUT), + Some((confirm_packet_size, true)), + "we expect processing to be successful" + ); + + // the key is confirmed + assert_eq!( + opaque1.key_confirmed.wait(TIMEOUT), + Some(()), + "confirmation message should confirm the key" + ); + + // peer1 learns the endpoint + assert!( + peer1.get_endpoint().is_some(), + "peer1 should learn the endpoint of peer2 from the confirmation message (roaming)" + ); + + // no other events should fire + no_events!(opaque1); + no_events!(opaque2); + // now that peer1 has an endpoint + // route packets in the other direction: peer1 -> peer2 + for id in 1..11 { + println!("packet: {}", id); + + let message_size = 1024; + + // pass IP packet to router + let (_mask, _len, ip1, _okay) = p1; + let (_mask, _len, ip2, _okay) = p2; + let msg = make_packet_padded( + message_size, + ip2.parse().unwrap(), // src + ip1.parse().unwrap(), // dst + id, ); - assert!(peer1.get_endpoint().is_some()); - assert!(opaque2.is_empty(), "nothing should happened on peer2"); - // now that peer1 has an endpoint - // route packets : peer1 -> peer2 + router1 + .send(msg) + .expect("we expect routing to be successful"); - for id in 1..11 { - println!("round: {}", id); - assert!( - opaque1.is_empty(), - "we should have asserted a value for every callback on peer1" - ); - assert!( - opaque2.is_empty(), - "we should have asserted a value for every callback on peer2" - ); + // encryption succeeds and the correct size is logged + assert_eq!( + opaque1.send.wait(TIMEOUT), + Some((message_size + SIZE_KEEPALIVE, true)), + "expected send event for peer1 -> peer2 payload" + ); - // pass IP packet to router - let (_mask, _len, ip1, _okay) = p1; - let (_mask, _len, ip2, _okay) = p2; - let msg = make_packet_padded( - 1024, - ip2.parse().unwrap(), // src - ip1.parse().unwrap(), // dst - id, - ); - router1.send(msg).unwrap(); + // otherwise no events + no_events!(opaque1); + no_events!(opaque2); - wait(); - assert!(opaque1.send().is_some(), "encryption should succeed"); - assert!( - opaque1.recv().is_none(), - "receiving callback should not be called" - ); - assert!(opaque1.need_key().is_none()); - - // receive ("across the internet") on the other end - let mut buf = vec![0u8; 2048]; - let (len, from) = bind_reader2.read(&mut buf).unwrap(); - buf.truncate(len); - router2.recv(from, buf).unwrap(); - - wait(); - assert!( - opaque2.send().is_none(), - "sending callback should not be called" - ); - assert!( - opaque2.recv().is_some(), - "decryption and routing should succeed" - ); - assert!(opaque2.need_key().is_none()); - } + // receive ("across the internet") on the other end + let mut buf = vec![0u8; 2048]; + let (len, from) = bind_reader2.read(&mut buf).unwrap(); + buf.truncate(len); + router2.recv(from, buf).unwrap(); + + // check that decryption succeeds + assert_eq!( + opaque2.recv.wait(TIMEOUT), + Some((message_size + SIZE_KEEPALIVE, true)), + "decryption and routing should succeed" + ); + + // otherwise no events + no_events!(opaque1); + no_events!(opaque2); } } } diff --git a/src/wireguard/router/worker.rs b/src/wireguard/router/worker.rs index bbb644c..459a198 100644 --- a/src/wireguard/router/worker.rs +++ b/src/wireguard/router/worker.rs @@ -6,6 +6,7 @@ use super::receive::ReceiveJob; use super::send::SendJob; use crossbeam_channel::Receiver; +use log; pub enum JobUnion> { Outbound(SendJob), @@ -16,8 +17,12 @@ pub fn worker>( receiver: Receiver>, ) { loop { + log::trace!("pool worker awaiting job"); match receiver.recv() { - Err(_) => break, + Err(e) => { + log::debug!("worker stopped with {}", e); + break; + } Ok(JobUnion::Inbound(job)) => { job.parallel_work(); job.queue().consume(); -- cgit v1.2.3-59-g8ed1b