aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock28
-rw-r--r--Cargo.toml2
-rw-r--r--src/crypto_pool.rs98
-rw-r--r--src/interface/peer_server.rs45
-rw-r--r--src/lib.rs2
5 files changed, 83 insertions, 92 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 48db648..ca65675 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -240,12 +240,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "crossbeam-channel"
-version = "0.1.3"
+version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-epoch 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-utils 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "smallvec 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -279,6 +281,11 @@ dependencies = [
]
[[package]]
+name = "crossbeam-utils"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
name = "curve25519-dalek"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -904,13 +911,13 @@ dependencies = [
[[package]]
name = "ring"
-version = "0.13.0-alpha4"
+version = "0.13.0-alpha5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cc 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)",
- "untrusted 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "untrusted 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -1038,7 +1045,7 @@ dependencies = [
"failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.5.0-pre.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "ring 0.13.0-alpha4 (registry+https://github.com/rust-lang/crates.io-index)",
+ "ring 0.13.0-alpha5 (registry+https://github.com/rust-lang/crates.io-index)",
"rust-crypto 0.2.36 (git+https://github.com/mcginty/rust-crypto)",
"rustc_version 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"static_slice 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1461,7 +1468,7 @@ dependencies = [
[[package]]
name = "untrusted"
-version = "0.6.1"
+version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@@ -1530,7 +1537,7 @@ dependencies = [
"colored 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"criterion 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-channel 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-channel 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"daemonize 0.2.3 (git+https://github.com/mcginty/daemonize)",
"derive_deref 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1610,10 +1617,11 @@ dependencies = [
"checksum criterion-plot 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f7f7c88a8d341dd9fd9e31a72ca2ca24428db79afb491852873b2c784e037e6"
"checksum criterion-stats 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "dd48feb0253b2968ff3085e7f3fba6738c9ff859f420a2fb81a48986eb66da36"
"checksum crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "24ce9782d4d5c53674646a6a4c1863a21a8fc0cb649b3c94dfc16e45071dea19"
-"checksum crossbeam-channel 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "862becd07e73da5746de6d9b3ba055c9bb8b10afd0d2b51155a6e30d81cd20b3"
+"checksum crossbeam-channel 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b45c6ba620feae538943c106977c6348c16ad3b03dd8aaecd25a4224345fa795"
"checksum crossbeam-deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fe8153ef04a7594ded05b427ffad46ddeaf22e63fd48d42b3e1e3bb4db07cae7"
"checksum crossbeam-epoch 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9b4e2817eb773f770dcb294127c011e22771899c21d18fce7dd739c0b9832e81"
"checksum crossbeam-utils 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d636a8b3bcc1b409d7ffd3facef8f21dcb4009626adbd0c5e6c4305c07253c7b"
+"checksum crossbeam-utils 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b71f220442ed14749909b543d4dd7ec3918cb1fe289fd96e88d0abe6ca049783"
"checksum curve25519-dalek 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e5808ccadbb61565fd184702be128ac43a6a33561bcd976a0d7a388b06ad696d"
"checksum daemonize 0.2.3 (git+https://github.com/mcginty/daemonize)" = "<none>"
"checksum derive_deref 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "232c0157c26fd44067bcee2025879139cc983d8eed21f8cb5965b6a44e23fb67"
@@ -1688,7 +1696,7 @@ dependencies = [
"checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76"
"checksum regex 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9329abc99e39129fcceabd24cf5d85b4671ef7c29c50e972bc5afe32438ec384"
"checksum regex-syntax 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7d707a4fa2637f2dca2ef9fd02225ec7661fe01a53623c1e6515b6916511f7a7"
-"checksum ring 0.13.0-alpha4 (registry+https://github.com/rust-lang/crates.io-index)" = "d835120175f4cfea267529ebf36fa3851ba1de1e44202f6241e5dee517edaebe"
+"checksum ring 0.13.0-alpha5 (registry+https://github.com/rust-lang/crates.io-index)" = "3845516753f91b4511f9b17c917ea6fa4bc5a7853a9947b0f66731aff51cdef5"
"checksum rust-crypto 0.2.36 (git+https://github.com/mcginty/rust-crypto)" = "<none>"
"checksum rustc-demangle 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "76d7ba1feafada44f2d38eed812bd2489a03c0f5abb975799251518b68848649"
"checksum rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)" = "dcf128d1287d2ea9d80910b5f1120d0b8eede3fbf1abe91c40d39ea7d51e6fda"
@@ -1751,7 +1759,7 @@ dependencies = [
"checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc"
"checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc"
"checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56"
-"checksum untrusted 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "70afa43c8c5d23a53a3c39ec9b56232c5badc19f6bb5ad529c1d6448a7241365"
+"checksum untrusted 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "55cd1f4b4e96b46aeb8d4855db4a7a9bd96eeeb5c6a1ab54593328761642ce2f"
"checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122"
"checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a"
"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
diff --git a/Cargo.toml b/Cargo.toml
index 75de0a9..71f86e2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,7 +35,7 @@ byteorder = "^1.2"
bytes = "0.4"
chacha20-poly1305-aead = "^0.1"
crossbeam = "^0.3"
-crossbeam-channel = "^0.1"
+crossbeam-channel = "^0.2"
derive_deref = "^1.0"
failure = "^0.1"
futures = "^0.1"
diff --git a/src/crypto_pool.rs b/src/crypto_pool.rs
index db5826e..c99002a 100644
--- a/src/crypto_pool.rs
+++ b/src/crypto_pool.rs
@@ -1,8 +1,6 @@
use consts::{PADDING_MULTIPLE, TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE};
use crossbeam_channel::{bounded, Receiver, Sender};
-use futures::sync::mpsc;
-use futures::executor;
-use futures::Sink;
+use futures::task::Task;
use num_cpus;
use snow::AsyncTransportState;
use std::thread;
@@ -13,8 +11,8 @@ use ip_packet::IpPacket;
use byteorder::{ByteOrder, LittleEndian};
pub enum Work {
- Decrypt((mpsc::UnboundedSender<DecryptResult>, DecryptWork)),
- Encrypt((mpsc::UnboundedSender<EncryptResult>, EncryptWork)),
+ Decrypt((Sender<DecryptResult>, Task, DecryptWork)),
+ Encrypt((Sender<EncryptResult>, Task, EncryptWork)),
}
pub struct EncryptWork {
@@ -49,7 +47,7 @@ pub struct DecryptResult {
/// Spawn a thread pool to efficiently process
/// the CPU-intensive encryption/decryption.
pub fn create() -> Sender<Work> {
- let threads = num_cpus::get(); // One thread for I/O.
+ let threads = num_cpus::get() - 2; // One thread for I/O.
let (sender, receiver) = bounded(4096);
debug!("spinning up a crypto pool with {} threads", threads);
@@ -62,55 +60,55 @@ pub fn create() -> Sender<Work> {
}
fn worker(receiver: Receiver<Work>) {
- loop {
- select_loop! {
- recv(receiver, work) => {
- match work {
- Work::Decrypt((tx, element)) => {
- let mut raw_packet = vec![0u8; element.packet.len()];
- let nonce = element.packet.nonce();
- let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap();
- if len > 0 {
- let len = IpPacket::new(&raw_packet[..len])
- .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap()
- .length();
- raw_packet.truncate(len as usize);
- } else {
- raw_packet.truncate(0);
- }
+ while let Some(work) = receiver.recv() {
+ match work {
+ Work::Decrypt((tx, task, element)) => {
+ let mut raw_packet = vec![0u8; element.packet.len()];
+ let nonce = element.packet.nonce();
+ let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap();
+ if len > 0 {
+ let len = IpPacket::new(&raw_packet[..len])
+ .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap()
+ .length();
+ raw_packet.truncate(len as usize);
+ } else {
+ raw_packet.truncate(0);
+ }
- executor::spawn(tx.send(DecryptResult {
- endpoint: element.endpoint,
- orig_packet: element.packet,
- out_packet: raw_packet,
- session_type: element.session_type,
- })).wait_future().unwrap();
- },
- Work::Encrypt((tx, mut element)) => {
- let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 {
- PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE)
- } else { 0 };
- let padded_len = element.in_packet.len() + padding;
- let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
+ tx.send(DecryptResult {
+ endpoint: element.endpoint,
+ orig_packet: element.packet,
+ out_packet: raw_packet,
+ session_type: element.session_type,
+ });
- out_packet[0] = 4;
- LittleEndian::write_u32(&mut out_packet[4..], element.their_index);
- LittleEndian::write_u64(&mut out_packet[8..], element.nonce);
+ task.notify();
+ },
+ Work::Encrypt((tx, task, mut element)) => {
+ let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 {
+ PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE)
+ } else { 0 };
+ let padded_len = element.in_packet.len() + padding;
+ let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
- element.in_packet.resize(padded_len, 0);
- let len = element.transport.write_transport_message(element.nonce,
- &element.in_packet,
- &mut out_packet[16..]).unwrap();
- out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
+ out_packet[0] = 4;
+ LittleEndian::write_u32(&mut out_packet[4..], element.their_index);
+ LittleEndian::write_u64(&mut out_packet[8..], element.nonce);
- executor::spawn(tx.send(EncryptResult {
- endpoint: element.endpoint,
- our_index: element.our_index,
- out_packet,
- })).wait_future().unwrap();
- }
+ element.in_packet.resize(padded_len, 0);
+ let len = element.transport.write_transport_message(element.nonce,
+ &element.in_packet,
+ &mut out_packet[16..]).unwrap();
+ out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
+
+ tx.send(EncryptResult {
+ endpoint: element.endpoint,
+ our_index: element.our_index,
+ out_packet,
+ });
+
+ task.notify();
}
}
}
- }
} \ No newline at end of file
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index 8d1b8ba..aa69b72 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -13,7 +13,7 @@ use timer::{Timer, TimerMessage};
use byteorder::{ByteOrder, LittleEndian};
use crossbeam_channel as crossbeam;
use failure::{Error, err_msg};
-use futures::{Async, Future, Stream, Poll, sync, unsync::mpsc, task};
+use futures::{Async, Future, Stream, Poll, unsync::mpsc, task};
use rand::{self, Rng, ThreadRng};
use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel};
use tokio_core::reactor::Handle;
@@ -39,12 +39,12 @@ struct Channel<T> {
}
struct SyncChannel<T> {
- tx: sync::mpsc::UnboundedSender<T>,
- rx: sync::mpsc::UnboundedReceiver<T>,
+ tx: crossbeam::Sender<T>,
+ rx: crossbeam::Receiver<T>,
}
-impl<T> From<(sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)> for SyncChannel<T> {
- fn from(pair: (sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)) -> Self {
+impl<T> From<(crossbeam::Sender<T>, crossbeam::Receiver<T>)> for SyncChannel<T> {
+ fn from(pair: (crossbeam::Sender<T>, crossbeam::Receiver<T>)) -> Self {
Self {
tx: pair.0,
rx: pair.1,
@@ -96,8 +96,8 @@ impl PeerServer {
under_load_until : Instant::now(),
rng : rand::thread_rng(),
crypto_pool : crypto_pool::create(),
- decrypt_channel : sync::mpsc::unbounded().into(),
- encrypt_channel : sync::mpsc::unbounded().into(),
+ decrypt_channel : crossbeam::unbounded().into(),
+ encrypt_channel : crossbeam::unbounded().into(),
})
}
@@ -314,16 +314,16 @@ impl PeerServer {
let mut peer = peer_ref.borrow_mut();
let tx = self.decrypt_channel.tx.clone();
- let work = crypto_pool::Work::Decrypt((tx, peer.handle_incoming_transport(addr, packet)?));
- self.crypto_pool.send(work)?;
+ let work = crypto_pool::Work::Decrypt((tx, task::current(), peer.handle_incoming_transport(addr, packet)?));
+ self.crypto_pool.send(work);
Ok(())
}
fn encrypt_and_send(&mut self, peer: &mut Peer, packet: Vec<u8>) -> Result<(), Error> {
let tx = self.encrypt_channel.tx.clone();
- let work = crypto_pool::Work::Encrypt((tx, peer.handle_outgoing_transport(packet)?));
- self.crypto_pool.send(work)?;
+ let work = crypto_pool::Work::Encrypt((tx, task::current(), peer.handle_outgoing_transport(packet)?));
+ self.crypto_pool.send(work);
Ok(())
}
@@ -651,27 +651,12 @@ impl Future for PeerServer {
}
}
- loop {
- match self.decrypt_channel.rx.poll() {
- Ok(Async::Ready(Some(result))) => {
- let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e));
- },
- Ok(Async::NotReady) => { break; },
- Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
- Err(e) => bail!("incoming udp stream error: {:?}", e)
- }
+ while let Some(result) = self.decrypt_channel.rx.try_recv() {
+ let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e));
}
- loop {
- // Handle UDP packets from the outside world
- match self.encrypt_channel.rx.poll() {
- Ok(Async::Ready(Some(result))) => {
- let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e));
- },
- Ok(Async::NotReady) => { break; },
- Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
- Err(e) => bail!("incoming udp stream error: {:?}", e)
- }
+ while let Some(result) = self.encrypt_channel.rx.try_recv() {
+ let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e));
}
loop {
diff --git a/src/lib.rs b/src/lib.rs
index 106124d..9ebaebb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,7 +9,6 @@
#![cfg_attr(feature = "cargo-clippy", allow(unreadable_literal))]
#![cfg_attr(feature = "cargo-clippy", allow(decimal_literal_representation))]
-#[macro_use] extern crate crossbeam_channel;
#[macro_use] extern crate derive_deref;
#[macro_use] extern crate failure;
#[macro_use] extern crate futures;
@@ -22,6 +21,7 @@ extern crate byteorder;
extern crate bytes;
extern crate chacha20_poly1305_aead;
extern crate crossbeam;
+extern crate crossbeam_channel;
extern crate hex;
extern crate libc;
extern crate mio;