diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/crypto_pool.rs | 98 | ||||
-rw-r--r-- | src/interface/peer_server.rs | 45 | ||||
-rw-r--r-- | src/lib.rs | 2 |
3 files changed, 64 insertions, 81 deletions
diff --git a/src/crypto_pool.rs b/src/crypto_pool.rs index db5826e..c99002a 100644 --- a/src/crypto_pool.rs +++ b/src/crypto_pool.rs @@ -1,8 +1,6 @@ use consts::{PADDING_MULTIPLE, TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE}; use crossbeam_channel::{bounded, Receiver, Sender}; -use futures::sync::mpsc; -use futures::executor; -use futures::Sink; +use futures::task::Task; use num_cpus; use snow::AsyncTransportState; use std::thread; @@ -13,8 +11,8 @@ use ip_packet::IpPacket; use byteorder::{ByteOrder, LittleEndian}; pub enum Work { - Decrypt((mpsc::UnboundedSender<DecryptResult>, DecryptWork)), - Encrypt((mpsc::UnboundedSender<EncryptResult>, EncryptWork)), + Decrypt((Sender<DecryptResult>, Task, DecryptWork)), + Encrypt((Sender<EncryptResult>, Task, EncryptWork)), } pub struct EncryptWork { @@ -49,7 +47,7 @@ pub struct DecryptResult { /// Spawn a thread pool to efficiently process /// the CPU-intensive encryption/decryption. pub fn create() -> Sender<Work> { - let threads = num_cpus::get(); // One thread for I/O. + let threads = num_cpus::get() - 2; // One thread for I/O. let (sender, receiver) = bounded(4096); debug!("spinning up a crypto pool with {} threads", threads); @@ -62,55 +60,55 @@ pub fn create() -> Sender<Work> { } fn worker(receiver: Receiver<Work>) { - loop { - select_loop! { - recv(receiver, work) => { - match work { - Work::Decrypt((tx, element)) => { - let mut raw_packet = vec![0u8; element.packet.len()]; - let nonce = element.packet.nonce(); - let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap(); - if len > 0 { - let len = IpPacket::new(&raw_packet[..len]) - .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() - .length(); - raw_packet.truncate(len as usize); - } else { - raw_packet.truncate(0); - } + while let Some(work) = receiver.recv() { + match work { + Work::Decrypt((tx, task, element)) => { + let mut raw_packet = vec![0u8; element.packet.len()]; + let nonce = element.packet.nonce(); + let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap(); + if len > 0 { + let len = IpPacket::new(&raw_packet[..len]) + .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() + .length(); + raw_packet.truncate(len as usize); + } else { + raw_packet.truncate(0); + } - executor::spawn(tx.send(DecryptResult { - endpoint: element.endpoint, - orig_packet: element.packet, - out_packet: raw_packet, - session_type: element.session_type, - })).wait_future().unwrap(); - }, - Work::Encrypt((tx, mut element)) => { - let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 { - PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE) - } else { 0 }; - let padded_len = element.in_packet.len() + padding; - let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; + tx.send(DecryptResult { + endpoint: element.endpoint, + orig_packet: element.packet, + out_packet: raw_packet, + session_type: element.session_type, + }); - out_packet[0] = 4; - LittleEndian::write_u32(&mut out_packet[4..], element.their_index); - LittleEndian::write_u64(&mut out_packet[8..], element.nonce); + task.notify(); + }, + Work::Encrypt((tx, task, mut element)) => { + let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 { + PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE) + } else { 0 }; + let padded_len = element.in_packet.len() + padding; + let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; - element.in_packet.resize(padded_len, 0); - let len = element.transport.write_transport_message(element.nonce, - &element.in_packet, - &mut out_packet[16..]).unwrap(); - out_packet.truncate(TRANSPORT_HEADER_SIZE + len); + out_packet[0] = 4; + LittleEndian::write_u32(&mut out_packet[4..], element.their_index); + LittleEndian::write_u64(&mut out_packet[8..], element.nonce); - executor::spawn(tx.send(EncryptResult { - endpoint: element.endpoint, - our_index: element.our_index, - out_packet, - })).wait_future().unwrap(); - } + element.in_packet.resize(padded_len, 0); + let len = element.transport.write_transport_message(element.nonce, + &element.in_packet, + &mut out_packet[16..]).unwrap(); + out_packet.truncate(TRANSPORT_HEADER_SIZE + len); + + tx.send(EncryptResult { + endpoint: element.endpoint, + our_index: element.our_index, + out_packet, + }); + + task.notify(); } } } - } }
\ No newline at end of file diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 8d1b8ba..aa69b72 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -13,7 +13,7 @@ use timer::{Timer, TimerMessage}; use byteorder::{ByteOrder, LittleEndian}; use crossbeam_channel as crossbeam; use failure::{Error, err_msg}; -use futures::{Async, Future, Stream, Poll, sync, unsync::mpsc, task}; +use futures::{Async, Future, Stream, Poll, unsync::mpsc, task}; use rand::{self, Rng, ThreadRng}; use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel}; use tokio_core::reactor::Handle; @@ -39,12 +39,12 @@ struct Channel<T> { } struct SyncChannel<T> { - tx: sync::mpsc::UnboundedSender<T>, - rx: sync::mpsc::UnboundedReceiver<T>, + tx: crossbeam::Sender<T>, + rx: crossbeam::Receiver<T>, } -impl<T> From<(sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)> for SyncChannel<T> { - fn from(pair: (sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)) -> Self { +impl<T> From<(crossbeam::Sender<T>, crossbeam::Receiver<T>)> for SyncChannel<T> { + fn from(pair: (crossbeam::Sender<T>, crossbeam::Receiver<T>)) -> Self { Self { tx: pair.0, rx: pair.1, @@ -96,8 +96,8 @@ impl PeerServer { under_load_until : Instant::now(), rng : rand::thread_rng(), crypto_pool : crypto_pool::create(), - decrypt_channel : sync::mpsc::unbounded().into(), - encrypt_channel : sync::mpsc::unbounded().into(), + decrypt_channel : crossbeam::unbounded().into(), + encrypt_channel : crossbeam::unbounded().into(), }) } @@ -314,16 +314,16 @@ impl PeerServer { let mut peer = peer_ref.borrow_mut(); let tx = self.decrypt_channel.tx.clone(); - let work = crypto_pool::Work::Decrypt((tx, peer.handle_incoming_transport(addr, packet)?)); - self.crypto_pool.send(work)?; + let work = crypto_pool::Work::Decrypt((tx, task::current(), peer.handle_incoming_transport(addr, packet)?)); + self.crypto_pool.send(work); Ok(()) } fn encrypt_and_send(&mut self, peer: &mut Peer, packet: Vec<u8>) -> Result<(), Error> { let tx = self.encrypt_channel.tx.clone(); - let work = crypto_pool::Work::Encrypt((tx, peer.handle_outgoing_transport(packet)?)); - self.crypto_pool.send(work)?; + let work = crypto_pool::Work::Encrypt((tx, task::current(), peer.handle_outgoing_transport(packet)?)); + self.crypto_pool.send(work); Ok(()) } @@ -651,27 +651,12 @@ impl Future for PeerServer { } } - loop { - match self.decrypt_channel.rx.poll() { - Ok(Async::Ready(Some(result))) => { - let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e)); - }, - Ok(Async::NotReady) => { break; }, - Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), - Err(e) => bail!("incoming udp stream error: {:?}", e) - } + while let Some(result) = self.decrypt_channel.rx.try_recv() { + let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e)); } - loop { - // Handle UDP packets from the outside world - match self.encrypt_channel.rx.poll() { - Ok(Async::Ready(Some(result))) => { - let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e)); - }, - Ok(Async::NotReady) => { break; }, - Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), - Err(e) => bail!("incoming udp stream error: {:?}", e) - } + while let Some(result) = self.encrypt_channel.rx.try_recv() { + let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e)); } loop { @@ -9,7 +9,6 @@ #![cfg_attr(feature = "cargo-clippy", allow(unreadable_literal))] #![cfg_attr(feature = "cargo-clippy", allow(decimal_literal_representation))] -#[macro_use] extern crate crossbeam_channel; #[macro_use] extern crate derive_deref; #[macro_use] extern crate failure; #[macro_use] extern crate futures; @@ -22,6 +21,7 @@ extern crate byteorder; extern crate bytes; extern crate chacha20_poly1305_aead; extern crate crossbeam; +extern crate crossbeam_channel; extern crate hex; extern crate libc; extern crate mio; |