diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/router/constants.rs | 2 | ||||
-rw-r--r-- | src/router/device.rs | 9 | ||||
-rw-r--r-- | src/router/mod.rs | 1 | ||||
-rw-r--r-- | src/router/peer.rs | 4 | ||||
-rw-r--r-- | src/router/tests.rs | 44 | ||||
-rw-r--r-- | src/router/workers.rs | 5 |
6 files changed, 44 insertions, 21 deletions
diff --git a/src/router/constants.rs b/src/router/constants.rs new file mode 100644 index 0000000..b3015ed --- /dev/null +++ b/src/router/constants.rs @@ -0,0 +1,2 @@ +pub const MAX_STAGED_PACKETS: usize = 128; +pub const WORKER_QUEUE_SIZE: usize = MAX_STAGED_PACKETS; diff --git a/src/router/device.rs b/src/router/device.rs index 58ca2f6..2617350 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -18,6 +18,7 @@ use super::peer; use super::peer::{Peer, PeerInner}; use super::SIZE_MESSAGE_PREFIX; +use super::constants::WORKER_QUEUE_SIZE; use super::messages::TYPE_TRANSPORT; use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks, RouterError}; use super::workers::{worker_parallel, JobParallel}; @@ -113,13 +114,9 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun, B: Bi let mut queues = Vec::with_capacity(num_workers); let mut threads = Vec::with_capacity(num_workers); for _ in 0..num_workers { - // allocate work queue - let (tx, rx) = sync_channel(128); + let (tx, rx) = sync_channel(WORKER_QUEUE_SIZE); queues.push(spin::Mutex::new(tx)); - - // start worker thread - let device = inner.clone(); - threads.push(thread::spawn(move || worker_parallel(device, rx))); + threads.push(thread::spawn(move || worker_parallel(rx))); } // return exported device handle diff --git a/src/router/mod.rs b/src/router/mod.rs index 0e4bce1..ec560b4 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -1,4 +1,5 @@ mod anti_replay; +mod constants; mod device; mod messages; mod peer; diff --git a/src/router/peer.rs b/src/router/peer.rs index a31dfcf..e9f62d5 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -22,17 +22,15 @@ use super::device::DeviceInner; use super::device::EncryptionState; use super::messages::TransportHeader; -use futures::sync::oneshot; use futures::*; use super::workers::Operation; use super::workers::{worker_inbound, worker_outbound}; use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel}; +use super::constants::MAX_STAGED_PACKETS; use super::types::Callbacks; -const MAX_STAGED_PACKETS: usize = 128; - pub struct KeyWheel { next: Option<Arc<KeyPair>>, // next key state (unconfirmed) current: Option<Arc<KeyPair>>, // current key state (used for encryption) diff --git a/src/router/tests.rs b/src/router/tests.rs index 1e049c4..c2ff378 100644 --- a/src/router/tests.rs +++ b/src/router/tests.rs @@ -123,17 +123,30 @@ fn dummy_keypair(initiator: bool) -> KeyPair { #[test] fn test_outbound() { - let opaque = Arc::new(AtomicBool::new(false)); + // type for tracking events inside the router module + struct Flags { + send: AtomicBool, + recv: AtomicBool, + need_key: AtomicBool, + } + + type Opaque = Arc<Flags>; - // create device (with Opaque = ()) + let opaque = Arc::new(Flags { + send: AtomicBool::new(false), + recv: AtomicBool::new(false), + need_key: AtomicBool::new(false), + }); + + // create device let workers = 4; let router = Device::new( workers, TunTest {}, BindTest {}, - |t: &Arc<AtomicBool>, data: bool, sent: bool| println!("send"), - |t: &Arc<AtomicBool>, data: bool, sent: bool| {}, - |t: &Arc<AtomicBool>| t.store(true, Ordering::SeqCst), + |t: &Opaque, data: bool, sent: bool| t.send.store(true, Ordering::SeqCst), + |t: &Opaque, data: bool, sent: bool| t.recv.store(true, Ordering::SeqCst), + |t: &Opaque| t.need_key.store(true, Ordering::SeqCst), ); // create peer @@ -165,7 +178,9 @@ fn test_outbound() { peer.add_keypair(dummy_keypair(true)); for (mask, len, ip, okay) in &tests { - opaque.store(false, Ordering::SeqCst); + opaque.send.store(false, Ordering::SeqCst); + opaque.recv.store(false, Ordering::SeqCst); + opaque.need_key.store(false, Ordering::SeqCst); let mask: IpAddr = mask.parse().unwrap(); @@ -187,15 +202,28 @@ fn test_outbound() { // cryptkey route the IP packet let res = router.send(msg); + + // allow some scheduling + thread::sleep(Duration::from_millis(1)); + if *okay { // cryptkey routing succeeded assert!(res.is_ok()); - // and a key should have been requested - // assert!(opaque.load(Ordering::Acquire), "did not request key"); + // attempted to send message + assert_eq!(opaque.need_key.load(Ordering::Acquire), false); + assert_eq!(opaque.send.load(Ordering::Acquire), true); + assert_eq!(opaque.recv.load(Ordering::Acquire), false); } else { + // no such cryptkey route assert!(res.is_err()); + + // did not attempt to send message + assert_eq!(opaque.need_key.load(Ordering::Acquire), false); + assert_eq!(opaque.send.load(Ordering::Acquire), false); + assert_eq!(opaque.recv.load(Ordering::Acquire), false); } + // clear subnets for next test peer.remove_subnets(); } diff --git a/src/router/workers.rs b/src/router/workers.rs index e79502f..537f238 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -141,10 +141,7 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>( } } -pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>( - device: Arc<DeviceInner<C, T, B>>, - receiver: Receiver<JobParallel>, -) { +pub fn worker_parallel(receiver: Receiver<JobParallel>) { loop { // fetch next job let (tx, mut buf) = match receiver.recv() { |