aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-06-02 00:21:47 -0500
committerJake McGinty <me@jake.su>2018-06-02 00:22:21 -0500
commit24bfd58d21a12be33601de8b683e42744711b96c (patch)
tree1cc8c9f0ffe0a03d163b96062221108636280d34
parentwell, this is interesting. (diff)
downloadwireguard-rs-24bfd58d21a12be33601de8b683e42744711b96c.tar.xz
wireguard-rs-24bfd58d21a12be33601de8b683e42744711b96c.zip
try using futures_cpupool with new layout
-rw-r--r--Cargo.toml1
-rw-r--r--src/crypto_pool.rs115
-rw-r--r--src/interface/peer_server.rs8
-rw-r--r--src/lib.rs1
4 files changed, 63 insertions, 62 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 75de0a9..5122f67 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -39,6 +39,7 @@ crossbeam-channel = "^0.1"
derive_deref = "^1.0"
failure = "^0.1"
futures = "^0.1"
+futures-cpupool = "^0.1"
lazy_static = "^1"
libc = { git = "https://github.com/rust-lang/libc" }
log = "^0.4"
diff --git a/src/crypto_pool.rs b/src/crypto_pool.rs
index b8e4536..fefc2f4 100644
--- a/src/crypto_pool.rs
+++ b/src/crypto_pool.rs
@@ -4,6 +4,8 @@ use crossbeam_channel::{unbounded, Receiver, Sender};
use futures::sync::mpsc;
use futures::executor;
use futures::Sink;
+use futures::future::*;
+use futures_cpupool::CpuPool;
use num_cpus;
use snow::AsyncTransportState;
use std::thread;
@@ -47,73 +49,70 @@ pub struct DecryptResult {
pub session_type: SessionType,
}
-/// Spawn a thread pool to efficiently process
-/// the CPU-intensive encryption/decryption.
-pub fn create() -> Sender<Work> {
- let threads = num_cpus::get(); // One thread for I/O.
- let (sender, receiver) = unbounded();
+pub struct Pool {
+ pool: CpuPool,
+}
- debug!("spinning up a crypto pool with {} threads", threads);
- crossbeam::scope(|s| {
- for _ in 0..threads {
- let rx = receiver.clone();
- thread::spawn(move || worker(rx.clone()));
+impl Pool {
+ pub fn new() -> Self {
+ let pool = CpuPool::new_num_cpus();
+ Self {
+ pool
}
- });
+ }
- sender
+ pub fn send(&self, work: Work) {
+ self.pool.spawn_fn(move || {
+ worker(work);
+ ok::<(), ()>(())
+ }).forget();
+ }
}
-fn worker(receiver: Receiver<Work>) {
- loop {
- select_loop! {
- recv(receiver, work) => {
- match work {
- Work::Decrypt((tx, element)) => {
- let mut raw_packet = vec![0u8; element.packet.len()];
- let nonce = element.packet.nonce();
- let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap();
- if len > 0 {
- let len = IpPacket::new(&raw_packet[..len])
- .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap()
- .length();
- raw_packet.truncate(len as usize);
- } else {
- raw_packet.truncate(0);
- }
+fn worker(work: Work) {
+ match work {
+ Work::Decrypt((tx, element)) => {
+ let mut raw_packet = vec![0u8; element.packet.len()];
+ let nonce = element.packet.nonce();
+ let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap();
+ if len > 0 {
+ let len = IpPacket::new(&raw_packet[..len])
+ .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap()
+ .length();
+ raw_packet.truncate(len as usize);
+ } else {
+ raw_packet.truncate(0);
+ }
- executor::spawn(tx.send(DecryptResult {
- endpoint: element.endpoint,
- orig_packet: element.packet,
- out_packet: raw_packet,
- session_type: element.session_type,
- })).wait_future().unwrap();
- },
- Work::Encrypt((tx, mut element)) => {
- let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 {
- PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE)
- } else { 0 };
- let padded_len = element.in_packet.len() + padding;
- let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
+ executor::spawn(tx.send(DecryptResult {
+ endpoint: element.endpoint,
+ orig_packet: element.packet,
+ out_packet: raw_packet,
+ session_type: element.session_type,
+ })).wait_future().unwrap();
+ },
+ Work::Encrypt((tx, mut element)) => {
+ let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 {
+ PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE)
+ } else { 0 };
+ let padded_len = element.in_packet.len() + padding;
+ let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
- out_packet[0] = 4;
- LittleEndian::write_u32(&mut out_packet[4..], element.their_index);
- LittleEndian::write_u64(&mut out_packet[8..], element.nonce);
+ out_packet[0] = 4;
+ LittleEndian::write_u32(&mut out_packet[4..], element.their_index);
+ LittleEndian::write_u64(&mut out_packet[8..], element.nonce);
- element.in_packet.resize(padded_len, 0);
- let len = element.transport.write_transport_message(element.nonce,
- &element.in_packet,
- &mut out_packet[16..]).unwrap();
- out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
+ element.in_packet.resize(padded_len, 0);
+ let len = element.transport.write_transport_message(element.nonce,
+ &element.in_packet,
+ &mut out_packet[16..]).unwrap();
+ out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
- executor::spawn(tx.send(EncryptResult {
- endpoint: element.endpoint,
- our_index: element.our_index,
- out_packet,
- })).wait_future().unwrap();
- }
- }
+ executor::spawn(tx.send(EncryptResult {
+ endpoint: element.endpoint,
+ our_index: element.our_index,
+ out_packet,
+ })).wait_future().unwrap();
}
}
- }
} \ No newline at end of file
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index 8d1b8ba..bafeed5 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -75,7 +75,7 @@ pub struct PeerServer {
rate_limiter : RateLimiter,
under_load_until : Instant,
rng : ThreadRng,
- crypto_pool : crossbeam::Sender<crypto_pool::Work>,
+ crypto_pool : crypto_pool::Pool,
decrypt_channel : SyncChannel<crypto_pool::DecryptResult>,
encrypt_channel : SyncChannel<crypto_pool::EncryptResult>,
}
@@ -95,7 +95,7 @@ impl PeerServer {
rate_limiter : RateLimiter::new(&handle)?,
under_load_until : Instant::now(),
rng : rand::thread_rng(),
- crypto_pool : crypto_pool::create(),
+ crypto_pool : crypto_pool::Pool::new(),
decrypt_channel : sync::mpsc::unbounded().into(),
encrypt_channel : sync::mpsc::unbounded().into(),
})
@@ -315,7 +315,7 @@ impl PeerServer {
let mut peer = peer_ref.borrow_mut();
let tx = self.decrypt_channel.tx.clone();
let work = crypto_pool::Work::Decrypt((tx, peer.handle_incoming_transport(addr, packet)?));
- self.crypto_pool.send(work)?;
+ self.crypto_pool.send(work);
Ok(())
}
@@ -323,7 +323,7 @@ impl PeerServer {
fn encrypt_and_send(&mut self, peer: &mut Peer, packet: Vec<u8>) -> Result<(), Error> {
let tx = self.encrypt_channel.tx.clone();
let work = crypto_pool::Work::Encrypt((tx, peer.handle_outgoing_transport(packet)?));
- self.crypto_pool.send(work)?;
+ self.crypto_pool.send(work);
Ok(())
}
diff --git a/src/lib.rs b/src/lib.rs
index 106124d..12717da 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -22,6 +22,7 @@ extern crate byteorder;
extern crate bytes;
extern crate chacha20_poly1305_aead;
extern crate crossbeam;
+extern crate futures_cpupool;
extern crate hex;
extern crate libc;
extern crate mio;