aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-04 19:22:47 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-04 19:22:47 +0200
commit310be99fa671e98b1e3e84deeb4d1345abedabf6 (patch)
tree874224914200cb5a5e349e53d1c167b223b2d561 /src
parentSimply passing of JobBuffer ownership (diff)
downloadwireguard-rs-310be99fa671e98b1e3e84deeb4d1345abedabf6.tar.xz
wireguard-rs-310be99fa671e98b1e3e84deeb4d1345abedabf6.zip
Expanded outbound test
Diffstat (limited to 'src')
-rw-r--r--src/router/constants.rs2
-rw-r--r--src/router/device.rs9
-rw-r--r--src/router/mod.rs1
-rw-r--r--src/router/peer.rs4
-rw-r--r--src/router/tests.rs44
-rw-r--r--src/router/workers.rs5
6 files changed, 44 insertions, 21 deletions
diff --git a/src/router/constants.rs b/src/router/constants.rs
new file mode 100644
index 0000000..b3015ed
--- /dev/null
+++ b/src/router/constants.rs
@@ -0,0 +1,2 @@
+pub const MAX_STAGED_PACKETS: usize = 128;
+pub const WORKER_QUEUE_SIZE: usize = MAX_STAGED_PACKETS;
diff --git a/src/router/device.rs b/src/router/device.rs
index 58ca2f6..2617350 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -18,6 +18,7 @@ use super::peer;
use super::peer::{Peer, PeerInner};
use super::SIZE_MESSAGE_PREFIX;
+use super::constants::WORKER_QUEUE_SIZE;
use super::messages::TYPE_TRANSPORT;
use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks, RouterError};
use super::workers::{worker_parallel, JobParallel};
@@ -113,13 +114,9 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun, B: Bi
let mut queues = Vec::with_capacity(num_workers);
let mut threads = Vec::with_capacity(num_workers);
for _ in 0..num_workers {
- // allocate work queue
- let (tx, rx) = sync_channel(128);
+ let (tx, rx) = sync_channel(WORKER_QUEUE_SIZE);
queues.push(spin::Mutex::new(tx));
-
- // start worker thread
- let device = inner.clone();
- threads.push(thread::spawn(move || worker_parallel(device, rx)));
+ threads.push(thread::spawn(move || worker_parallel(rx)));
}
// return exported device handle
diff --git a/src/router/mod.rs b/src/router/mod.rs
index 0e4bce1..ec560b4 100644
--- a/src/router/mod.rs
+++ b/src/router/mod.rs
@@ -1,4 +1,5 @@
mod anti_replay;
+mod constants;
mod device;
mod messages;
mod peer;
diff --git a/src/router/peer.rs b/src/router/peer.rs
index a31dfcf..e9f62d5 100644
--- a/src/router/peer.rs
+++ b/src/router/peer.rs
@@ -22,17 +22,15 @@ use super::device::DeviceInner;
use super::device::EncryptionState;
use super::messages::TransportHeader;
-use futures::sync::oneshot;
use futures::*;
use super::workers::Operation;
use super::workers::{worker_inbound, worker_outbound};
use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel};
+use super::constants::MAX_STAGED_PACKETS;
use super::types::Callbacks;
-const MAX_STAGED_PACKETS: usize = 128;
-
pub struct KeyWheel {
next: Option<Arc<KeyPair>>, // next key state (unconfirmed)
current: Option<Arc<KeyPair>>, // current key state (used for encryption)
diff --git a/src/router/tests.rs b/src/router/tests.rs
index 1e049c4..c2ff378 100644
--- a/src/router/tests.rs
+++ b/src/router/tests.rs
@@ -123,17 +123,30 @@ fn dummy_keypair(initiator: bool) -> KeyPair {
#[test]
fn test_outbound() {
- let opaque = Arc::new(AtomicBool::new(false));
+ // type for tracking events inside the router module
+ struct Flags {
+ send: AtomicBool,
+ recv: AtomicBool,
+ need_key: AtomicBool,
+ }
+
+ type Opaque = Arc<Flags>;
- // create device (with Opaque = ())
+ let opaque = Arc::new(Flags {
+ send: AtomicBool::new(false),
+ recv: AtomicBool::new(false),
+ need_key: AtomicBool::new(false),
+ });
+
+ // create device
let workers = 4;
let router = Device::new(
workers,
TunTest {},
BindTest {},
- |t: &Arc<AtomicBool>, data: bool, sent: bool| println!("send"),
- |t: &Arc<AtomicBool>, data: bool, sent: bool| {},
- |t: &Arc<AtomicBool>| t.store(true, Ordering::SeqCst),
+ |t: &Opaque, data: bool, sent: bool| t.send.store(true, Ordering::SeqCst),
+ |t: &Opaque, data: bool, sent: bool| t.recv.store(true, Ordering::SeqCst),
+ |t: &Opaque| t.need_key.store(true, Ordering::SeqCst),
);
// create peer
@@ -165,7 +178,9 @@ fn test_outbound() {
peer.add_keypair(dummy_keypair(true));
for (mask, len, ip, okay) in &tests {
- opaque.store(false, Ordering::SeqCst);
+ opaque.send.store(false, Ordering::SeqCst);
+ opaque.recv.store(false, Ordering::SeqCst);
+ opaque.need_key.store(false, Ordering::SeqCst);
let mask: IpAddr = mask.parse().unwrap();
@@ -187,15 +202,28 @@ fn test_outbound() {
// cryptkey route the IP packet
let res = router.send(msg);
+
+ // allow some scheduling
+ thread::sleep(Duration::from_millis(1));
+
if *okay {
// cryptkey routing succeeded
assert!(res.is_ok());
- // and a key should have been requested
- // assert!(opaque.load(Ordering::Acquire), "did not request key");
+ // attempted to send message
+ assert_eq!(opaque.need_key.load(Ordering::Acquire), false);
+ assert_eq!(opaque.send.load(Ordering::Acquire), true);
+ assert_eq!(opaque.recv.load(Ordering::Acquire), false);
} else {
+ // no such cryptkey route
assert!(res.is_err());
+
+ // did not attempt to send message
+ assert_eq!(opaque.need_key.load(Ordering::Acquire), false);
+ assert_eq!(opaque.send.load(Ordering::Acquire), false);
+ assert_eq!(opaque.recv.load(Ordering::Acquire), false);
}
+
// clear subnets for next test
peer.remove_subnets();
}
diff --git a/src/router/workers.rs b/src/router/workers.rs
index e79502f..537f238 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -141,10 +141,7 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
}
}
-pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
- device: Arc<DeviceInner<C, T, B>>,
- receiver: Receiver<JobParallel>,
-) {
+pub fn worker_parallel(receiver: Receiver<JobParallel>) {
loop {
// fetch next job
let (tx, mut buf) = match receiver.recv() {