From e0db9861bcf7194c29888c28184785f969199c38 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sat, 14 Dec 2019 13:37:51 +0100 Subject: Added profiler feature --- Cargo.lock | 28 +++++++++++++++++ Cargo.toml | 6 +++- src/main.rs | 63 +++++++++++++++++++++++++++++++++++---- src/wireguard/constants.rs | 2 +- src/wireguard/peer.rs | 6 ++-- src/wireguard/queue.rs | 2 +- src/wireguard/router/constants.rs | 4 ++- src/wireguard/router/inbound.rs | 4 ++- src/wireguard/router/outbound.rs | 34 +++++++++++---------- src/wireguard/router/pool.rs | 47 ++++++++++++++++++++++------- src/wireguard/router/runq.rs | 27 ++++++++++++++--- src/wireguard/wireguard.rs | 8 ++--- 12 files changed, 183 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c87f66b..b4a076d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -165,6 +165,16 @@ dependencies = [ "bitflags 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "cpuprofiler" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "error-chain 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "pkg-config 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "crypto-mac" version = "0.7.0" @@ -215,6 +225,15 @@ dependencies = [ "termcolor 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "error-chain" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "backtrace 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)", + "version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "failure" version = "0.1.5" @@ -471,6 +490,11 @@ name = "opaque-debug" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "pkg-config" 
+version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "pnet" version = "0.22.0" @@ -1187,6 +1211,7 @@ dependencies = [ "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "chacha20poly1305 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "cpuprofiler 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "daemonize 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1277,11 +1302,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum chacha20poly1305 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "40cd3ddeae0b0ea7fe848a06e4fbf3f02463648b9395bd1139368ce42b44543e" "checksum clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "97276801e127ffb46b66ce23f35cc96bd454fa311294bced4bbace7baa8b1d17" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +"checksum cpuprofiler 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "43f8479dbcfd2bbaa0c0c26779b913052b375981cdf533091f2127ea3d42e52b" "checksum crypto-mac 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4434400df11d95d556bac068ddfedd482915eb18fe8bea89bc80b6e4b1c179e5" "checksum curve25519-dalek 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8b7dcd30ba50cdf88b55b033456138b7c0ac4afdc436d82e1b79f370f24cc66d" "checksum daemonize 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "70c24513e34f53b640819f0ac9f705b673fcf4006d7aab8778bee72ebfc89815" "checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = 
"f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" "checksum env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "aafcde04e90a5226a6443b7aabdb016ba2f8307c847d524724bd9b346dd1a2d3" +"checksum error-chain 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3ab49e9dcb602294bc42f9a7dfc9bc6e936fca4418ea300dbfb84fe16de0b7d9" "checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2" "checksum failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" @@ -1314,6 +1341,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum num-traits 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "6ba9a427cfca2be13aa6f6403b0b7e7368fe982bfa16fccc450ce74c46cd9b32" "checksum num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bcef43580c035376c0705c42792c294b66974abbfd2789b511784023f71f3273" "checksum opaque-debug 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" +"checksum pkg-config 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)" = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677" "checksum pnet 0.22.0 (registry+https://github.com/rust-lang/crates.io-index)" = "63d693c84430248366146e3181ff9d330243464fa9e6146c372b2f3eb2e2d8e7" "checksum pnet_base 0.22.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4df28acf2fcc77436dd2b91a9a0c2bb617f9ca5f2acefee1a4135058b9f9801f" "checksum pnet_datalink 0.22.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b34f8ca857599d05b6b082e9baff8d27c54cb9c26568cf3c0993a5755816966" diff --git 
a/Cargo.toml b/Cargo.toml index 8bf3fe3..86eed49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ clear_on_drop = "0.2.3" env_logger = "0.6" num_cpus = "^1.10" daemonize = "0.4.1" +cpuprofiler = { version = "*", optional = true } [target.'cfg(unix)'.dependencies] libc = "0.2" @@ -37,8 +38,11 @@ version = "^0.5" version = "2.1" features = ["nightly"] +[features] +profiler = ["cpuprofiler"] + [dev-dependencies] -proptest = "0.9.4" pnet = "^0.22" +proptest = "0.9.4" rand_chacha = "0.2.1" rand_core = "0.5" diff --git a/src/main.rs b/src/main.rs index e68c771..57822f7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,19 @@ #![feature(weak_into_raw)] #![allow(dead_code)] +#[cfg(feature = "profiler")] +extern crate cpuprofiler; + +#[cfg(feature = "profiler")] +use cpuprofiler::PROFILER; + +#[cfg(feature = "profiler")] +use libc::atexit; + +mod configuration; +mod platform; +mod wireguard; + use log; use daemonize::Daemonize; @@ -10,18 +23,47 @@ use std::env; use std::process::exit; use std::thread; -mod configuration; -mod platform; -mod wireguard; - use configuration::Configuration; use platform::tun::{PlatformTun, Status}; use platform::uapi::{BindUAPI, PlatformUAPI}; use platform::*; +// destructor which stops the profiler upon program exit. 
+#[cfg(feature = "profiler")] +pub extern "C" fn dtor_profiler_stop() { +} + +#[cfg(feature = "profiler")] +fn profiler_stop() { + PROFILER.lock().unwrap().stop().unwrap(); +} + +#[cfg(not(feature = "profiler"))] +fn profiler_stop() {} + +#[cfg(feature = "profiler")] +fn profiler_start(name: &str) { + use std::path::Path; + + // find first available path to save profiler output + let mut n = 0; + loop { + let path = format!("./{}-{}.profile", name, n); + if !Path::new(path.as_str()).exists() { + println!("Starting profiler: {}", path); + PROFILER.lock().unwrap().start(path).unwrap(); + unsafe { + assert_eq!(atexit(dtor_profiler_stop), 0); + } + break; + }; + n += 1; + } +} + fn main() { - // parse commandline arguments + // parse command line arguments let mut name = None; let mut drop_privileges = true; let mut foreground = false; @@ -82,6 +124,10 @@ fn main() { // drop privileges if drop_privileges {} + // start profiler (if enabled) + #[cfg(feature = "profiler")] + profiler_start(name.as_str()); + // create WireGuard device let wg: wireguard::Wireguard = wireguard::Wireguard::new(writer); @@ -104,6 +150,7 @@ fn main() { match status.event() { Err(e) => { log::info!("Tun device error {}", e); + profiler_stop(); exit(0); } Ok(tun::TunEvent::Up(mtu)) => { @@ -134,6 +181,7 @@ fn main() { // start UAPI server thread::spawn(move || loop { + // accept and handle UAPI config connections match uapi.connect() { Ok(mut stream) => { let cfg = cfg.clone(); @@ -146,8 +194,13 @@ fn main() { break; } } + + // exit + profiler_stop(); + exit(0); }); // block until all tun readers closed wait.wait(); + profiler_stop(); } diff --git a/src/wireguard/constants.rs b/src/wireguard/constants.rs index c53c559..97ce6b1 100644 --- a/src/wireguard/constants.rs +++ b/src/wireguard/constants.rs @@ -23,4 +23,4 @@ pub const MESSAGE_PADDING_MULTIPLE: usize = 16; * used in places to avoid Option by instead using a long "expired" Instant: * (Instant::now() - TIME_HORIZON) */ -pub const 
TIME_HORIZON: Duration = Duration::from_secs(3600 * 24); +pub const TIME_HORIZON: Duration = Duration::from_secs(60 * 60 * 24); diff --git a/src/wireguard/peer.rs b/src/wireguard/peer.rs index 448db96..85e340f 100644 --- a/src/wireguard/peer.rs +++ b/src/wireguard/peer.rs @@ -28,9 +28,9 @@ pub struct PeerInner { pub wg: Arc>, // handshake state - pub walltime_last_handshake: Mutex>, - pub last_handshake_sent: Mutex, // instant for last handshake - pub handshake_queued: AtomicBool, // is a handshake job currently queued for the peer? + pub walltime_last_handshake: Mutex>, // walltime for last handshake (for UAPI status) + pub last_handshake_sent: Mutex, // instant for last handshake + pub handshake_queued: AtomicBool, // is a handshake job currently queued for the peer? // stats and configuration pub pk: PublicKey, // public key, DISCUSS: avoid this. TODO: remove diff --git a/src/wireguard/queue.rs b/src/wireguard/queue.rs index a0fcf03..f484320 100644 --- a/src/wireguard/queue.rs +++ b/src/wireguard/queue.rs @@ -21,7 +21,7 @@ impl ParallelQueue { /// /// # Arguments /// - /// - `queues`: number of readers/writers + /// - `queues`: number of readers /// - `capacity`: capacity of each internal queue /// pub fn new(queues: usize, capacity: usize) -> (Self, Vec>) { diff --git a/src/wireguard/router/constants.rs b/src/wireguard/router/constants.rs index 0ca824a..6129fd7 100644 --- a/src/wireguard/router/constants.rs +++ b/src/wireguard/router/constants.rs @@ -4,4 +4,6 @@ pub const MAX_STAGED_PACKETS: usize = 128; // performance constants -pub const WORKER_QUEUE_SIZE: usize = MAX_STAGED_PACKETS; +pub const PARALLEL_QUEUE_SIZE: usize = MAX_STAGED_PACKETS; +pub const INORDER_QUEUE_SIZE: usize = PARALLEL_QUEUE_SIZE; +pub const MAX_INORDER_CONSUME: usize = INORDER_QUEUE_SIZE; diff --git a/src/wireguard/router/inbound.rs b/src/wireguard/router/inbound.rs index 5a27c95..db6d3f3 100644 --- a/src/wireguard/router/inbound.rs +++ b/src/wireguard/router/inbound.rs @@ -1,3 +1,4 @@ 
+use super::constants::MAX_INORDER_CONSUME; use super::device::DecryptionState; use super::device::Device; use super::messages::TransportHeader; @@ -185,6 +186,7 @@ pub fn sequential>( // handle message from the peers inbound queue device.run_inbound.run(|peer| { - peer.inbound.handle(|body| work(&peer, body)); + peer.inbound + .handle(|body| work(&peer, body), MAX_INORDER_CONSUME) }); } diff --git a/src/wireguard/router/outbound.rs b/src/wireguard/router/outbound.rs index 9ecffd8..a555ecb 100644 --- a/src/wireguard/router/outbound.rs +++ b/src/wireguard/router/outbound.rs @@ -1,3 +1,4 @@ +use super::constants::MAX_INORDER_CONSUME; use super::device::Device; use super::messages::{TransportHeader, TYPE_TRANSPORT}; use super::peer::Peer; @@ -88,20 +89,23 @@ pub fn sequential>( device: Device, ) { device.run_outbound.run(|peer| { - peer.outbound.handle(|body| { - log::trace!("worker, sequential section, obtained job"); - - // send to peer - let xmit = peer.send(&body.msg[..]).is_ok(); - - // trigger callback - C::send( - &peer.opaque, - body.msg.len(), - xmit, - &body.keypair, - body.counter, - ); - }); + peer.outbound.handle( + |body| { + log::trace!("worker, sequential section, obtained job"); + + // send to peer + let xmit = peer.send(&body.msg[..]).is_ok(); + + // trigger callback + C::send( + &peer.opaque, + body.msg.len(), + xmit, + &body.keypair, + body.counter, + ); + }, + MAX_INORDER_CONSUME, + ) }); } diff --git a/src/wireguard/router/pool.rs b/src/wireguard/router/pool.rs index 07a9bfa..686c788 100644 --- a/src/wireguard/router/pool.rs +++ b/src/wireguard/router/pool.rs @@ -4,10 +4,9 @@ use std::mem; use std::sync::mpsc::Receiver; use std::sync::Arc; +use super::constants::INORDER_QUEUE_SIZE; use super::runq::{RunQueue, ToKey}; -const INORDER_QUEUE_SIZE: usize = 64; - pub struct InnerJob { // peer (used by worker to schedule/handle inorder queue), // when the peer is None, the job is complete @@ -52,28 +51,50 @@ pub struct InorderQueue { } impl InorderQueue 
{ - pub fn send(&self, job: Job) -> bool { - self.queue.lock().push_back(job).is_ok() - } - pub fn new() -> InorderQueue { InorderQueue { queue: Mutex::new(ArrayDeque::new()), } } + /// Add a new job to the in-order queue + /// + /// # Arguments + /// + /// - `job`: The job added to the back of the queue + /// + /// # Returns + /// + /// True if the element was added, + /// false to indicate that the queue is full. + pub fn send(&self, job: Job) -> bool { + self.queue.lock().push_back(job).is_ok() + } + + /// Consume completed jobs from the in-order queue + /// + /// # Arguments + /// + /// - `f`: function to apply to the body of each job. + /// - `limit`: maximum number of jobs to handle before returning + /// + /// # Returns + /// + /// A boolean indicating if the limit was reached: + /// true indicating that the limit was reached, + /// while false implies that the queue is empty or an uncompleted job was reached. #[inline(always)] - pub fn handle(&self, f: F) { + pub fn handle(&self, f: F, mut limit: usize) -> bool { // take the mutex let mut queue = self.queue.lock(); - loop { + while limit > 0 { // attempt to extract front element let front = queue.pop_front(); let elem = match front { Some(elem) => elem, _ => { - return; + return false; } }; @@ -90,13 +111,17 @@ impl InorderQueue { // job not complete yet, return job to front if ret { queue.push_front(elem).unwrap(); - return; + return false; } + limit -= 1; } + + // did not complete all jobs + true } } -/// Allows easy construction of a semi-parallel worker. +/// Allows easy construction of a parallel worker. /// Applicable for both decryption and encryption workers. 
#[inline(always)] pub fn worker_parallel< diff --git a/src/wireguard/router/runq.rs b/src/wireguard/router/runq.rs index 936a53c..44e11a1 100644 --- a/src/wireguard/router/runq.rs +++ b/src/wireguard/router/runq.rs @@ -58,7 +58,21 @@ impl RunQueue { } } - pub fn run ()>(&self, f: F) { + /// Run (consume from) the run queue using the provided function. + /// The function should return whether the given element should be rescheduled. + /// + /// # Arguments + /// + /// - `f` : function to apply to every element + /// + /// # Note + /// + /// The function f may be called again even when the element was not inserted back into the + /// queue since the last application and no rescheduling was requested. + /// + /// This happens when the function handles all work for T, + /// but T is added to the run queue while the function is running. + pub fn run bool>(&self, f: F) { let mut inner = self.inner.lock().unwrap(); loop { // fetch next element @@ -86,10 +100,16 @@ impl RunQueue { mem::drop(inner); // drop guard // handle element - f(&elem); + let rerun = f(&elem); - // retake lock and check if should be added back to queue + // if the function requested a re-run add the element to the back of the queue inner = self.inner.lock().unwrap(); + if rerun { + inner.queue.push_back(elem); + continue; + } + + // otherwise check if new requests have come in since we ran the function match inner.members.entry(key) { Entry::Occupied(occ) => { if *occ.get() == old_n { @@ -111,7 +131,6 @@ impl RunQueue { #[cfg(test)] mod tests { use super::*; - use std::sync::Arc; use std::thread; use std::time::Duration; diff --git a/src/wireguard/wireguard.rs b/src/wireguard/wireguard.rs index d0c0e53..2cd6ce4 100644 --- a/src/wireguard/wireguard.rs +++ b/src/wireguard/wireguard.rs @@ -343,8 +343,7 @@ impl Wireguard { // create vector big enough for any message given current MTU let mtu = wg.mtu.load(Ordering::Relaxed); let size = mtu + handshake::MAX_HANDSHAKE_MSG_SIZE; - let mut msg: Vec = 
Vec::with_capacity(size); - msg.resize(size, 0); + let mut msg: Vec = vec![0; size]; // read UDP packet into vector let (size, src) = match reader.read(&mut msg) { @@ -413,8 +412,7 @@ impl Wireguard { // create vector big enough for any transport message (based on MTU) let mtu = wg.mtu.load(Ordering::Relaxed); let size = mtu + router::SIZE_MESSAGE_PREFIX + 1; - let mut msg: Vec = Vec::with_capacity(size + router::CAPACITY_MESSAGE_POSTFIX); - msg.resize(size, 0); + let mut msg: Vec = vec![0; size + router::CAPACITY_MESSAGE_POSTFIX]; // read a new IP packet let payload = match reader.read(&mut msg[..], router::SIZE_MESSAGE_PREFIX) { @@ -426,7 +424,7 @@ impl Wireguard { }; debug!("TUN worker, IP packet of {} bytes (MTU = {})", payload, mtu); - // TODO: start device down + // check if device is down if mtu == 0 { continue; } -- cgit v1.2.3-59-g8ed1b