diff options
author | Jake McGinty <me@jake.su> | 2018-06-02 00:21:47 -0500 |
---|---|---|
committer | Jake McGinty <me@jake.su> | 2018-06-02 00:22:21 -0500 |
commit | 24bfd58d21a12be33601de8b683e42744711b96c (patch) | |
tree | 1cc8c9f0ffe0a03d163b96062221108636280d34 | |
parent | well, this is interesting. (diff) | |
download | wireguard-rs-24bfd58d21a12be33601de8b683e42744711b96c.tar.xz wireguard-rs-24bfd58d21a12be33601de8b683e42744711b96c.zip |
try using futures_cpupool with new layout
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/crypto_pool.rs | 115 | ||||
-rw-r--r-- | src/interface/peer_server.rs | 8 | ||||
-rw-r--r-- | src/lib.rs | 1 |
4 files changed, 63 insertions, 62 deletions
@@ -39,6 +39,7 @@ crossbeam-channel = "^0.1" derive_deref = "^1.0" failure = "^0.1" futures = "^0.1" +futures-cpupool = "^0.1" lazy_static = "^1" libc = { git = "https://github.com/rust-lang/libc" } log = "^0.4" diff --git a/src/crypto_pool.rs b/src/crypto_pool.rs index b8e4536..fefc2f4 100644 --- a/src/crypto_pool.rs +++ b/src/crypto_pool.rs @@ -4,6 +4,8 @@ use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::sync::mpsc; use futures::executor; use futures::Sink; +use futures::future::*; +use futures_cpupool::CpuPool; use num_cpus; use snow::AsyncTransportState; use std::thread; @@ -47,73 +49,70 @@ pub struct DecryptResult { pub session_type: SessionType, } -/// Spawn a thread pool to efficiently process -/// the CPU-intensive encryption/decryption. -pub fn create() -> Sender<Work> { - let threads = num_cpus::get(); // One thread for I/O. - let (sender, receiver) = unbounded(); +pub struct Pool { + pool: CpuPool, +} - debug!("spinning up a crypto pool with {} threads", threads); - crossbeam::scope(|s| { - for _ in 0..threads { - let rx = receiver.clone(); - thread::spawn(move || worker(rx.clone())); +impl Pool { + pub fn new() -> Self { + let pool = CpuPool::new_num_cpus(); + Self { + pool } - }); + } - sender + pub fn send(&self, work: Work) { + self.pool.spawn_fn(move || { + worker(work); + ok::<(), ()>(()) + }).forget(); + } } -fn worker(receiver: Receiver<Work>) { - loop { - select_loop! { - recv(receiver, work) => { - match work { - Work::Decrypt((tx, element)) => { - let mut raw_packet = vec![0u8; element.packet.len()]; - let nonce = element.packet.nonce(); - let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap(); - if len > 0 { - let len = IpPacket::new(&raw_packet[..len]) - .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() - .length(); - raw_packet.truncate(len as usize); - } else { - raw_packet.truncate(0); - } +fn worker(work: Work) { + match work { + Work::Decrypt((tx, element)) => { + let mut raw_packet = vec![0u8; element.packet.len()]; + let nonce = element.packet.nonce(); + let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap(); + if len > 0 { + let len = IpPacket::new(&raw_packet[..len]) + .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() + .length(); + raw_packet.truncate(len as usize); + } else { + raw_packet.truncate(0); + } - executor::spawn(tx.send(DecryptResult { - endpoint: element.endpoint, - orig_packet: element.packet, - out_packet: raw_packet, - session_type: element.session_type, - })).wait_future().unwrap(); - }, - Work::Encrypt((tx, mut element)) => { - let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 { - PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE) - } else { 0 }; - let padded_len = element.in_packet.len() + padding; - let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; + executor::spawn(tx.send(DecryptResult { + endpoint: element.endpoint, + orig_packet: element.packet, + out_packet: raw_packet, + session_type: element.session_type, + })).wait_future().unwrap(); + }, + Work::Encrypt((tx, mut element)) => { + let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 { + PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE) + } else { 0 }; + let padded_len = element.in_packet.len() + padding; + let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; - out_packet[0] = 4; - LittleEndian::write_u32(&mut out_packet[4..], element.their_index); - LittleEndian::write_u64(&mut out_packet[8..], element.nonce); + out_packet[0] = 4; + LittleEndian::write_u32(&mut out_packet[4..], element.their_index); + LittleEndian::write_u64(&mut out_packet[8..], element.nonce); - element.in_packet.resize(padded_len, 0); - let len = element.transport.write_transport_message(element.nonce, - &element.in_packet, - &mut out_packet[16..]).unwrap(); - out_packet.truncate(TRANSPORT_HEADER_SIZE + len); + element.in_packet.resize(padded_len, 0); + let len = element.transport.write_transport_message(element.nonce, + &element.in_packet, + &mut out_packet[16..]).unwrap(); + out_packet.truncate(TRANSPORT_HEADER_SIZE + len); - executor::spawn(tx.send(EncryptResult { - endpoint: element.endpoint, - our_index: element.our_index, - out_packet, - })).wait_future().unwrap(); - } - } + executor::spawn(tx.send(EncryptResult { + endpoint: element.endpoint, + our_index: element.our_index, + out_packet, + })).wait_future().unwrap(); } } - } }
\ No newline at end of file diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 8d1b8ba..bafeed5 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -75,7 +75,7 @@ pub struct PeerServer { rate_limiter : RateLimiter, under_load_until : Instant, rng : ThreadRng, - crypto_pool : crossbeam::Sender<crypto_pool::Work>, + crypto_pool : crypto_pool::Pool, decrypt_channel : SyncChannel<crypto_pool::DecryptResult>, encrypt_channel : SyncChannel<crypto_pool::EncryptResult>, } @@ -95,7 +95,7 @@ impl PeerServer { rate_limiter : RateLimiter::new(&handle)?, under_load_until : Instant::now(), rng : rand::thread_rng(), - crypto_pool : crypto_pool::create(), + crypto_pool : crypto_pool::Pool::new(), decrypt_channel : sync::mpsc::unbounded().into(), encrypt_channel : sync::mpsc::unbounded().into(), }) @@ -315,7 +315,7 @@ impl PeerServer { let mut peer = peer_ref.borrow_mut(); let tx = self.decrypt_channel.tx.clone(); let work = crypto_pool::Work::Decrypt((tx, peer.handle_incoming_transport(addr, packet)?)); - self.crypto_pool.send(work)?; + self.crypto_pool.send(work); Ok(()) } @@ -323,7 +323,7 @@ impl PeerServer { fn encrypt_and_send(&mut self, peer: &mut Peer, packet: Vec<u8>) -> Result<(), Error> { let tx = self.encrypt_channel.tx.clone(); let work = crypto_pool::Work::Encrypt((tx, peer.handle_outgoing_transport(packet)?)); - self.crypto_pool.send(work)?; + self.crypto_pool.send(work); Ok(()) } @@ -22,6 +22,7 @@ extern crate byteorder; extern crate bytes; extern crate chacha20_poly1305_aead; extern crate crossbeam; +extern crate futures_cpupool; extern crate hex; extern crate libc; extern crate mio; |