From 8551e03ee3a27492f8423db3eda7ddfd1135bd50 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Thu, 5 Sep 2019 19:55:10 +0200 Subject: Added outbound benchmark Decent performance (~1.5Gb/s on old XPS laptop from 2014), biggest bottleneck seems to be the heap allocator, swapping with jemalloc yields 2x performance. --- src/router/device.rs | 4 +- src/router/peer.rs | 17 +++----- src/router/tests.rs | 83 +++++++++++++++++++++++++++++++-------- src/router/workers.rs | 107 ++++++++++++++++++++++++-------------------------- src/types/keys.rs | 4 -- 5 files changed, 126 insertions(+), 89 deletions(-) diff --git a/src/router/device.rs b/src/router/device.rs index 57ab418..2196dd1 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -58,7 +58,7 @@ pub struct EncryptionState { pub struct DecryptionState { pub key: [u8; 32], - pub keypair: Weak, + pub keypair: Weak, // only the key-wheel has a strong reference pub confirmed: AtomicBool, pub protector: spin::Mutex, pub peer: Weak>, @@ -147,7 +147,7 @@ impl Device { /// /// # Arguments /// - /// - pt_msg: IP packet to cryptkey route + /// - msg: IP packet to crypt-key route /// pub fn send(&self, msg: Vec) -> Result<(), RouterError> { // ensure that the type field access is within bounds diff --git a/src/router/peer.rs b/src/router/peer.rs index 3489bbf..634f980 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -41,11 +41,10 @@ pub struct KeyWheel { } pub struct PeerInner { - pub stopped: AtomicBool, + pub device: Arc>, pub opaque: C::Opaque, pub outbound: Mutex>, pub inbound: Mutex>>, - pub device: Arc>, pub staged_packets: Mutex; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake pub rx_bytes: AtomicU64, // received bytes pub tx_bytes: AtomicU64, // transmitted bytes @@ -106,10 +105,12 @@ fn treebit_remove( impl Drop for Peer { fn drop(&mut self) { - // mark peer as stopped - let peer = &self.state; - peer.stopped.store(true, Ordering::SeqCst); + + // remove from cryptkey router + + treebit_remove(self, &peer.device.ipv4); + treebit_remove(self, &peer.device.ipv6); // drop channels @@ -121,11 +122,6 @@ impl Drop for Peer { mem::replace(&mut self.thread_inbound, None).map(|v| v.join()); mem::replace(&mut self.thread_outbound, None).map(|v| v.join()); - // remove from cryptkey router - - treebit_remove(self, &peer.device.ipv4); - treebit_remove(self, &peer.device.ipv6); - // release ids from the receiver map let mut keys = peer.keys.lock(); @@ -170,7 +166,6 @@ pub fn new_peer( device, inbound: Mutex::new(in_tx), outbound: Mutex::new(out_tx), - stopped: AtomicBool::new(false), ekey: spin::Mutex::new(None), endpoint: spin::Mutex::new(None), keys: spin::Mutex::new(KeyWheel { diff --git a/src/router/tests.rs b/src/router/tests.rs index 84705a1..5463532 100644 --- a/src/router/tests.rs +++ b/src/router/tests.rs @@ -1,7 +1,6 @@ use std::error::Error; use std::fmt; -use std::io; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::{IpAddr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread; @@ -11,7 +10,9 @@ use pnet::packet::ipv4::MutableIpv4Packet; use pnet::packet::ipv6::MutableIpv6Packet; use super::super::types::{Bind, Key, KeyPair, Tun}; -use super::{Device, Peer, SIZE_MESSAGE_PREFIX}; +use super::{Device, SIZE_MESSAGE_PREFIX}; + +extern crate test; #[derive(Debug)] enum TunError {} @@ -127,11 +128,72 @@ mod tests { use super::*; use env_logger; use log::debug; + use std::sync::atomic::AtomicU64; + use test::Bencher; fn init() { let _ = env_logger::builder().is_test(true).try_init(); } + fn make_packet(size: usize, ip: IpAddr) -> Vec { + // create "IP packet" + let mut msg = Vec::with_capacity(SIZE_MESSAGE_PREFIX + size + 16); + msg.resize(SIZE_MESSAGE_PREFIX + size, 0); + match ip { + IpAddr::V4(ip) => { + let mut packet = MutableIpv4Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap(); + packet.set_destination(ip); + packet.set_version(4); + } + IpAddr::V6(ip) => { + let mut packet = MutableIpv6Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap(); + packet.set_destination(ip); + packet.set_version(6); + } + } + msg + } + + #[bench] + fn bench_outbound(b: &mut Bencher) { + // type for tracking number of packets + type Opaque = Arc; + + // create device + let workers = 4; + let router = Device::new( + workers, + TunTest {}, + BindTest {}, + |t: &Opaque, _data: bool, _sent: bool| { + t.fetch_add(1, Ordering::SeqCst); + }, + |t: &Opaque, _data: bool, _sent: bool| {}, + |t: &Opaque| {}, + ); + + // add new peer + let opaque = Arc::new(AtomicU64::new(0)); + let peer = router.new_peer(opaque.clone()); + peer.add_keypair(dummy_keypair(true)); + + // add subnet to peer + let (mask, len, ip) = ("192.168.1.0", 24, "192.168.1.20"); + let mask: IpAddr = mask.parse().unwrap(); + let ip: IpAddr = ip.parse().unwrap(); + peer.add_subnet(mask, len); + + b.iter(|| { + opaque.store(0, Ordering::SeqCst); + // wait till 10 MB + while opaque.load(Ordering::Acquire) < 10 * 1024 { + // create "IP packet" + let msg = make_packet(1024, ip); + router.send(msg).unwrap(); + } + }); + } + #[test] fn test_outbound() { init(); @@ -155,7 +217,6 @@ mod tests { |t: &Opaque| t.need_key.store(true, Ordering::SeqCst), ); - // create peer let tests = vec![ ("192.168.1.0", 24, "192.168.1.20", true), ("172.133.133.133", 32, "172.133.133.133", true), @@ -201,19 +262,7 @@ mod tests { peer.add_subnet(mask, *len); // create "IP packet" - let mut msg = Vec::::new(); - msg.resize(SIZE_MESSAGE_PREFIX + 1024, 0); - if mask.is_ipv4() { - let mut packet = - MutableIpv4Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap(); - packet.set_destination(ip.parse().unwrap()); - packet.set_version(4); - } else { - let mut packet = - MutableIpv6Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap(); - packet.set_destination(ip.parse().unwrap()); - packet.set_version(6); - } + let msg = make_packet(1024, ip.parse().unwrap()); // cryptkey route the IP packet let res = router.send(msg); diff --git a/src/router/workers.rs b/src/router/workers.rs index ec6db57..b18b038 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -8,6 +8,7 @@ use futures::*; use log::debug; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; +use std::sync::atomic::{AtomicBool, Ordering}; use zerocopy::{AsBytes, LayoutVerified}; use super::device::DecryptionState; @@ -40,67 +41,63 @@ pub fn worker_inbound( peer: Arc>, // related peer receiver: Receiver>, ) { - /* - fn inner( - device: &Arc>, - peer: &Arc>, - ) { - // wait for job to complete - loop { - match buf.try_lock() { - None => (), - Some(buf) => match buf.status { - Status::Fault => break (), - Status::Done => { - // parse / cast - let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { - Some(v) => v, - None => continue, - }; - let header: LayoutVerified<&[u8], TransportHeader> = header; - - // obtain strong reference to decryption state - let state = if let Some(state) = state.upgrade() { - state - } else { - break; - }; - - // check for replay - if !state.protector.lock().update(header.f_counter.get()) { - break; - } + loop { + // fetch job + let (state, rx) = match receiver.recv() { + Ok(v) => v, + _ => { + return; + } + }; - // check for confirms key - if !state.confirmed.swap(true, Ordering::SeqCst) { - peer.confirm_key(state.keypair.clone()); + // wait for job to complete + let _ = rx + .map(|buf| { + if buf.okay { + // parse / cast + let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { + Some(v) => v, + None => { + return; } + }; + let header: LayoutVerified<&[u8], TransportHeader> = header; + + // obtain strong reference to decryption state + let state = if let Some(state) = state.upgrade() { + state + } else { + return; + }; + + // check for replay + if !state.protector.lock().update(header.f_counter.get()) { + return; + } - // update endpoint, TODO - - // write packet to TUN device, TODO - - // trigger callback - debug_assert!( - packet.len() >= CHACHA20_POLY1305.nonce_len(), - "this should be checked earlier in the pipeline" - ); - (device.call_recv)( - &peer.opaque, - packet.len() > CHACHA20_POLY1305.nonce_len(), - true, - ); - break; + // check for confirms key + if !state.confirmed.swap(true, Ordering::SeqCst) { + peer.confirm_key(state.keypair.clone()); } - _ => (), - }, - }; - // default is to park - thread::park() - } + // update endpoint, TODO + + // write packet to TUN device, TODO + + // trigger callback + debug_assert!( + packet.len() >= CHACHA20_POLY1305.nonce_len(), + "this should be checked earlier in the pipeline" + ); + (device.call_recv)( + &peer.opaque, + packet.len() > CHACHA20_POLY1305.nonce_len(), + true, + ); + } + }) + .wait(); } - */ } pub fn worker_outbound( diff --git a/src/types/keys.rs b/src/types/keys.rs index d2c4139..89cacf9 100644 --- a/src/types/keys.rs +++ b/src/types/keys.rs @@ -1,10 +1,6 @@ use clear_on_drop::clear::Clear; use std::time::Instant; -/* This file holds types passed between components. - * Whenever a type cannot be held local to a single module. - */ - #[derive(Debug, Clone)] pub struct Key { pub key: [u8; 32], -- cgit v1.2.3-59-g8ed1b