aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml3
-rw-r--r--src/crypto_pool.rs95
-rw-r--r--src/lib.rs4
3 files changed, 53 insertions, 49 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 06d7852..75de0a9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,7 +34,8 @@ blake2-rfc = "0.2"
byteorder = "^1.2"
bytes = "0.4"
chacha20-poly1305-aead = "^0.1"
-crossbeam-channel = "0.1"
+crossbeam = "^0.3"
+crossbeam-channel = "^0.1"
derive_deref = "^1.0"
failure = "^0.1"
futures = "^0.1"
diff --git a/src/crypto_pool.rs b/src/crypto_pool.rs
index 2208e98..e0756e9 100644
--- a/src/crypto_pool.rs
+++ b/src/crypto_pool.rs
@@ -1,4 +1,5 @@
use consts::{PADDING_MULTIPLE, TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE};
+use crossbeam;
use crossbeam_channel::{unbounded, Receiver, Sender};
use futures::sync::mpsc;
use futures::executor;
@@ -52,61 +53,63 @@ pub fn create() -> Sender<Work> {
let threads = num_cpus::get() - 1; // One thread for I/O.
let (sender, receiver) = unbounded();
- for i in 0..threads {
- let rx = receiver.clone();
- thread::Builder::new().name(format!("wireguard-rs-crypto-{}", i))
- .spawn(move || worker(rx.clone())).unwrap();
- }
+ crossbeam::scope(|s| {
+ for i in 0..threads {
+ let rx = receiver.clone();
+ s.spawn(move || worker(rx.clone()));
+ }
+ });
sender
}
fn worker(receiver: Receiver<Work>) {
- loop {
- let work = receiver.recv().expect("channel to crypto worker thread broken.");
- match work {
- Work::Decrypt((tx, element)) => {
- let mut raw_packet = vec![0u8; element.packet.len()];
- let nonce = element.packet.nonce();
- let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap();
- if len > 0 {
- let len = IpPacket::new(&raw_packet[..len])
- .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap()
- .length();
- raw_packet.truncate(len as usize);
- } else {
- raw_packet.truncate(0);
- }
+ select_loop! {
+ recv(receiver, work) => {
+ match work {
+ Work::Decrypt((tx, element)) => {
+ let mut raw_packet = vec![0u8; element.packet.len()];
+ let nonce = element.packet.nonce();
+ let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap();
+ if len > 0 {
+ let len = IpPacket::new(&raw_packet[..len])
+ .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap()
+ .length();
+ raw_packet.truncate(len as usize);
+ } else {
+ raw_packet.truncate(0);
+ }
- tx.unbounded_send(DecryptResult {
- endpoint: element.endpoint,
- orig_packet: element.packet,
- out_packet: raw_packet,
- session_type: element.session_type,
- }).unwrap();
- },
- Work::Encrypt((tx, mut element)) => {
- let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 {
- PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE)
- } else { 0 };
- let padded_len = element.in_packet.len() + padding;
- let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
+ tx.unbounded_send(DecryptResult {
+ endpoint: element.endpoint,
+ orig_packet: element.packet,
+ out_packet: raw_packet,
+ session_type: element.session_type,
+ }).unwrap();
+ },
+ Work::Encrypt((tx, mut element)) => {
+ let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 {
+ PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE)
+ } else { 0 };
+ let padded_len = element.in_packet.len() + padding;
+ let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
- out_packet[0] = 4;
- LittleEndian::write_u32(&mut out_packet[4..], element.their_index);
- LittleEndian::write_u64(&mut out_packet[8..], element.nonce);
+ out_packet[0] = 4;
+ LittleEndian::write_u32(&mut out_packet[4..], element.their_index);
+ LittleEndian::write_u64(&mut out_packet[8..], element.nonce);
- element.in_packet.resize(padded_len, 0);
- let len = element.transport.write_transport_message(element.nonce,
- &element.in_packet,
- &mut out_packet[16..]).unwrap();
- out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
+ element.in_packet.resize(padded_len, 0);
+ let len = element.transport.write_transport_message(element.nonce,
+ &element.in_packet,
+ &mut out_packet[16..]).unwrap();
+ out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
- tx.unbounded_send(EncryptResult {
- endpoint: element.endpoint,
- our_index: element.our_index,
- out_packet,
- }).unwrap();
+ tx.unbounded_send(EncryptResult {
+ endpoint: element.endpoint,
+ our_index: element.our_index,
+ out_packet,
+ }).unwrap();
+ }
}
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 0fb68f3..106124d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,19 +9,19 @@
#![cfg_attr(feature = "cargo-clippy", allow(unreadable_literal))]
#![cfg_attr(feature = "cargo-clippy", allow(decimal_literal_representation))]
+#[macro_use] extern crate crossbeam_channel;
#[macro_use] extern crate derive_deref;
#[macro_use] extern crate failure;
#[macro_use] extern crate futures;
#[macro_use] extern crate lazy_static;
#[macro_use] extern crate log;
#[macro_use] extern crate tokio_core;
-
extern crate base64;
extern crate blake2_rfc;
extern crate byteorder;
extern crate bytes;
extern crate chacha20_poly1305_aead;
-extern crate crossbeam_channel;
+extern crate crossbeam;
extern crate hex;
extern crate libc;
extern crate mio;