From d834918f6fe28f1c48f2846961d95dbde48f18e7 Mon Sep 17 00:00:00 2001 From: Jake McGinty Date: Fri, 1 Jun 2018 23:01:17 -0500 Subject: follow crossbeam directions to see if perf improves --- src/crypto_pool.rs | 95 ++++++++++++++++++++++++++++-------------------------- 1 file changed, 49 insertions(+), 46 deletions(-) (limited to 'src/crypto_pool.rs') diff --git a/src/crypto_pool.rs b/src/crypto_pool.rs index 2208e98..e0756e9 100644 --- a/src/crypto_pool.rs +++ b/src/crypto_pool.rs @@ -1,4 +1,5 @@ use consts::{PADDING_MULTIPLE, TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE}; +use crossbeam; use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::sync::mpsc; use futures::executor; @@ -52,61 +53,63 @@ pub fn create() -> Sender { let threads = num_cpus::get() - 1; // One thread for I/O. let (sender, receiver) = unbounded(); - for i in 0..threads { - let rx = receiver.clone(); - thread::Builder::new().name(format!("wireguard-rs-crypto-{}", i)) - .spawn(move || worker(rx.clone())).unwrap(); - } + crossbeam::scope(|s| { + for i in 0..threads { + let rx = receiver.clone(); + s.spawn(move || worker(rx.clone())); + } + }); sender } fn worker(receiver: Receiver) { - loop { - let work = receiver.recv().expect("channel to crypto worker thread broken."); - match work { - Work::Decrypt((tx, element)) => { - let mut raw_packet = vec![0u8; element.packet.len()]; - let nonce = element.packet.nonce(); - let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap(); - if len > 0 { - let len = IpPacket::new(&raw_packet[..len]) - .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() - .length(); - raw_packet.truncate(len as usize); - } else { - raw_packet.truncate(0); - } + select_loop! { + recv(receiver, work) => { + match work { + Work::Decrypt((tx, element)) => { + let mut raw_packet = vec![0u8; element.packet.len()]; + let nonce = element.packet.nonce(); + let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap(); + if len > 0 { + let len = IpPacket::new(&raw_packet[..len]) + .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() + .length(); + raw_packet.truncate(len as usize); + } else { + raw_packet.truncate(0); + } - tx.unbounded_send(DecryptResult { - endpoint: element.endpoint, - orig_packet: element.packet, - out_packet: raw_packet, - session_type: element.session_type, - }).unwrap(); - }, - Work::Encrypt((tx, mut element)) => { - let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 { - PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE) - } else { 0 }; - let padded_len = element.in_packet.len() + padding; - let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; + tx.unbounded_send(DecryptResult { + endpoint: element.endpoint, + orig_packet: element.packet, + out_packet: raw_packet, + session_type: element.session_type, + }).unwrap(); + }, + Work::Encrypt((tx, mut element)) => { + let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 { + PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE) + } else { 0 }; + let padded_len = element.in_packet.len() + padding; + let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; - out_packet[0] = 4; - LittleEndian::write_u32(&mut out_packet[4..], element.their_index); - LittleEndian::write_u64(&mut out_packet[8..], element.nonce); + out_packet[0] = 4; + LittleEndian::write_u32(&mut out_packet[4..], element.their_index); + LittleEndian::write_u64(&mut out_packet[8..], element.nonce); - element.in_packet.resize(padded_len, 0); - let len = element.transport.write_transport_message(element.nonce, - &element.in_packet, - &mut out_packet[16..]).unwrap(); - out_packet.truncate(TRANSPORT_HEADER_SIZE + len); + element.in_packet.resize(padded_len, 0); + let len = element.transport.write_transport_message(element.nonce, + &element.in_packet, + &mut out_packet[16..]).unwrap(); + out_packet.truncate(TRANSPORT_HEADER_SIZE + len); - tx.unbounded_send(EncryptResult { - endpoint: element.endpoint, - our_index: element.our_index, - out_packet, - }).unwrap(); + tx.unbounded_send(EncryptResult { + endpoint: element.endpoint, + our_index: element.our_index, + out_packet, + }).unwrap(); + } } } } -- cgit v1.2.3-59-g8ed1b