aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/crypto_pool.rs98
-rw-r--r--src/interface/peer_server.rs45
-rw-r--r--src/lib.rs2
3 files changed, 64 insertions, 81 deletions
diff --git a/src/crypto_pool.rs b/src/crypto_pool.rs
index db5826e..c99002a 100644
--- a/src/crypto_pool.rs
+++ b/src/crypto_pool.rs
@@ -1,8 +1,6 @@
use consts::{PADDING_MULTIPLE, TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE};
use crossbeam_channel::{bounded, Receiver, Sender};
-use futures::sync::mpsc;
-use futures::executor;
-use futures::Sink;
+use futures::task::Task;
use num_cpus;
use snow::AsyncTransportState;
use std::thread;
@@ -13,8 +11,8 @@ use ip_packet::IpPacket;
use byteorder::{ByteOrder, LittleEndian};
pub enum Work {
- Decrypt((mpsc::UnboundedSender<DecryptResult>, DecryptWork)),
- Encrypt((mpsc::UnboundedSender<EncryptResult>, EncryptWork)),
+ Decrypt((Sender<DecryptResult>, Task, DecryptWork)),
+ Encrypt((Sender<EncryptResult>, Task, EncryptWork)),
}
pub struct EncryptWork {
@@ -49,7 +47,7 @@ pub struct DecryptResult {
/// Spawn a thread pool to efficiently process
/// the CPU-intensive encryption/decryption.
pub fn create() -> Sender<Work> {
- let threads = num_cpus::get(); // One thread for I/O.
+ let threads = num_cpus::get() - 2; // One thread for I/O.
let (sender, receiver) = bounded(4096);
debug!("spinning up a crypto pool with {} threads", threads);
@@ -62,55 +60,55 @@ pub fn create() -> Sender<Work> {
}
fn worker(receiver: Receiver<Work>) {
- loop {
- select_loop! {
- recv(receiver, work) => {
- match work {
- Work::Decrypt((tx, element)) => {
- let mut raw_packet = vec![0u8; element.packet.len()];
- let nonce = element.packet.nonce();
- let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap();
- if len > 0 {
- let len = IpPacket::new(&raw_packet[..len])
- .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap()
- .length();
- raw_packet.truncate(len as usize);
- } else {
- raw_packet.truncate(0);
- }
+ while let Some(work) = receiver.recv() {
+ match work {
+ Work::Decrypt((tx, task, element)) => {
+ let mut raw_packet = vec![0u8; element.packet.len()];
+ let nonce = element.packet.nonce();
+ let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap();
+ if len > 0 {
+ let len = IpPacket::new(&raw_packet[..len])
+ .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap()
+ .length();
+ raw_packet.truncate(len as usize);
+ } else {
+ raw_packet.truncate(0);
+ }
- executor::spawn(tx.send(DecryptResult {
- endpoint: element.endpoint,
- orig_packet: element.packet,
- out_packet: raw_packet,
- session_type: element.session_type,
- })).wait_future().unwrap();
- },
- Work::Encrypt((tx, mut element)) => {
- let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 {
- PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE)
- } else { 0 };
- let padded_len = element.in_packet.len() + padding;
- let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
+ tx.send(DecryptResult {
+ endpoint: element.endpoint,
+ orig_packet: element.packet,
+ out_packet: raw_packet,
+ session_type: element.session_type,
+ });
- out_packet[0] = 4;
- LittleEndian::write_u32(&mut out_packet[4..], element.their_index);
- LittleEndian::write_u64(&mut out_packet[8..], element.nonce);
+ task.notify();
+ },
+ Work::Encrypt((tx, task, mut element)) => {
+ let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 {
+ PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE)
+ } else { 0 };
+ let padded_len = element.in_packet.len() + padding;
+ let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
- element.in_packet.resize(padded_len, 0);
- let len = element.transport.write_transport_message(element.nonce,
- &element.in_packet,
- &mut out_packet[16..]).unwrap();
- out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
+ out_packet[0] = 4;
+ LittleEndian::write_u32(&mut out_packet[4..], element.their_index);
+ LittleEndian::write_u64(&mut out_packet[8..], element.nonce);
- executor::spawn(tx.send(EncryptResult {
- endpoint: element.endpoint,
- our_index: element.our_index,
- out_packet,
- })).wait_future().unwrap();
- }
+ element.in_packet.resize(padded_len, 0);
+ let len = element.transport.write_transport_message(element.nonce,
+ &element.in_packet,
+ &mut out_packet[16..]).unwrap();
+ out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
+
+ tx.send(EncryptResult {
+ endpoint: element.endpoint,
+ our_index: element.our_index,
+ out_packet,
+ });
+
+ task.notify();
}
}
}
- }
} \ No newline at end of file
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index 8d1b8ba..aa69b72 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -13,7 +13,7 @@ use timer::{Timer, TimerMessage};
use byteorder::{ByteOrder, LittleEndian};
use crossbeam_channel as crossbeam;
use failure::{Error, err_msg};
-use futures::{Async, Future, Stream, Poll, sync, unsync::mpsc, task};
+use futures::{Async, Future, Stream, Poll, unsync::mpsc, task};
use rand::{self, Rng, ThreadRng};
use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel};
use tokio_core::reactor::Handle;
@@ -39,12 +39,12 @@ struct Channel<T> {
}
struct SyncChannel<T> {
- tx: sync::mpsc::UnboundedSender<T>,
- rx: sync::mpsc::UnboundedReceiver<T>,
+ tx: crossbeam::Sender<T>,
+ rx: crossbeam::Receiver<T>,
}
-impl<T> From<(sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)> for SyncChannel<T> {
- fn from(pair: (sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)) -> Self {
+impl<T> From<(crossbeam::Sender<T>, crossbeam::Receiver<T>)> for SyncChannel<T> {
+ fn from(pair: (crossbeam::Sender<T>, crossbeam::Receiver<T>)) -> Self {
Self {
tx: pair.0,
rx: pair.1,
@@ -96,8 +96,8 @@ impl PeerServer {
under_load_until : Instant::now(),
rng : rand::thread_rng(),
crypto_pool : crypto_pool::create(),
- decrypt_channel : sync::mpsc::unbounded().into(),
- encrypt_channel : sync::mpsc::unbounded().into(),
+ decrypt_channel : crossbeam::unbounded().into(),
+ encrypt_channel : crossbeam::unbounded().into(),
})
}
@@ -314,16 +314,16 @@ impl PeerServer {
let mut peer = peer_ref.borrow_mut();
let tx = self.decrypt_channel.tx.clone();
- let work = crypto_pool::Work::Decrypt((tx, peer.handle_incoming_transport(addr, packet)?));
- self.crypto_pool.send(work)?;
+ let work = crypto_pool::Work::Decrypt((tx, task::current(), peer.handle_incoming_transport(addr, packet)?));
+ self.crypto_pool.send(work);
Ok(())
}
fn encrypt_and_send(&mut self, peer: &mut Peer, packet: Vec<u8>) -> Result<(), Error> {
let tx = self.encrypt_channel.tx.clone();
- let work = crypto_pool::Work::Encrypt((tx, peer.handle_outgoing_transport(packet)?));
- self.crypto_pool.send(work)?;
+ let work = crypto_pool::Work::Encrypt((tx, task::current(), peer.handle_outgoing_transport(packet)?));
+ self.crypto_pool.send(work);
Ok(())
}
@@ -651,27 +651,12 @@ impl Future for PeerServer {
}
}
- loop {
- match self.decrypt_channel.rx.poll() {
- Ok(Async::Ready(Some(result))) => {
- let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e));
- },
- Ok(Async::NotReady) => { break; },
- Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
- Err(e) => bail!("incoming udp stream error: {:?}", e)
- }
+ while let Some(result) = self.decrypt_channel.rx.try_recv() {
+ let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e));
}
- loop {
- // Handle UDP packets from the outside world
- match self.encrypt_channel.rx.poll() {
- Ok(Async::Ready(Some(result))) => {
- let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e));
- },
- Ok(Async::NotReady) => { break; },
- Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
- Err(e) => bail!("incoming udp stream error: {:?}", e)
- }
+ while let Some(result) = self.encrypt_channel.rx.try_recv() {
+ let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e));
}
loop {
diff --git a/src/lib.rs b/src/lib.rs
index 106124d..9ebaebb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,7 +9,6 @@
#![cfg_attr(feature = "cargo-clippy", allow(unreadable_literal))]
#![cfg_attr(feature = "cargo-clippy", allow(decimal_literal_representation))]
-#[macro_use] extern crate crossbeam_channel;
#[macro_use] extern crate derive_deref;
#[macro_use] extern crate failure;
#[macro_use] extern crate futures;
@@ -22,6 +21,7 @@ extern crate byteorder;
extern crate bytes;
extern crate chacha20_poly1305_aead;
extern crate crossbeam;
+extern crate crossbeam_channel;
extern crate hex;
extern crate libc;
extern crate mio;