-rw-r--r--  src/wireguard/router/device.rs    |  28
-rw-r--r--  src/wireguard/router/ip.rs        |  23
-rw-r--r--  src/wireguard/router/mod.rs       |   1
-rw-r--r--  src/wireguard/router/peer.rs      |  32
-rw-r--r--  src/wireguard/router/receive.rs   | 123
-rw-r--r--  src/wireguard/router/route.rs     |  71
-rw-r--r--  src/wireguard/router/send.rs      |  89
-rw-r--r--  src/wireguard/router/tests.rs     | 814
-rw-r--r--  src/wireguard/router/worker.rs    |   7
9 files changed, 625 insertions(+), 563 deletions(-)
diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs
index 9d78178..8bfa261 100644
--- a/src/wireguard/router/device.rs
+++ b/src/wireguard/router/device.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;
use std::thread;
use std::time::Instant;
-use log::debug;
+use log;
use spin::{Mutex, RwLock};
use zerocopy::LayoutVerified;
@@ -91,20 +91,17 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop
for DeviceHandle<E, C, T, B>
{
fn drop(&mut self) {
- debug!("router: dropping device");
+ log::debug!("router: dropping device");
// close worker queue
self.state.work.close();
// join all worker threads
- while match self.handles.pop() {
- Some(handle) => {
- handle.thread().unpark();
- handle.join().unwrap();
- true
- }
- _ => false,
- } {}
+ while let Some(handle) = self.handles.pop() {
+ handle.thread().unpark();
+ handle.join().unwrap();
+ }
+ log::debug!("router: joined with all workers from pool");
}
}
@@ -124,8 +121,13 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
// start worker threads
let mut threads = Vec::with_capacity(num_workers);
while let Some(rx) = consumers.pop() {
- threads.push(thread::spawn(move || worker(rx)));
+ println!("spawn");
+ threads.push(thread::spawn(move || {
+ println!("spawned");
+ worker(rx);
+ }));
}
+ debug_assert!(num_workers > 0, "zero worker threads");
debug_assert_eq!(threads.len(), num_workers);
// return exported device handle
@@ -135,14 +137,14 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
}
}
- pub fn send_raw(&self, msg : &[u8], dst: &mut E) -> Result<(), B::Error> {
+ pub fn send_raw(&self, msg: &[u8], dst: &mut E) -> Result<(), B::Error> {
let bind = self.state.outbound.read();
if bind.0 {
if let Some(bind) = bind.1.as_ref() {
return bind.write(msg, dst);
}
}
- return Ok(())
+ return Ok(());
}
/// Brings the router down.
diff --git a/src/wireguard/router/ip.rs b/src/wireguard/router/ip.rs
index e66144f..532c512 100644
--- a/src/wireguard/router/ip.rs
+++ b/src/wireguard/router/ip.rs
@@ -1,5 +1,8 @@
+use std::mem;
+
use byteorder::BigEndian;
use zerocopy::byteorder::U16;
+use zerocopy::LayoutVerified;
use zerocopy::{AsBytes, FromBytes};
pub const VERSION_IP4: u8 = 4;
@@ -24,3 +27,23 @@ pub struct IPv6Header {
pub f_source: [u8; 16],
pub f_destination: [u8; 16],
}
+
+#[inline(always)]
+pub fn inner_length(packet: &[u8]) -> Option<usize> {
+ match packet.get(0)? >> 4 {
+ VERSION_IP4 => {
+ let (header, _): (LayoutVerified<&[u8], IPv4Header>, _) =
+ LayoutVerified::new_from_prefix(packet)?;
+
+ Some(header.f_total_len.get() as usize)
+ }
+ VERSION_IP6 => {
+ // check length and cast to IPv6 header
+ let (header, _): (LayoutVerified<&[u8], IPv6Header>, _) =
+ LayoutVerified::new_from_prefix(packet)?;
+
+ Some(header.f_len.get() as usize + mem::size_of::<IPv6Header>())
+ }
+ _ => None,
+ }
+}
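The new inner_length helper recovers the datagram's true length from the inner IP header alone: the IPv4 total-length field, or the IPv6 payload-length field plus the fixed 40-byte IPv6 header. The receive path uses it to drop the zero padding appended to the plaintext before encryption. A minimal usage sketch (illustrative only; strip_padding is a hypothetical helper, not part of the patch):

    fn strip_padding(payload: &mut Vec<u8>) -> bool {
        match inner_length(&payload[..]) {
            // keep only the bytes declared by the inner IP header,
            // discarding trailing zero padding
            Some(len) if len <= payload.len() => {
                payload.truncate(len);
                true
            }
            // empty (keepalive) or malformed packets have no inner length
            _ => false,
        }
    }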
diff --git a/src/wireguard/router/mod.rs b/src/wireguard/router/mod.rs
index 699c621..19e037f 100644
--- a/src/wireguard/router/mod.rs
+++ b/src/wireguard/router/mod.rs
@@ -24,6 +24,7 @@ use super::types::*;
pub const SIZE_TAG: usize = 16;
pub const SIZE_MESSAGE_PREFIX: usize = mem::size_of::<TransportHeader>();
+pub const SIZE_KEEPALIVE: usize = mem::size_of::<TransportHeader>() + SIZE_TAG;
pub const CAPACITY_MESSAGE_POSTFIX: usize = SIZE_TAG;
pub const fn message_data_len(payload: usize) -> usize {
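The new SIZE_KEEPALIVE constant spells out the wire size of a keepalive: a transport message with an empty payload is just the 16-byte TransportHeader (4-byte type, 4-byte receiver index, 8-byte counter) followed by the 16-byte authentication tag. An illustrative check, assuming that header layout:

    // 4 (type) + 4 (receiver) + 8 (counter) = 16 bytes of header,
    // plus SIZE_TAG = 16 bytes of Poly1305 tag
    const _: () = assert!(SIZE_KEEPALIVE == 32);

This is the same 32 bytes that tests.rs previously hard-coded as a local constant.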
diff --git a/src/wireguard/router/peer.rs b/src/wireguard/router/peer.rs
index a20908e..689674d 100644
--- a/src/wireguard/router/peer.rs
+++ b/src/wireguard/router/peer.rs
@@ -22,7 +22,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use arraydeque::{ArrayDeque, Wrapping};
-use log::debug;
+use log;
use spin::Mutex;
pub struct KeyWheel {
@@ -148,7 +148,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop for Peer
*peer.enc_key.lock() = None;
*peer.endpoint.lock() = None;
- debug!("peer dropped & removed from device");
+ log::debug!("peer dropped & removed from device");
}
}
@@ -192,8 +192,6 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerInner<E,
///
/// Unit if packet was sent, or an error indicating why sending failed
pub fn send_raw(&self, msg: &[u8]) -> Result<(), RouterError> {
- debug!("peer.send");
-
// send to endpoint (if known)
match self.endpoint.lock().as_mut() {
Some(endpoint) => {
@@ -227,6 +225,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
let mut enc_key = self.enc_key.lock();
match enc_key.as_mut() {
None => {
+ log::debug!("no key encryption key available");
if stage {
self.staged_packets.lock().push_back(msg);
};
@@ -235,13 +234,14 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
Some(mut state) => {
// avoid integer overflow in nonce
if state.nonce >= REJECT_AFTER_MESSAGES - 1 {
+ log::debug!("encryption key expired");
*enc_key = None;
if stage {
self.staged_packets.lock().push_back(msg);
}
(None, true)
} else {
- debug!("encryption state available, nonce = {}", state.nonce);
+ log::debug!("encryption state available, nonce = {}", state.nonce);
let job =
SendJob::new(msg, state.nonce, state.keypair.clone(), self.clone());
if self.outbound.push(job.clone()) {
@@ -256,18 +256,20 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
};
if need_key {
+ log::debug!("request new key");
debug_assert!(job.is_none());
C::need_key(&self.opaque);
};
if let Some(job) = job {
+ log::debug!("schedule outbound job");
self.device.work.send(JobUnion::Outbound(job))
}
}
// Transmit all staged packets
fn send_staged(&self) -> bool {
- debug!("peer.send_staged");
+ log::trace!("peer.send_staged");
let mut sent = false;
let mut staged = self.staged_packets.lock();
loop {
@@ -282,7 +284,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
}
pub(super) fn confirm_key(&self, keypair: &Arc<KeyPair>) {
- debug!("peer.confirm_key");
+ log::trace!("peer.confirm_key");
{
// take lock and check keypair = keys.next
let mut keys = self.keys.lock();
@@ -329,7 +331,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerHandle<E,
/// This API still permits support for the "sticky socket" behavior,
/// as sockets should be "unsticked" when manually updating the endpoint
pub fn set_endpoint(&self, endpoint: E) {
- debug!("peer.set_endpoint");
+ log::trace!("peer.set_endpoint");
*self.peer.endpoint.lock() = Some(endpoint);
}
@@ -339,13 +341,13 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerHandle<E,
///
/// Does not convey potential "sticky socket" information
pub fn get_endpoint(&self) -> Option<SocketAddr> {
- debug!("peer.get_endpoint");
+ log::trace!("peer.get_endpoint");
self.peer.endpoint.lock().as_ref().map(|e| e.into_address())
}
/// Zero all key-material related to the peer
pub fn zero_keys(&self) {
- debug!("peer.zero_keys");
+ log::trace!("peer.zero_keys");
let mut release: Vec<u32> = Vec::with_capacity(3);
let mut keys = self.peer.keys.lock();
@@ -416,7 +418,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerHandle<E,
// update incoming packet id map
{
- debug!("peer.add_keypair: updating inbound id map");
+ log::trace!("peer.add_keypair: updating inbound id map");
let mut recv = self.peer.device.recv.write();
// purge recv map of previous id
@@ -438,14 +440,14 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerHandle<E,
// schedule confirmation
if initiator {
debug_assert!(self.peer.enc_key.lock().is_some());
- debug!("peer.add_keypair: is initiator, must confirm the key");
+ log::trace!("peer.add_keypair: is initiator, must confirm the key");
// attempt to confirm using staged packets
if !self.peer.send_staged() {
// fall back to keepalive packet
self.send_keepalive();
- debug!("peer.add_keypair: keepalive for confirmation",);
+ log::debug!("peer.add_keypair: keepalive for confirmation",);
}
- debug!("peer.add_keypair: key attempted confirmed");
+ log::trace!("peer.add_keypair: key attempted confirmed");
}
debug_assert!(
@@ -456,7 +458,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerHandle<E,
}
pub fn send_keepalive(&self) {
- debug!("peer.send_keepalive");
+ log::trace!("peer.send_keepalive");
self.peer.send(vec![0u8; SIZE_MESSAGE_PREFIX], false)
}
diff --git a/src/wireguard/router/receive.rs b/src/wireguard/router/receive.rs
index c5fe3da..0e5cb0f 100644
--- a/src/wireguard/router/receive.rs
+++ b/src/wireguard/router/receive.rs
@@ -1,12 +1,12 @@
use super::device::DecryptionState;
+use super::ip::inner_length;
use super::messages::TransportHeader;
use super::queue::{ParallelJob, Queue, SequentialJob};
use super::types::Callbacks;
-use super::{REJECT_AFTER_MESSAGES, SIZE_TAG};
+use super::{REJECT_AFTER_MESSAGES, SIZE_KEEPALIVE};
use super::super::{tun, udp, Endpoint};
-use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -15,8 +15,8 @@ use spin::Mutex;
use zerocopy::{AsBytes, LayoutVerified};
struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
- ready: AtomicBool,
- buffer: Mutex<(Option<E>, Vec<u8>)>, // endpoint & ciphertext buffer
+ ready: AtomicBool, // job status
+ buffer: Mutex<(Option<E>, Vec<u8>)>, // endpoint & ciphertext buffer
state: Arc<DecryptionState<E, C, T, B>>, // decryption state (keys and replay protector)
}
@@ -53,26 +53,41 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob
&self.0.state.peer.inbound
}
+ /* The parallel section of an incoming job:
+ *
+ * - Decryption.
+ * - Crypto-key routing lookup.
+ *
+ * Note: We truncate the message buffer to 0 bytes in case of authentication failure
+ * or crypto-key routing failure (attempted impersonation).
+ *
+ * Note: We cannot do replay protection in the parallel job,
+ * since this can cause dropping of packets (leaving the window) due to scheduling.
+ */
fn parallel_work(&self) {
- // TODO: refactor
+ debug_assert_eq!(
+ self.is_ready(),
+ false,
+ "doing parallel work on completed job"
+ );
+ log::trace!("processing parallel receive job");
+
// decrypt
{
+ // closure for locking
let job = &self.0;
let peer = &job.state.peer;
let mut msg = job.buffer.lock();
- // cast to header followed by payload
- let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
- match LayoutVerified::new_from_prefix(&mut msg.1[..]) {
- Some(v) => v,
- None => {
- log::debug!("inbound worker: failed to parse message");
- return;
- }
- };
-
- // authenticate and decrypt payload
- {
+ // process buffer
+ let ok = (|| {
+ // cast to header followed by payload
+ let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
+ match LayoutVerified::new_from_prefix(&mut msg.1[..]) {
+ Some(v) => v,
+ None => return false,
+ };
+
// create nonce object
let mut nonce = [0u8; 12];
debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len());
@@ -87,47 +102,24 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob
// attempt to open (and authenticate) the body
match key.open_in_place(nonce, Aad::empty(), packet) {
Ok(_) => (),
- Err(_) => {
- // fault and return early
- log::trace!("inbound worker: authentication failure");
- msg.1.truncate(0);
- return;
- }
+ Err(_) => return false,
}
- }
-
- // check that counter not after reject
- if header.f_counter.get() >= REJECT_AFTER_MESSAGES {
- msg.1.truncate(0);
- return;
- }
- // cryptokey route and strip padding
- let inner_len = {
- let length = packet.len() - SIZE_TAG;
- if length > 0 {
- peer.device.table.check_route(&peer, &packet[..length])
- } else {
- Some(0)
+ // check that counter not after reject
+ if header.f_counter.get() >= REJECT_AFTER_MESSAGES {
+ return false;
}
- };
- // truncate to remove tag
- match inner_len {
- None => {
- log::trace!("inbound worker: cryptokey routing failed");
- msg.1.truncate(0);
- }
- Some(len) => {
- log::trace!(
- "inbound worker: good route, length = {} {}",
- len,
- if len == 0 { "(keepalive)" } else { "" }
- );
- msg.1.truncate(mem::size_of::<TransportHeader>() + len);
- }
+ // check crypto-key router
+ packet.len() == SIZE_KEEPALIVE || peer.device.table.check_route(&peer, &packet)
+ })();
+
+ // remove message in case of failure:
+ // to indicate failure and avoid later accidental use of unauthenticated data.
+ if !ok {
+ msg.1.truncate(0);
}
- }
+ };
// mark ready
self.0.ready.store(true, Ordering::Release);
@@ -142,6 +134,13 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob
}
fn sequential_work(self) {
+ debug_assert_eq!(
+ self.is_ready(),
+ true,
+ "doing sequential work on an incomplete job"
+ );
+ log::trace!("processing sequential receive job");
+
let job = &self.0;
let peer = &job.state.peer;
let mut msg = job.buffer.lock();
@@ -152,7 +151,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob
match LayoutVerified::new_from_prefix(&msg.1[..]) {
Some(v) => v,
None => {
- // also covers authentication failure
+ // also covers authentication failure (will fail to parse header)
return;
}
};
@@ -173,20 +172,16 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob
*peer.endpoint.lock() = endpoint;
// check if should be written to TUN
- let mut sent = false;
- if packet.len() > 0 {
- sent = match peer.device.inbound.write(&packet[..]) {
- Err(e) => {
+ // (keep-alive and malformed packets will have no inner length)
+ if let Some(inner) = inner_length(packet) {
+ if inner >= packet.len() {
+ let _ = peer.device.inbound.write(&packet[..inner]).map_err(|e| {
log::debug!("failed to write inbound packet to TUN: {:?}", e);
- false
- }
- Ok(_) => true,
+ });
}
- } else {
- log::debug!("inbound worker: received keepalive")
}
// trigger callback
- C::recv(&peer.opaque, msg.1.len(), sent, &job.state.keypair);
+ C::recv(&peer.opaque, msg.1.len(), true, &job.state.keypair);
}
}
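The refactor preserves the two-phase job structure: decryption and the crypto-key routing check run in the parallel phase on any pool worker, while replay protection, endpoint updates, and the TUN write remain in the sequential phase, executed in per-peer submission order. A rough sketch of how a worker drives one inbound job (simplified from worker.rs further below; the ordering guarantee comes from the queue module, which is not shown in this diff):

    // any worker thread may run the parallel phase, out of order:
    job.parallel_work();    // decrypt + check source address against allowed IPs
    // the per-peer queue then runs sequential phases in submission order,
    // but only for jobs whose parallel phase has completed (is_ready()):
    job.queue().consume();  // replay window, endpoint update, TUN write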
diff --git a/src/wireguard/router/route.rs b/src/wireguard/router/route.rs
index 7256232..3680157 100644
--- a/src/wireguard/router/route.rs
+++ b/src/wireguard/router/route.rs
@@ -1,13 +1,11 @@
use super::ip::*;
-use zerocopy::LayoutVerified;
-
-use std::mem;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use spin::RwLock;
use treebitmap::address::Address;
use treebitmap::IpLookupTable;
+use zerocopy::LayoutVerified;
/* Functions for obtaining and validating "cryptokey" routes */
@@ -115,53 +113,26 @@ impl<T: Eq + Clone> RoutingTable<T> {
}
#[inline(always)]
- pub fn check_route(&self, peer: &T, packet: &[u8]) -> Option<usize> {
- match packet.get(0)? >> 4 {
- VERSION_IP4 => {
- // check length and cast to IPv4 header
- let (header, _): (LayoutVerified<&[u8], IPv4Header>, _) =
- LayoutVerified::new_from_prefix(packet)?;
-
- log::trace!(
- "router, check route for IPv4 source: {:?}",
- Ipv4Addr::from(header.f_source)
- );
-
- // check IPv4 source address
- self.ipv4
- .read()
- .longest_match(Ipv4Addr::from(header.f_source))
- .and_then(|(_, _, p)| {
- if p == peer {
- Some(header.f_total_len.get() as usize)
- } else {
- None
- }
- })
- }
- VERSION_IP6 => {
- // check length and cast to IPv6 header
- let (header, _): (LayoutVerified<&[u8], IPv6Header>, _) =
- LayoutVerified::new_from_prefix(packet)?;
-
- log::trace!(
- "router, check route for IPv6 source: {:?}",
- Ipv6Addr::from(header.f_source)
- );
-
- // check IPv6 source address
- self.ipv6
- .read()
- .longest_match(Ipv6Addr::from(header.f_source))
- .and_then(|(_, _, p)| {
- if p == peer {
- Some(header.f_len.get() as usize + mem::size_of::<IPv6Header>())
- } else {
- None
- }
- })
- }
- _ => None,
+ pub fn check_route(&self, peer: &T, packet: &[u8]) -> bool {
+ match packet.get(0).map(|v| v >> 4) {
+ Some(VERSION_IP4) => LayoutVerified::new_from_prefix(packet)
+ .and_then(|(header, _): (LayoutVerified<&[u8], IPv4Header>, _)| {
+ self.ipv4
+ .read()
+ .longest_match(Ipv4Addr::from(header.f_source))
+ .map(|(_, _, p)| p == peer)
+ })
+ .is_some(),
+
+ Some(VERSION_IP6) => LayoutVerified::new_from_prefix(packet)
+ .and_then(|(header, _): (LayoutVerified<&[u8], IPv6Header>, _)| {
+ self.ipv6
+ .read()
+ .longest_match(Ipv6Addr::from(header.f_source))
+ .map(|(_, _, p)| p == peer)
+ })
+ .is_some(),
+ _ => false,
}
}
}
diff --git a/src/wireguard/router/send.rs b/src/wireguard/router/send.rs
index 8e41796..db6b079 100644
--- a/src/wireguard/router/send.rs
+++ b/src/wireguard/router/send.rs
@@ -1,9 +1,9 @@
-use super::queue::{SequentialJob, ParallelJob, Queue};
-use super::KeyPair;
-use super::types::Callbacks;
+use super::messages::{TransportHeader, TYPE_TRANSPORT};
use super::peer::Peer;
+use super::queue::{ParallelJob, Queue, SequentialJob};
+use super::types::Callbacks;
+use super::KeyPair;
use super::{REJECT_AFTER_MESSAGES, SIZE_TAG};
-use super::messages::{TransportHeader, TYPE_TRANSPORT};
use super::super::{tun, udp, Endpoint};
@@ -11,8 +11,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
-use zerocopy::{AsBytes, LayoutVerified};
use spin::Mutex;
+use zerocopy::{AsBytes, LayoutVerified};
struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
ready: AtomicBool,
@@ -22,67 +22,36 @@ struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
peer: Peer<E, C, T, B>,
}
-pub struct SendJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> (
- Arc<Inner<E, C, T, B>>
+pub struct SendJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
+ Arc<Inner<E, C, T, B>>,
);
-impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Clone for SendJob<E, C, T, B> {
+impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Clone for SendJob<E, C, T, B> {
fn clone(&self) -> SendJob<E, C, T, B> {
SendJob(self.0.clone())
}
}
-impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SendJob<E, C, T, B> {
+impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SendJob<E, C, T, B> {
pub fn new(
buffer: Vec<u8>,
counter: u64,
keypair: Arc<KeyPair>,
- peer: Peer<E, C, T, B>
+ peer: Peer<E, C, T, B>,
) -> SendJob<E, C, T, B> {
- SendJob(Arc::new(Inner{
+ SendJob(Arc::new(Inner {
buffer: Mutex::new(buffer),
counter,
keypair,
peer,
- ready: AtomicBool::new(false)
+ ready: AtomicBool::new(false),
}))
}
}
-impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob for SendJob<E, C, T, B> {
-
- fn is_ready(&self) -> bool {
- self.0.ready.load(Ordering::Acquire)
- }
-
- fn sequential_work(self) {
- debug_assert_eq!(
- self.is_ready(),
- true,
- "doing sequential work
- on an incomplete job"
- );
- log::trace!("processing sequential send job");
-
- // send to peer
- let job = &self.0;
- let msg = job.buffer.lock();
- let xmit = job.peer.send_raw(&msg[..]).is_ok();
-
- // trigger callback (for timers)
- C::send(
- &job.peer.opaque,
- msg.len(),
- xmit,
- &job.keypair,
- job.counter,
- );
- }
-}
-
-
-impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob for SendJob<E, C, T, B> {
-
+impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob
+ for SendJob<E, C, T, B>
+{
fn queue(&self) -> &Queue<Self> {
&self.0.peer.outbound
}
@@ -140,4 +109,30 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob
// mark ready
self.0.ready.store(true, Ordering::Release);
}
-}
\ No newline at end of file
+}
+
+impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob
+ for SendJob<E, C, T, B>
+{
+ fn is_ready(&self) -> bool {
+ self.0.ready.load(Ordering::Acquire)
+ }
+
+ fn sequential_work(self) {
+ debug_assert_eq!(
+ self.is_ready(),
+ true,
+ "doing sequential work
+ on an incomplete job"
+ );
+ log::trace!("processing sequential send job");
+
+ // send to peer
+ let job = &self.0;
+ let msg = job.buffer.lock();
+ let xmit = job.peer.send_raw(&msg[..]).is_ok();
+
+ // trigger callback (for timers)
+ C::send(&job.peer.opaque, msg.len(), xmit, &job.keypair, job.counter);
+ }
+}
diff --git a/src/wireguard/router/tests.rs b/src/wireguard/router/tests.rs
index 3d5c79b..3afa422 100644
--- a/src/wireguard/router/tests.rs
+++ b/src/wireguard/router/tests.rs
@@ -1,229 +1,264 @@
+use super::KeyPair;
+use super::SIZE_MESSAGE_PREFIX;
+use super::{Callbacks, Device};
+
+use super::SIZE_KEEPALIVE;
+
+use super::super::dummy;
+use super::super::dummy_keypair;
+use super::super::tests::make_packet;
+
+use crate::platform::udp::Reader;
+
use std::net::IpAddr;
+use std::ops::Deref;
+use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
+use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::sync::Mutex;
-use std::thread;
use std::time::Duration;
+use env_logger;
use num_cpus;
-
-use super::super::dummy;
-use super::super::dummy_keypair;
-use super::super::tests::make_packet;
-use super::super::udp::*;
-use super::KeyPair;
-use super::SIZE_MESSAGE_PREFIX;
-use super::{Callbacks, Device};
+use test::Bencher;
extern crate test;
-const SIZE_KEEPALIVE: usize = 32;
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use env_logger;
- use log::debug;
- use std::sync::atomic::AtomicUsize;
- use test::Bencher;
-
- // type for tracking events inside the router module
- struct Flags {
- send: Mutex<Vec<(usize, bool)>>,
- recv: Mutex<Vec<(usize, bool)>>,
- need_key: Mutex<Vec<()>>,
- key_confirmed: Mutex<Vec<()>>,
- }
+const SIZE_MSG: usize = 1024;
- #[derive(Clone)]
- struct Opaque(Arc<Flags>);
+const TIMEOUT: Duration = Duration::from_millis(1000);
- struct TestCallbacks();
+struct EventTracker<E> {
+ rx: Mutex<Receiver<E>>,
+ tx: Mutex<Sender<E>>,
+}
- impl Opaque {
- fn new() -> Opaque {
- Opaque(Arc::new(Flags {
- send: Mutex::new(vec![]),
- recv: Mutex::new(vec![]),
- need_key: Mutex::new(vec![]),
- key_confirmed: Mutex::new(vec![]),
- }))
+impl<E> EventTracker<E> {
+ fn new() -> Self {
+ let (tx, rx) = channel();
+ EventTracker {
+ rx: Mutex::new(rx),
+ tx: Mutex::new(tx),
}
+ }
- fn reset(&self) {
- self.0.send.lock().unwrap().clear();
- self.0.recv.lock().unwrap().clear();
- self.0.need_key.lock().unwrap().clear();
- self.0.key_confirmed.lock().unwrap().clear();
- }
+ fn log(&self, e: E) {
+ self.tx.lock().unwrap().send(e).unwrap();
+ }
- fn send(&self) -> Option<(usize, bool)> {
- self.0.send.lock().unwrap().pop()
+ fn wait(&self, timeout: Duration) -> Option<E> {
+ match self.rx.lock().unwrap().recv_timeout(timeout) {
+ Ok(v) => Some(v),
+ Err(RecvTimeoutError::Timeout) => None,
+ Err(RecvTimeoutError::Disconnected) => panic!("Disconnect"),
}
+ }
- fn recv(&self) -> Option<(usize, bool)> {
- self.0.recv.lock().unwrap().pop()
- }
+ fn now(&self) -> Option<E> {
+ self.wait(Duration::from_millis(0))
+ }
+}
- fn need_key(&self) -> Option<()> {
- self.0.need_key.lock().unwrap().pop()
- }
+// type for tracking events inside the router module
+struct Inner {
+ send: EventTracker<(usize, bool)>,
+ recv: EventTracker<(usize, bool)>,
+ need_key: EventTracker<()>,
+ key_confirmed: EventTracker<()>,
+}
- fn key_confirmed(&self) -> Option<()> {
- self.0.key_confirmed.lock().unwrap().pop()
- }
+#[derive(Clone)]
+struct Opaque {
+ inner: Arc<Inner>,
+}
- // has all events been accounted for by assertions?
- fn is_empty(&self) -> bool {
- let send = self.0.send.lock().unwrap();
- let recv = self.0.recv.lock().unwrap();
- let need_key = self.0.need_key.lock().unwrap();
- let key_confirmed = self.0.key_confirmed.lock().unwrap();
- send.is_empty() && recv.is_empty() && need_key.is_empty() & key_confirmed.is_empty()
- }
- }
+impl Deref for Opaque {
+ type Target = Inner;
- impl Callbacks for TestCallbacks {
- type Opaque = Opaque;
+ fn deref(&self) -> &Self::Target {
+ &self.inner
+ }
+}
- fn send(t: &Self::Opaque, size: usize, sent: bool, _keypair: &Arc<KeyPair>, _counter: u64) {
- t.0.send.lock().unwrap().push((size, sent))
+struct TestCallbacks();
+
+impl Opaque {
+ fn new() -> Opaque {
+ Opaque {
+ inner: Arc::new(Inner {
+ send: EventTracker::new(),
+ recv: EventTracker::new(),
+ need_key: EventTracker::new(),
+ key_confirmed: EventTracker::new(),
+ }),
}
+ }
+}
- fn recv(t: &Self::Opaque, size: usize, sent: bool, _keypair: &Arc<KeyPair>) {
- t.0.recv.lock().unwrap().push((size, sent))
- }
+macro_rules! no_events {
+ ($opq:expr) => {
+ assert_eq!($opq.send.now(), None, "unexpected send event");
+ assert_eq!($opq.recv.now(), None, "unexpected recv event");
+ assert_eq!($opq.need_key.now(), None, "unexpected need_key event");
+ assert_eq!(
+ $opq.key_confirmed.now(),
+ None,
+ "unexpected key_confirmed event"
+ );
+ };
+}
- fn need_key(t: &Self::Opaque) {
- t.0.need_key.lock().unwrap().push(());
- }
+impl Callbacks for TestCallbacks {
+ type Opaque = Opaque;
- fn key_confirmed(t: &Self::Opaque) {
- t.0.key_confirmed.lock().unwrap().push(());
- }
+ fn send(t: &Self::Opaque, size: usize, sent: bool, _keypair: &Arc<KeyPair>, _counter: u64) {
+ t.send.log((size, sent))
}
- // wait for scheduling
- fn wait() {
- thread::sleep(Duration::from_millis(15));
+ fn recv(t: &Self::Opaque, size: usize, sent: bool, _keypair: &Arc<KeyPair>) {
+ t.recv.log((size, sent))
}
- fn init() {
- let _ = env_logger::builder().is_test(true).try_init();
+ fn need_key(t: &Self::Opaque) {
+ t.need_key.log(());
}
- fn make_packet_padded(size: usize, src: IpAddr, dst: IpAddr, id: u64) -> Vec<u8> {
- let p = make_packet(size, src, dst, id);
- let mut o = vec![0; p.len() + SIZE_MESSAGE_PREFIX];
- o[SIZE_MESSAGE_PREFIX..SIZE_MESSAGE_PREFIX + p.len()].copy_from_slice(&p[..]);
- o
+ fn key_confirmed(t: &Self::Opaque) {
+ t.key_confirmed.log(());
}
+}
- #[bench]
- fn bench_outbound(b: &mut Bencher) {
- struct BencherCallbacks {}
- impl Callbacks for BencherCallbacks {
- type Opaque = Arc<AtomicUsize>;
- fn send(
- t: &Self::Opaque,
- size: usize,
- _sent: bool,
- _keypair: &Arc<KeyPair>,
- _counter: u64,
- ) {
- t.fetch_add(size, Ordering::SeqCst);
- }
- fn recv(_: &Self::Opaque, _size: usize, _sent: bool, _keypair: &Arc<KeyPair>) {}
- fn need_key(_: &Self::Opaque) {}
- fn key_confirmed(_: &Self::Opaque) {}
- }
-
- // create device
- let (_fake, _reader, tun_writer, _mtu) = dummy::TunTest::create(false);
- let router: Device<_, BencherCallbacks, dummy::TunWriter, dummy::VoidBind> =
- Device::new(num_cpus::get(), tun_writer);
-
- // add new peer
- let opaque = Arc::new(AtomicUsize::new(0));
- let peer = router.new_peer(opaque.clone());
- peer.add_keypair(dummy_keypair(true));
-
- // add subnet to peer
- let (mask, len, dst) = ("192.168.1.0", 24, "192.168.1.20");
- let mask: IpAddr = mask.parse().unwrap();
- peer.add_allowed_ip(mask, len);
-
- // create "IP packet"
- let dst = dst.parse().unwrap();
- let src = match dst {
- IpAddr::V4(_) => "127.0.0.1".parse().unwrap(),
- IpAddr::V6(_) => "::1".parse().unwrap(),
- };
- let msg = make_packet_padded(1024, src, dst, 0);
-
- // every iteration sends 10 GB
- b.iter(|| {
- opaque.store(0, Ordering::SeqCst);
- while opaque.load(Ordering::Acquire) < 10 * 1024 * 1024 {
- router.send(msg.to_vec()).unwrap();
- }
- });
- }
+fn init() {
+ let _ = env_logger::builder().is_test(true).try_init();
+}
- #[test]
- fn test_outbound() {
- init();
+fn make_packet_padded(size: usize, src: IpAddr, dst: IpAddr, id: u64) -> Vec<u8> {
+ let p = make_packet(size, src, dst, id);
+ let mut o = vec![0; p.len() + SIZE_MESSAGE_PREFIX];
+ o[SIZE_MESSAGE_PREFIX..SIZE_MESSAGE_PREFIX + p.len()].copy_from_slice(&p[..]);
+ o
+}
- // create device
- let (_fake, _reader, tun_writer, _mtu) = dummy::TunTest::create(false);
- let router: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer);
- router.set_outbound_writer(dummy::VoidBind::new());
+#[bench]
+fn bench_outbound(b: &mut Bencher) {
+ struct BencherCallbacks {}
+ impl Callbacks for BencherCallbacks {
+ type Opaque = Arc<AtomicUsize>;
+ fn send(
+ t: &Self::Opaque,
+ size: usize,
+ _sent: bool,
+ _keypair: &Arc<KeyPair>,
+ _counter: u64,
+ ) {
+ t.fetch_add(size, Ordering::SeqCst);
+ }
+ fn recv(_: &Self::Opaque, _size: usize, _sent: bool, _keypair: &Arc<KeyPair>) {}
+ fn need_key(_: &Self::Opaque) {}
+ fn key_confirmed(_: &Self::Opaque) {}
+ }
- let tests = vec![
- ("192.168.1.0", 24, "192.168.1.20", true),
- ("172.133.133.133", 32, "172.133.133.133", true),
- ("172.133.133.133", 32, "172.133.133.132", false),
- (
- "2001:db8::ff00:42:0000",
- 112,
- "2001:db8::ff00:42:3242",
- true,
- ),
- (
- "2001:db8::ff00:42:8000",
- 113,
- "2001:db8::ff00:42:0660",
- false,
- ),
- (
- "2001:db8::ff00:42:8000",
- 113,
- "2001:db8::ff00:42:ffff",
- true,
- ),
- ];
+ // create device
+ let (_fake, _reader, tun_writer, _mtu) = dummy::TunTest::create(false);
+ let router: Device<_, BencherCallbacks, dummy::TunWriter, dummy::VoidBind> =
+ Device::new(num_cpus::get(), tun_writer);
+
+ // add new peer
+ let opaque = Arc::new(AtomicUsize::new(0));
+ let peer = router.new_peer(opaque.clone());
+ peer.add_keypair(dummy_keypair(true));
+
+ // add subnet to peer
+ let (mask, len, dst) = ("192.168.1.0", 24, "192.168.1.20");
+ let mask: IpAddr = mask.parse().unwrap();
+ peer.add_allowed_ip(mask, len);
+
+ // create "IP packet"
+ let dst = dst.parse().unwrap();
+ let src = match dst {
+ IpAddr::V4(_) => "127.0.0.1".parse().unwrap(),
+ IpAddr::V6(_) => "::1".parse().unwrap(),
+ };
+ let msg = make_packet_padded(1024, src, dst, 0);
+
+ // every iteration sends 10 GB
+ b.iter(|| {
+ opaque.store(0, Ordering::SeqCst);
+ while opaque.load(Ordering::Acquire) < 10 * 1024 * 1024 {
+ router.send(msg.to_vec()).unwrap();
+ }
+ });
+}
- for (num, (mask, len, dst, okay)) in tests.iter().enumerate() {
- println!(
- "Check: {} {} {}/{}",
- dst,
- if *okay { "\\in" } else { "\\notin" },
- mask,
- len
- );
- for set_key in vec![true, false] {
- debug!("index = {}, set_key = {}", num, set_key);
+#[test]
+fn test_outbound() {
+ init();
+
+ // create device
+ let (_fake, _reader, tun_writer, _mtu) = dummy::TunTest::create(false);
+ let router: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer);
+ router.set_outbound_writer(dummy::VoidBind::new());
+
+ let tests = vec![
+ ("192.168.1.0", 24, "192.168.1.20", true),
+ ("172.133.133.133", 32, "172.133.133.133", true),
+ ("172.133.133.133", 32, "172.133.133.132", false),
+ (
+ "2001:db8::ff00:42:0000",
+ 112,
+ "2001:db8::ff00:42:3242",
+ true,
+ ),
+ (
+ "2001:db8::ff00:42:8000",
+ 113,
+ "2001:db8::ff00:42:0660",
+ false,
+ ),
+ (
+ "2001:db8::ff00:42:8000",
+ 113,
+ "2001:db8::ff00:42:ffff",
+ true,
+ ),
+ ];
+
+ for (mask, len, dst, okay) in tests.iter() {
+ let len = *len;
+ let okay = *okay;
+
+ println!(
+ "Check: {} {} {}/{}",
+ dst,
+ if okay { "\\in" } else { "\\notin" },
+ mask,
+ len
+ );
+
+ for set_key in vec![true, false] {
+ for confirm_with_staged_packet in vec![true, false] {
+ let send_keepalive = (!confirm_with_staged_packet || !okay) && set_key;
+ let send_payload = okay && set_key;
+ let need_key = ((confirm_with_staged_packet && set_key) || !set_key) && okay;
+
+ println!(
+ " confirm_with_staged_packet = {}, send_keepalive = {}, set_key = {}",
+ confirm_with_staged_packet, send_keepalive, set_key
+ );
// add new peer
let opaque = Opaque::new();
let peer = router.new_peer(opaque.clone());
let mask: IpAddr = mask.parse().unwrap();
- if set_key {
+
+ // confirm using keepalive
+ if set_key && (!confirm_with_staged_packet) {
peer.add_keypair(dummy_keypair(true));
}
// map subnet to peer
- peer.add_allowed_ip(mask, *len);
+ peer.add_allowed_ip(mask, len);
// create "IP packet"
let dst = dst.parse().unwrap();
@@ -231,246 +266,279 @@ mod tests {
IpAddr::V4(_) => "127.0.0.1".parse().unwrap(),
IpAddr::V6(_) => "::1".parse().unwrap(),
};
- let msg = make_packet_padded(1024, src, dst, 0);
+ let msg = make_packet_padded(SIZE_MSG, src, dst, 0);
- // cryptkey route the IP packet
+ // crypto-key route the IP packet
let res = router.send(msg);
+ assert_eq!(
+ res.is_ok(),
+ okay,
+ "crypto-routing / destination lookup failure"
+ );
- // allow some scheduling
- wait();
+ // confirm using staged packet
+ if set_key && confirm_with_staged_packet {
+ peer.add_keypair(dummy_keypair(true));
+ }
- if *okay {
- // cryptkey routing succeeded
- assert!(res.is_ok(), "crypt-key routing should succeed: {:?}", res);
+ // check for key-material request
+ if need_key {
assert_eq!(
- opaque.need_key().is_some(),
- !set_key,
+ opaque.need_key.wait(TIMEOUT),
+ Some(()),
"should have requested a new key, if no encryption state was set"
);
+ }
+
+ // check for keepalive
+ if send_keepalive {
assert_eq!(
- opaque.send().is_some(),
- set_key,
- "transmission should have been attempted"
- );
- assert!(
- opaque.recv().is_none(),
- "no messages should have been marked as received"
- );
- } else {
- // no such cryptkey route
- assert!(res.is_err(), "crypt-key routing should fail");
- assert!(
- opaque.need_key().is_none(),
- "should not request a new-key if crypt-key routing failed"
+ opaque.send.wait(TIMEOUT),
+ Some((SIZE_KEEPALIVE, false)),
+ "keepalive should be sent before transport message"
);
+ }
+
+ // check for encryption of payload
+ if send_payload {
assert_eq!(
- opaque.send(),
- if set_key {
- Some((SIZE_KEEPALIVE, false))
- } else {
- None
- },
- "transmission should only happen if key was set (keepalive)",
- );
- assert!(
- opaque.recv().is_none(),
- "no messages should have been marked as received",
- );
+ opaque.send.wait(TIMEOUT),
+ Some((SIZE_KEEPALIVE + SIZE_MSG, false)),
+ "message buffer should be encrypted"
+ )
}
+
+ // check that we handled all events
+ no_events!(opaque);
}
}
-
- println!("Test complete, drop device");
}
+}
- #[test]
- fn test_bidirectional() {
- init();
+#[test]
+fn test_bidirectional() {
+ init();
- let tests = [
+ let tests = [
+ (
+ ("192.168.1.0", 24, "192.168.1.20", true),
+ ("172.133.133.133", 32, "172.133.133.133", true),
+ ),
+ (
+ ("192.168.1.0", 24, "192.168.1.20", true),
+ ("172.133.133.133", 32, "172.133.133.133", true),
+ ),
+ (
(
- ("192.168.1.0", 24, "192.168.1.20", true),
- ("172.133.133.133", 32, "172.133.133.133", true),
+ "2001:db8::ff00:42:8000",
+ 113,
+ "2001:db8::ff00:42:ffff",
+ true,
),
(
- ("192.168.1.0", 24, "192.168.1.20", true),
- ("172.133.133.133", 32, "172.133.133.133", true),
+ "2001:db8::ff40:42:8000",
+ 113,
+ "2001:db8::ff40:42:ffff",
+ true,
),
+ ),
+ (
(
- (
- "2001:db8::ff00:42:8000",
- 113,
- "2001:db8::ff00:42:ffff",
- true,
- ),
- (
- "2001:db8::ff40:42:8000",
- 113,
- "2001:db8::ff40:42:ffff",
- true,
- ),
+ "2001:db8::ff00:42:8000",
+ 113,
+ "2001:db8::ff00:42:ffff",
+ true,
),
(
- (
- "2001:db8::ff00:42:8000",
- 113,
- "2001:db8::ff00:42:ffff",
- true,
- ),
- (
- "2001:db8::ff40:42:8000",
- 113,
- "2001:db8::ff40:42:ffff",
- true,
- ),
+ "2001:db8::ff40:42:8000",
+ 113,
+ "2001:db8::ff40:42:ffff",
+ true,
),
- ];
+ ),
+ ];
- for stage in vec![true, false] {
- for (p1, p2) in tests.iter() {
- let ((bind_reader1, bind_writer1), (bind_reader2, bind_writer2)) =
- dummy::PairBind::pair();
+ for (p1, p2) in tests.iter() {
+ for confirm_with_staged_packet in vec![true, false] {
+ println!(
+ "peer1 = {:?}, peer2 = {:?}, confirm_with_staged_packet = {}",
+ p1, p2, confirm_with_staged_packet
+ );
- // create matching device
- let (_fake, _, tun_writer1, _) = dummy::TunTest::create(false);
- let (_fake, _, tun_writer2, _) = dummy::TunTest::create(false);
+ let ((bind_reader1, bind_writer1), (bind_reader2, bind_writer2)) =
+ dummy::PairBind::pair();
- let router1: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer1);
- router1.set_outbound_writer(bind_writer1);
+ let confirm_packet_size = if confirm_with_staged_packet {
+ SIZE_KEEPALIVE + SIZE_MSG
+ } else {
+ SIZE_KEEPALIVE
+ };
- let router2: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer2);
- router2.set_outbound_writer(bind_writer2);
+ // create matching device
+ let (_fake, _, tun_writer1, _) = dummy::TunTest::create(false);
+ let (_fake, _, tun_writer2, _) = dummy::TunTest::create(false);
- // prepare opaque values for tracing callbacks
+ let router1: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer1);
+ router1.set_outbound_writer(bind_writer1);
- let opaque1 = Opaque::new();
- let opaque2 = Opaque::new();
+ let router2: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer2);
+ router2.set_outbound_writer(bind_writer2);
- // create peers with matching keypairs and assign subnets
+ // prepare opaque values for tracing callbacks
- let peer1 = router1.new_peer(opaque1.clone());
- let peer2 = router2.new_peer(opaque2.clone());
+ let opaque1 = Opaque::new();
+ let opaque2 = Opaque::new();
- {
- let (mask, len, _ip, _okay) = p1;
- let mask: IpAddr = mask.parse().unwrap();
- peer1.add_allowed_ip(mask, *len);
- peer1.add_keypair(dummy_keypair(false));
- }
+ // create peers with matching keypairs and assign subnets
- {
- let (mask, len, _ip, _okay) = p2;
- let mask: IpAddr = mask.parse().unwrap();
- peer2.add_allowed_ip(mask, *len);
- peer2.set_endpoint(dummy::UnitEndpoint::new());
- }
+ let peer1 = router1.new_peer(opaque1.clone());
+ let peer2 = router2.new_peer(opaque2.clone());
- if stage {
- println!("confirm using staged packet");
-
- // create IP packet
- let (_mask, _len, ip1, _okay) = p1;
- let (_mask, _len, ip2, _okay) = p2;
- let msg = make_packet_padded(
- 1024,
- ip1.parse().unwrap(), // src
- ip2.parse().unwrap(), // dst
- 0,
- );
+ {
+ let (mask, len, _ip, _okay) = p1;
+ let mask: IpAddr = mask.parse().unwrap();
+ peer1.add_allowed_ip(mask, *len);
+ peer1.add_keypair(dummy_keypair(false));
+ }
+
+ {
+ let (mask, len, _ip, _okay) = p2;
+ let mask: IpAddr = mask.parse().unwrap();
+ peer2.add_allowed_ip(mask, *len);
+ peer2.set_endpoint(dummy::UnitEndpoint::new());
+ }
- // stage packet for sending
- router2.send(msg).expect("failed to sent staged packet");
- wait();
+ if confirm_with_staged_packet {
+ // create IP packet
+ let (_mask, _len, ip1, _okay) = p1;
+ let (_mask, _len, ip2, _okay) = p2;
+ let msg = make_packet_padded(
+ SIZE_MSG,
+ ip1.parse().unwrap(), // src
+ ip2.parse().unwrap(), // dst
+ 0,
+ );
- // validate events
- assert!(opaque2.recv().is_none());
- assert!(
- opaque2.send().is_none(),
- "sending should fail as not key is set"
- );
- assert!(
- opaque2.need_key().is_some(),
- "a new key should be requested since a packet was attempted transmitted"
- );
- assert!(opaque2.is_empty(), "callbacks should only run once");
- }
+ // stage packet for sending
+ router2.send(msg).expect("failed to sent staged packet");
- // this should cause a key-confirmation packet (keepalive or staged packet)
- // this also causes peer1 to learn the "endpoint" for peer2
- assert!(peer1.get_endpoint().is_none());
- peer2.add_keypair(dummy_keypair(true));
+ // a new key should have been requested from the handshake machine
+ assert_eq!(
+ opaque2.need_key.wait(TIMEOUT),
+ Some(()),
+ "a new key should be requested since a packet was attempted transmitted"
+ );
- wait();
- assert!(opaque2.send().is_some());
- assert!(opaque2.is_empty(), "events on peer2 should be 'send'");
- assert!(opaque1.is_empty(), "nothing should happened on peer1");
+ no_events!(opaque1);
+ no_events!(opaque2);
+ }
- // read confirming message received by the other end ("across the internet")
- let mut buf = vec![0u8; 2048];
- let (len, from) = bind_reader1.read(&mut buf).unwrap();
- buf.truncate(len);
- router1.recv(from, buf).unwrap();
-
- wait();
- assert!(opaque1.recv().is_some());
- assert!(opaque1.key_confirmed().is_some());
- assert!(
- opaque1.is_empty(),
- "events on peer1 should be 'recv' and 'key_confirmed'"
+ // add a keypair
+ assert_eq!(peer1.get_endpoint(), None, "no endpoint has yet been set");
+ peer2.add_keypair(dummy_keypair(true));
+
+ // this should cause a key-confirmation packet (keepalive or staged packet)
+ assert_eq!(
+ opaque2.send.wait(TIMEOUT),
+ Some((confirm_packet_size, true)),
+ "expected successful transmission of a confirmation packet"
+ );
+
+ // no other events should fire
+ no_events!(opaque1);
+ no_events!(opaque2);
+
+ // read confirming message received by the other end ("across the internet")
+ let mut buf = vec![0u8; SIZE_MSG * 2];
+ let (len, from) = bind_reader1.read(&mut buf).unwrap();
+ buf.truncate(len);
+
+ assert_eq!(
+ len,
+ if confirm_with_staged_packet {
+ SIZE_MSG + SIZE_KEEPALIVE
+ } else {
+ SIZE_KEEPALIVE
+ },
+ "unexpected size of confirmation message"
+ );
+
+ // pass to the router for processing
+ router1
+ .recv(from, buf)
+ .expect("failed to receive confirmation message");
+
+ // check that a receive event is fired
+ assert_eq!(
+ opaque1.recv.wait(TIMEOUT),
+ Some((confirm_packet_size, true)),
+ "we expect processing to be successful"
+ );
+
+ // the key is confirmed
+ assert_eq!(
+ opaque1.key_confirmed.wait(TIMEOUT),
+ Some(()),
+ "confirmation message should confirm the key"
+ );
+
+ // peer1 learns the endpoint
+ assert!(
+ peer1.get_endpoint().is_some(),
+ "peer1 should learn the endpoint of peer2 from the confirmation message (roaming)"
+ );
+
+ // no other events should fire
+ no_events!(opaque1);
+ no_events!(opaque2);
+ // now that peer1 has an endpoint
+ // route packets in the other direction: peer1 -> peer2
+ for id in 1..11 {
+ println!("packet: {}", id);
+
+ let message_size = 1024;
+
+ // pass IP packet to router
+ let (_mask, _len, ip1, _okay) = p1;
+ let (_mask, _len, ip2, _okay) = p2;
+ let msg = make_packet_padded(
+ message_size,
+ ip2.parse().unwrap(), // src
+ ip1.parse().unwrap(), // dst
+ id,
);
- assert!(peer1.get_endpoint().is_some());
- assert!(opaque2.is_empty(), "nothing should happened on peer2");
- // now that peer1 has an endpoint
- // route packets : peer1 -> peer2
+ router1
+ .send(msg)
+ .expect("we expect routing to be successful");
- for id in 1..11 {
- println!("round: {}", id);
- assert!(
- opaque1.is_empty(),
- "we should have asserted a value for every callback on peer1"
- );
- assert!(
- opaque2.is_empty(),
- "we should have asserted a value for every callback on peer2"
- );
+ // encryption succeeds and the correct size is logged
+ assert_eq!(
+ opaque1.send.wait(TIMEOUT),
+ Some((message_size + SIZE_KEEPALIVE, true)),
+ "expected send event for peer1 -> peer2 payload"
+ );
- // pass IP packet to router
- let (_mask, _len, ip1, _okay) = p1;
- let (_mask, _len, ip2, _okay) = p2;
- let msg = make_packet_padded(
- 1024,
- ip2.parse().unwrap(), // src
- ip1.parse().unwrap(), // dst
- id,
- );
- router1.send(msg).unwrap();
+ // otherwise no events
+ no_events!(opaque1);
+ no_events!(opaque2);
- wait();
- assert!(opaque1.send().is_some(), "encryption should succeed");
- assert!(
- opaque1.recv().is_none(),
- "receiving callback should not be called"
- );
- assert!(opaque1.need_key().is_none());
-
- // receive ("across the internet") on the other end
- let mut buf = vec![0u8; 2048];
- let (len, from) = bind_reader2.read(&mut buf).unwrap();
- buf.truncate(len);
- router2.recv(from, buf).unwrap();
-
- wait();
- assert!(
- opaque2.send().is_none(),
- "sending callback should not be called"
- );
- assert!(
- opaque2.recv().is_some(),
- "decryption and routing should succeed"
- );
- assert!(opaque2.need_key().is_none());
- }
+ // receive ("across the internet") on the other end
+ let mut buf = vec![0u8; 2048];
+ let (len, from) = bind_reader2.read(&mut buf).unwrap();
+ buf.truncate(len);
+ router2.recv(from, buf).unwrap();
+
+ // check that decryption succeeds
+ assert_eq!(
+ opaque2.recv.wait(TIMEOUT),
+ Some((message_size + SIZE_KEEPALIVE, true)),
+ "decryption and routing should succeed"
+ );
+
+ // otherwise no events
+ no_events!(opaque1);
+ no_events!(opaque2);
}
}
}
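The rewritten tests drop the sleep-based wait() helper in favor of the channel-backed EventTracker, so every callback is awaited with an explicit timeout rather than a fixed delay. A hypothetical stand-alone use of the tracker, mirroring the pattern in the tests above:

    let events: EventTracker<(usize, bool)> = EventTracker::new();
    events.log((32, true));                              // recorded by a callback
    assert_eq!(events.wait(TIMEOUT), Some((32, true)));  // await with timeout
    assert_eq!(events.now(), None);                      // nothing else pending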
diff --git a/src/wireguard/router/worker.rs b/src/wireguard/router/worker.rs
index bbb644c..459a198 100644
--- a/src/wireguard/router/worker.rs
+++ b/src/wireguard/router/worker.rs
@@ -6,6 +6,7 @@ use super::receive::ReceiveJob;
use super::send::SendJob;
use crossbeam_channel::Receiver;
+use log;
pub enum JobUnion<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
Outbound(SendJob<E, C, T, B>),
@@ -16,8 +17,12 @@ pub fn worker<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
receiver: Receiver<JobUnion<E, C, T, B>>,
) {
loop {
+ log::trace!("pool worker awaiting job");
match receiver.recv() {
- Err(_) => break,
+ Err(e) => {
+ log::debug!("worker stopped with {}", e);
+ break;
+ }
Ok(JobUnion::Inbound(job)) => {
job.parallel_work();
job.queue().consume();