diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-04 21:42:10 +0200 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-04 21:42:10 +0200 |
commit | af9c76452f115c9a5a1e41e87d43e481e8bf4f0f (patch) | |
tree | 8716ba2638f448ab7d28acd216c2991efed99b63 /src | |
parent | Expanded outbound test (diff) | |
download | wireguard-rs-af9c76452f115c9a5a1e41e87d43e481e8bf4f0f.tar.xz wireguard-rs-af9c76452f115c9a5a1e41e87d43e481e8bf4f0f.zip |
More extensive outbound test
Diffstat (limited to 'src')
-rw-r--r-- | src/router/device.rs | 4 | ||||
-rw-r--r-- | src/router/peer.rs | 35 | ||||
-rw-r--r-- | src/router/tests.rs | 232 | ||||
-rw-r--r-- | src/router/workers.rs | 12 |
4 files changed, 167 insertions, 116 deletions
diff --git a/src/router/device.rs b/src/router/device.rs index 2617350..57ab418 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -8,6 +8,8 @@ use std::sync::{Arc, Weak}; use std::thread; use std::time::Instant; +use log::debug; + use spin; use treebitmap::IpLookupTable; @@ -84,6 +86,8 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Device<C, T, B> { } _ => false, } {} + + debug!("device dropped"); } } diff --git a/src/router/peer.rs b/src/router/peer.rs index e9f62d5..3489bbf 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -5,6 +5,8 @@ use std::sync::mpsc::{sync_channel, SyncSender}; use std::sync::{Arc, Weak}; use std::thread; +use log::debug; + use spin::Mutex; use arraydeque::{ArrayDeque, Saturating, Wrapping}; @@ -54,8 +56,8 @@ pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> { pub struct Peer<C: Callbacks, T: Tun, B: Bind> { state: Arc<PeerInner<C, T, B>>, - thread_outbound: thread::JoinHandle<()>, - thread_inbound: thread::JoinHandle<()>, + thread_outbound: Option<thread::JoinHandle<()>>, + thread_inbound: Option<thread::JoinHandle<()>>, } fn treebit_list<A, E, C: Callbacks, T: Tun, B: Bind>( @@ -109,6 +111,16 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> { let peer = &self.state; peer.stopped.store(true, Ordering::SeqCst); + // drop channels + + mem::replace(&mut *peer.inbound.lock(), sync_channel(0).0); + mem::replace(&mut *peer.outbound.lock(), sync_channel(0).0); + + // join with workers + + mem::replace(&mut self.thread_inbound, None).map(|v| v.join()); + mem::replace(&mut self.thread_outbound, None).map(|v| v.join()); + // remove from cryptkey router treebit_remove(self, &peer.device.ipv4); @@ -130,7 +142,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> { } } - // null key-material (TODO: extend) + // null key-material keys.next = None; keys.current = None; @@ -138,6 +150,8 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> { *peer.ekey.lock() = None; *peer.endpoint.lock() = None; + + debug!("peer dropped & removed from device"); } } @@ -153,10 +167,10 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>( let device = device.clone(); Arc::new(PeerInner { opaque, + device, inbound: Mutex::new(in_tx), outbound: Mutex::new(out_tx), stopped: AtomicBool::new(false), - device: device, ekey: spin::Mutex::new(None), endpoint: spin::Mutex::new(None), keys: spin::Mutex::new(KeyWheel { @@ -187,8 +201,8 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>( Peer { state: peer, - thread_inbound, - thread_outbound, + thread_inbound: Some(thread_inbound), + thread_outbound: Some(thread_outbound), } } @@ -212,21 +226,22 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> { let key = match self.ekey.lock().as_mut() { None => { // add to staged packets (create no job) + debug!("execute callback: call_need_key"); (self.device.call_need_key)(&self.opaque); self.staged_packets.lock().push_back(msg); return None; } Some(mut state) => { - // allocate nonce - state.nonce += 1; - if state.nonce >= REJECT_AFTER_MESSAGES { - state.nonce -= 1; + // avoid integer overflow in nonce + if state.nonce >= REJECT_AFTER_MESSAGES - 1 { return None; } + debug!("encryption state available, nonce = {}", state.nonce); // set transport message fields header.f_counter.set(state.nonce); header.f_receiver.set(state.id); + state.nonce += 1; state.key } }; diff --git a/src/router/tests.rs b/src/router/tests.rs index c2ff378..6feeb72 100644 --- a/src/router/tests.rs +++ b/src/router/tests.rs @@ -1,5 +1,6 @@ use std::error::Error; use std::fmt; +use std::io; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -121,110 +122,141 @@ fn dummy_keypair(initiator: bool) -> KeyPair { } } -#[test] -fn test_outbound() { - // type for tracking events inside the router module - struct Flags { - send: AtomicBool, - recv: AtomicBool, - need_key: AtomicBool, - } - - type Opaque = Arc<Flags>; - - let opaque = Arc::new(Flags { - send: AtomicBool::new(false), - recv: AtomicBool::new(false), - need_key: AtomicBool::new(false), - }); - - // create device - let workers = 4; - let router = Device::new( - workers, - TunTest {}, - BindTest {}, - |t: &Opaque, data: bool, sent: bool| t.send.store(true, Ordering::SeqCst), - |t: &Opaque, data: bool, sent: bool| t.recv.store(true, Ordering::SeqCst), - |t: &Opaque| t.need_key.store(true, Ordering::SeqCst), - ); - - // create peer - let peer = router.new_peer(opaque.clone()); - let tests = vec![ - ("192.168.1.0", 24, "192.168.1.20", true), - ("172.133.133.133", 32, "172.133.133.133", true), - ("172.133.133.133", 32, "172.133.133.132", false), - ( - "2001:db8::ff00:42:0000", - 112, - "2001:db8::ff00:42:3242", - true, - ), - ( - "2001:db8::ff00:42:8000", - 113, - "2001:db8::ff00:42:0660", - false, - ), - ( - "2001:db8::ff00:42:8000", - 113, - "2001:db8::ff00:42:ffff", - true, - ), - ]; - - peer.add_keypair(dummy_keypair(true)); - - for (mask, len, ip, okay) in &tests { - opaque.send.store(false, Ordering::SeqCst); - opaque.recv.store(false, Ordering::SeqCst); - opaque.need_key.store(false, Ordering::SeqCst); - - let mask: IpAddr = mask.parse().unwrap(); - - // map subnet to peer - peer.add_subnet(mask, *len); - - // create "IP packet" - let mut msg = Vec::<u8>::new(); - msg.resize(SIZE_MESSAGE_PREFIX + 1024, 0); - if mask.is_ipv4() { - let mut packet = MutableIpv4Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap(); - packet.set_destination(ip.parse().unwrap()); - packet.set_version(4); - } else { - let mut packet = MutableIpv6Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap(); - packet.set_destination(ip.parse().unwrap()); - packet.set_version(6); - } - - // cryptkey route the IP packet - let res = router.send(msg); - - // allow some scheduling - thread::sleep(Duration::from_millis(1)); +#[cfg(test)] +mod tests { + use super::*; + use env_logger; - if *okay { - // cryptkey routing succeeded - assert!(res.is_ok()); + fn init() { + let _ = env_logger::builder().is_test(true).try_init(); + } - // attempted to send message - assert_eq!(opaque.need_key.load(Ordering::Acquire), false); - assert_eq!(opaque.send.load(Ordering::Acquire), true); - assert_eq!(opaque.recv.load(Ordering::Acquire), false); - } else { - // no such cryptkey route - assert!(res.is_err()); + #[test] + fn test_outbound() { + init(); - // did not attempt to send message - assert_eq!(opaque.need_key.load(Ordering::Acquire), false); - assert_eq!(opaque.send.load(Ordering::Acquire), false); - assert_eq!(opaque.recv.load(Ordering::Acquire), false); + // type for tracking events inside the router module + struct Flags { + send: AtomicBool, + recv: AtomicBool, + need_key: AtomicBool, } - // clear subnets for next test - peer.remove_subnets(); + type Opaque = Arc<Flags>; + + // create device + let workers = 4; + let router = Device::new( + workers, + TunTest {}, + BindTest {}, + |t: &Opaque, _data: bool, _sent: bool| t.send.store(true, Ordering::SeqCst), + |t: &Opaque, _data: bool, _sent: bool| t.recv.store(true, Ordering::SeqCst), + |t: &Opaque| t.need_key.store(true, Ordering::SeqCst), + ); + + // create peer + let tests = vec![ + ("192.168.1.0", 24, "192.168.1.20", true), + ("172.133.133.133", 32, "172.133.133.133", true), + ("172.133.133.133", 32, "172.133.133.132", false), + ( + "2001:db8::ff00:42:0000", + 112, + "2001:db8::ff00:42:3242", + true, + ), + ( + "2001:db8::ff00:42:8000", + 113, + "2001:db8::ff00:42:0660", + false, + ), + ( + "2001:db8::ff00:42:8000", + 113, + "2001:db8::ff00:42:ffff", + true, + ), + ]; + + for (num, (mask, len, ip, okay)) in tests.iter().enumerate() { + for set_key in vec![true, false] { + // add new peer + let opaque = Arc::new(Flags { + send: AtomicBool::new(false), + recv: AtomicBool::new(false), + need_key: AtomicBool::new(false), + }); + let peer = router.new_peer(opaque.clone()); + let mask: IpAddr = mask.parse().unwrap(); + + if set_key { + peer.add_keypair(dummy_keypair(true)); + } + + // map subnet to peer + peer.add_subnet(mask, *len); + + // create "IP packet" + let mut msg = Vec::<u8>::new(); + msg.resize(SIZE_MESSAGE_PREFIX + 1024, 0); + if mask.is_ipv4() { + let mut packet = + MutableIpv4Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap(); + packet.set_destination(ip.parse().unwrap()); + packet.set_version(4); + } else { + let mut packet = + MutableIpv6Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap(); + packet.set_destination(ip.parse().unwrap()); + packet.set_version(6); + } + + // cryptkey route the IP packet + let res = router.send(msg); + + // allow some scheduling + thread::sleep(Duration::from_millis(20)); + + if *okay { + // cryptkey routing succeeded + assert!(res.is_ok(), "crypt-key routing should succeed"); + assert_eq!( + opaque.need_key.load(Ordering::Acquire), + !set_key, + "should have requested a new key, if no encryption state was set" + ); + assert_eq!( + opaque.send.load(Ordering::Acquire), + set_key, + "transmission should have been attempted" + ); + assert_eq!( + opaque.recv.load(Ordering::Acquire), + false, + "no messages should have been marked as received" + ); + } else { + // no such cryptkey route + assert!(res.is_err(), "crypt-key routing should fail"); + assert_eq!( + opaque.need_key.load(Ordering::Acquire), + false, + "should not request a new-key if crypt-key routing failed" + ); + assert_eq!( + opaque.send.load(Ordering::Acquire), + false, + "transmission should not have been attempted", + ); + assert_eq!( + opaque.recv.load(Ordering::Acquire), + false, + "no messages should have been marked as received", + ); + } + } + } } } diff --git a/src/router/workers.rs b/src/router/workers.rs index 537f238..ec6db57 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -1,16 +1,12 @@ -use std::iter; use std::mem; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{sync_channel, Receiver, TryRecvError}; +use std::sync::mpsc::Receiver; use std::sync::{Arc, Weak}; -use std::thread; use futures::sync::oneshot; use futures::*; -use spin; +use log::debug; -use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; use zerocopy::{AsBytes, LayoutVerified}; @@ -174,12 +170,16 @@ pub fn worker_parallel(receiver: Receiver<JobParallel>) { match buf.op { Operation::Encryption => { + debug!("worker, process encryption"); + // note: extends the vector to accommodate the tag key.seal_in_place_append_tag(nonce, Aad::empty(), &mut buf.msg) .unwrap(); buf.okay = true; } Operation::Decryption => { + debug!("worker, process decryption"); + // opening failure is signaled by fault state buf.okay = match key.open_in_place(nonce, Aad::empty(), &mut buf.msg) { Ok(_) => true, |