summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-05 19:55:10 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-05 19:58:12 +0200
commit8551e03ee3a27492f8423db3eda7ddfd1135bd50 (patch)
tree1b73587437388d4084c98315d792e4b55da62650 /src
parentOutput test number and parameters to debug (diff)
downloadwireguard-rs-8551e03ee3a27492f8423db3eda7ddfd1135bd50.tar.xz
wireguard-rs-8551e03ee3a27492f8423db3eda7ddfd1135bd50.zip
Added outbound benchmark
Decent performance (~1.5 Gb/s on an old XPS laptop from 2014). The biggest bottleneck seems to be the heap allocator: swapping it for jemalloc yields 2x performance.
Diffstat (limited to 'src')
-rw-r--r--src/router/device.rs4
-rw-r--r--src/router/peer.rs17
-rw-r--r--src/router/tests.rs83
-rw-r--r--src/router/workers.rs107
-rw-r--r--src/types/keys.rs4
5 files changed, 126 insertions, 89 deletions
diff --git a/src/router/device.rs b/src/router/device.rs
index 57ab418..2196dd1 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -58,7 +58,7 @@ pub struct EncryptionState {
pub struct DecryptionState<C: Callbacks, T: Tun, B: Bind> {
pub key: [u8; 32],
- pub keypair: Weak<KeyPair>,
+ pub keypair: Weak<KeyPair>, // only the key-wheel has a strong reference
pub confirmed: AtomicBool,
pub protector: spin::Mutex<AntiReplay>,
pub peer: Weak<PeerInner<C, T, B>>,
@@ -147,7 +147,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
///
/// # Arguments
///
- /// - pt_msg: IP packet to cryptkey route
+ /// - msg: IP packet to crypt-key route
///
pub fn send(&self, msg: Vec<u8>) -> Result<(), RouterError> {
// ensure that the type field access is within bounds
diff --git a/src/router/peer.rs b/src/router/peer.rs
index 3489bbf..634f980 100644
--- a/src/router/peer.rs
+++ b/src/router/peer.rs
@@ -41,11 +41,10 @@ pub struct KeyWheel {
}
pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
- pub stopped: AtomicBool,
+ pub device: Arc<DeviceInner<C, T, B>>,
pub opaque: C::Opaque,
pub outbound: Mutex<SyncSender<JobOutbound>>,
pub inbound: Mutex<SyncSender<JobInbound<C, T, B>>>,
- pub device: Arc<DeviceInner<C, T, B>>,
pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
pub rx_bytes: AtomicU64, // received bytes
pub tx_bytes: AtomicU64, // transmitted bytes
@@ -106,10 +105,12 @@ fn treebit_remove<A: Address, C: Callbacks, T: Tun, B: Bind>(
impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
fn drop(&mut self) {
- // mark peer as stopped
-
let peer = &self.state;
- peer.stopped.store(true, Ordering::SeqCst);
+
+ // remove from cryptkey router
+
+ treebit_remove(self, &peer.device.ipv4);
+ treebit_remove(self, &peer.device.ipv6);
// drop channels
@@ -121,11 +122,6 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
mem::replace(&mut self.thread_inbound, None).map(|v| v.join());
mem::replace(&mut self.thread_outbound, None).map(|v| v.join());
- // remove from cryptkey router
-
- treebit_remove(self, &peer.device.ipv4);
- treebit_remove(self, &peer.device.ipv6);
-
// release ids from the receiver map
let mut keys = peer.keys.lock();
@@ -170,7 +166,6 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
device,
inbound: Mutex::new(in_tx),
outbound: Mutex::new(out_tx),
- stopped: AtomicBool::new(false),
ekey: spin::Mutex::new(None),
endpoint: spin::Mutex::new(None),
keys: spin::Mutex::new(KeyWheel {
diff --git a/src/router/tests.rs b/src/router/tests.rs
index 84705a1..5463532 100644
--- a/src/router/tests.rs
+++ b/src/router/tests.rs
@@ -1,7 +1,6 @@
use std::error::Error;
use std::fmt;
-use std::io;
-use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
@@ -11,7 +10,9 @@ use pnet::packet::ipv4::MutableIpv4Packet;
use pnet::packet::ipv6::MutableIpv6Packet;
use super::super::types::{Bind, Key, KeyPair, Tun};
-use super::{Device, Peer, SIZE_MESSAGE_PREFIX};
+use super::{Device, SIZE_MESSAGE_PREFIX};
+
+extern crate test;
#[derive(Debug)]
enum TunError {}
@@ -127,11 +128,72 @@ mod tests {
use super::*;
use env_logger;
use log::debug;
+ use std::sync::atomic::AtomicU64;
+ use test::Bencher;
fn init() {
let _ = env_logger::builder().is_test(true).try_init();
}
+ fn make_packet(size: usize, ip: IpAddr) -> Vec<u8> {
+ // create "IP packet"
+ let mut msg = Vec::with_capacity(SIZE_MESSAGE_PREFIX + size + 16);
+ msg.resize(SIZE_MESSAGE_PREFIX + size, 0);
+ match ip {
+ IpAddr::V4(ip) => {
+ let mut packet = MutableIpv4Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap();
+ packet.set_destination(ip);
+ packet.set_version(4);
+ }
+ IpAddr::V6(ip) => {
+ let mut packet = MutableIpv6Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap();
+ packet.set_destination(ip);
+ packet.set_version(6);
+ }
+ }
+ msg
+ }
+
+ #[bench]
+ fn bench_outbound(b: &mut Bencher) {
+ // type for tracking number of packets
+ type Opaque = Arc<AtomicU64>;
+
+ // create device
+ let workers = 4;
+ let router = Device::new(
+ workers,
+ TunTest {},
+ BindTest {},
+ |t: &Opaque, _data: bool, _sent: bool| {
+ t.fetch_add(1, Ordering::SeqCst);
+ },
+ |t: &Opaque, _data: bool, _sent: bool| {},
+ |t: &Opaque| {},
+ );
+
+ // add new peer
+ let opaque = Arc::new(AtomicU64::new(0));
+ let peer = router.new_peer(opaque.clone());
+ peer.add_keypair(dummy_keypair(true));
+
+ // add subnet to peer
+ let (mask, len, ip) = ("192.168.1.0", 24, "192.168.1.20");
+ let mask: IpAddr = mask.parse().unwrap();
+ let ip: IpAddr = ip.parse().unwrap();
+ peer.add_subnet(mask, len);
+
+ b.iter(|| {
+ opaque.store(0, Ordering::SeqCst);
+ // wait till 10 MB
+ while opaque.load(Ordering::Acquire) < 10 * 1024 {
+ // create "IP packet"
+ let msg = make_packet(1024, ip);
+ router.send(msg).unwrap();
+ }
+ });
+ }
+
#[test]
fn test_outbound() {
init();
@@ -155,7 +217,6 @@ mod tests {
|t: &Opaque| t.need_key.store(true, Ordering::SeqCst),
);
- // create peer
let tests = vec![
("192.168.1.0", 24, "192.168.1.20", true),
("172.133.133.133", 32, "172.133.133.133", true),
@@ -201,19 +262,7 @@ mod tests {
peer.add_subnet(mask, *len);
// create "IP packet"
- let mut msg = Vec::<u8>::new();
- msg.resize(SIZE_MESSAGE_PREFIX + 1024, 0);
- if mask.is_ipv4() {
- let mut packet =
- MutableIpv4Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap();
- packet.set_destination(ip.parse().unwrap());
- packet.set_version(4);
- } else {
- let mut packet =
- MutableIpv6Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap();
- packet.set_destination(ip.parse().unwrap());
- packet.set_version(6);
- }
+ let msg = make_packet(1024, ip.parse().unwrap());
// cryptkey route the IP packet
let res = router.send(msg);
diff --git a/src/router/workers.rs b/src/router/workers.rs
index ec6db57..b18b038 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -8,6 +8,7 @@ use futures::*;
use log::debug;
use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
+use std::sync::atomic::{AtomicBool, Ordering};
use zerocopy::{AsBytes, LayoutVerified};
use super::device::DecryptionState;
@@ -40,67 +41,63 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
peer: Arc<PeerInner<C, T, B>>, // related peer
receiver: Receiver<JobInbound<C, T, B>>,
) {
- /*
- fn inner<C: Callbacks, T: Tun, B: Bind>(
- device: &Arc<DeviceInner<C, T, B>>,
- peer: &Arc<PeerInner<C, T, B>>,
- ) {
- // wait for job to complete
- loop {
- match buf.try_lock() {
- None => (),
- Some(buf) => match buf.status {
- Status::Fault => break (),
- Status::Done => {
- // parse / cast
- let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) {
- Some(v) => v,
- None => continue,
- };
- let header: LayoutVerified<&[u8], TransportHeader> = header;
-
- // obtain strong reference to decryption state
- let state = if let Some(state) = state.upgrade() {
- state
- } else {
- break;
- };
-
- // check for replay
- if !state.protector.lock().update(header.f_counter.get()) {
- break;
- }
+ loop {
+ // fetch job
+ let (state, rx) = match receiver.recv() {
+ Ok(v) => v,
+ _ => {
+ return;
+ }
+ };
- // check for confirms key
- if !state.confirmed.swap(true, Ordering::SeqCst) {
- peer.confirm_key(state.keypair.clone());
+ // wait for job to complete
+ let _ = rx
+ .map(|buf| {
+ if buf.okay {
+ // parse / cast
+ let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) {
+ Some(v) => v,
+ None => {
+ return;
}
+ };
+ let header: LayoutVerified<&[u8], TransportHeader> = header;
+
+ // obtain strong reference to decryption state
+ let state = if let Some(state) = state.upgrade() {
+ state
+ } else {
+ return;
+ };
+
+ // check for replay
+ if !state.protector.lock().update(header.f_counter.get()) {
+ return;
+ }
- // update endpoint, TODO
-
- // write packet to TUN device, TODO
-
- // trigger callback
- debug_assert!(
- packet.len() >= CHACHA20_POLY1305.nonce_len(),
- "this should be checked earlier in the pipeline"
- );
- (device.call_recv)(
- &peer.opaque,
- packet.len() > CHACHA20_POLY1305.nonce_len(),
- true,
- );
- break;
+ // check for confirms key
+ if !state.confirmed.swap(true, Ordering::SeqCst) {
+ peer.confirm_key(state.keypair.clone());
}
- _ => (),
- },
- };
- // default is to park
- thread::park()
- }
+ // update endpoint, TODO
+
+ // write packet to TUN device, TODO
+
+ // trigger callback
+ debug_assert!(
+ packet.len() >= CHACHA20_POLY1305.nonce_len(),
+ "this should be checked earlier in the pipeline"
+ );
+ (device.call_recv)(
+ &peer.opaque,
+ packet.len() > CHACHA20_POLY1305.nonce_len(),
+ true,
+ );
+ }
+ })
+ .wait();
}
- */
}
pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
diff --git a/src/types/keys.rs b/src/types/keys.rs
index d2c4139..89cacf9 100644
--- a/src/types/keys.rs
+++ b/src/types/keys.rs
@@ -1,10 +1,6 @@
use clear_on_drop::clear::Clear;
use std::time::Instant;
-/* This file holds types passed between components.
- * Whenever a type cannot be held local to a single module.
- */
-
#[derive(Debug, Clone)]
pub struct Key {
pub key: [u8; 32],