From a2caa612a3c983da76cd6ef080c7c13c9724ff82 Mon Sep 17 00:00:00 2001 From: Jake McGinty Date: Sun, 24 Jun 2018 00:37:46 +0800 Subject: update dependencies and strategy --- Cargo.lock | 28 ++++++++----- Cargo.toml | 2 +- src/crypto_pool.rs | 98 ++++++++++++++++++++++---------------------- src/interface/peer_server.rs | 45 +++++++------------- src/lib.rs | 2 +- 5 files changed, 83 insertions(+), 92 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 48db648..ca65675 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -240,12 +240,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "crossbeam-channel" -version = "0.1.3" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "crossbeam-epoch 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-utils 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -278,6 +280,11 @@ dependencies = [ "cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam-utils" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "curve25519-dalek" version = "0.17.0" @@ -904,13 +911,13 @@ dependencies = [ [[package]] name = "ring" -version = "0.13.0-alpha4" +version = "0.13.0-alpha5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "cc 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", - "untrusted 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", + "untrusted 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1038,7 +1045,7 @@ dependencies = [ "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.5.0-pre.2 (registry+https://github.com/rust-lang/crates.io-index)", - "ring 0.13.0-alpha4 (registry+https://github.com/rust-lang/crates.io-index)", + "ring 0.13.0-alpha5 (registry+https://github.com/rust-lang/crates.io-index)", "rust-crypto 0.2.36 (git+https://github.com/mcginty/rust-crypto)", "rustc_version 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "static_slice 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1461,7 +1468,7 @@ dependencies = [ [[package]] name = "untrusted" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -1530,7 +1537,7 @@ dependencies = [ "colored 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "criterion 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-channel 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "daemonize 0.2.3 (git+https://github.com/mcginty/daemonize)", "derive_deref 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1610,10 +1617,11 @@ dependencies = [ "checksum criterion-plot 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f7f7c88a8d341dd9fd9e31a72ca2ca24428db79afb491852873b2c784e037e6" "checksum criterion-stats 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "dd48feb0253b2968ff3085e7f3fba6738c9ff859f420a2fb81a48986eb66da36" "checksum crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "24ce9782d4d5c53674646a6a4c1863a21a8fc0cb649b3c94dfc16e45071dea19" -"checksum crossbeam-channel 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "862becd07e73da5746de6d9b3ba055c9bb8b10afd0d2b51155a6e30d81cd20b3" +"checksum crossbeam-channel 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b45c6ba620feae538943c106977c6348c16ad3b03dd8aaecd25a4224345fa795" "checksum crossbeam-deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fe8153ef04a7594ded05b427ffad46ddeaf22e63fd48d42b3e1e3bb4db07cae7" "checksum crossbeam-epoch 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9b4e2817eb773f770dcb294127c011e22771899c21d18fce7dd739c0b9832e81" "checksum crossbeam-utils 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d636a8b3bcc1b409d7ffd3facef8f21dcb4009626adbd0c5e6c4305c07253c7b" +"checksum crossbeam-utils 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b71f220442ed14749909b543d4dd7ec3918cb1fe289fd96e88d0abe6ca049783" "checksum curve25519-dalek 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e5808ccadbb61565fd184702be128ac43a6a33561bcd976a0d7a388b06ad696d" "checksum daemonize 0.2.3 (git+https://github.com/mcginty/daemonize)" = "" "checksum derive_deref 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "232c0157c26fd44067bcee2025879139cc983d8eed21f8cb5965b6a44e23fb67" @@ -1688,7 +1696,7 @@ dependencies = [ "checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" "checksum regex 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9329abc99e39129fcceabd24cf5d85b4671ef7c29c50e972bc5afe32438ec384" "checksum regex-syntax 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7d707a4fa2637f2dca2ef9fd02225ec7661fe01a53623c1e6515b6916511f7a7" -"checksum ring 0.13.0-alpha4 (registry+https://github.com/rust-lang/crates.io-index)" = "d835120175f4cfea267529ebf36fa3851ba1de1e44202f6241e5dee517edaebe" +"checksum ring 0.13.0-alpha5 (registry+https://github.com/rust-lang/crates.io-index)" = "3845516753f91b4511f9b17c917ea6fa4bc5a7853a9947b0f66731aff51cdef5" "checksum rust-crypto 0.2.36 (git+https://github.com/mcginty/rust-crypto)" = "" "checksum rustc-demangle 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "76d7ba1feafada44f2d38eed812bd2489a03c0f5abb975799251518b68848649" "checksum rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)" = "dcf128d1287d2ea9d80910b5f1120d0b8eede3fbf1abe91c40d39ea7d51e6fda" @@ -1751,7 +1759,7 @@ dependencies = [ "checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc" "checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" "checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" -"checksum untrusted 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "70afa43c8c5d23a53a3c39ec9b56232c5badc19f6bb5ad529c1d6448a7241365" +"checksum untrusted 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "55cd1f4b4e96b46aeb8d4855db4a7a9bd96eeeb5c6a1ab54593328761642ce2f" "checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122" "checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a" "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" diff --git a/Cargo.toml b/Cargo.toml index 75de0a9..71f86e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ byteorder = "^1.2" bytes = "0.4" chacha20-poly1305-aead = "^0.1" crossbeam = "^0.3" -crossbeam-channel = "^0.1" +crossbeam-channel = "^0.2" derive_deref = "^1.0" failure = "^0.1" futures = "^0.1" diff --git a/src/crypto_pool.rs b/src/crypto_pool.rs index db5826e..c99002a 100644 --- a/src/crypto_pool.rs +++ b/src/crypto_pool.rs @@ -1,8 +1,6 @@ use consts::{PADDING_MULTIPLE, TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE}; use crossbeam_channel::{bounded, Receiver, Sender}; -use futures::sync::mpsc; -use futures::executor; -use futures::Sink; +use futures::task::Task; use num_cpus; use snow::AsyncTransportState; use std::thread; @@ -13,8 +11,8 @@ use ip_packet::IpPacket; use byteorder::{ByteOrder, LittleEndian}; pub enum Work { - Decrypt((mpsc::UnboundedSender, DecryptWork)), - Encrypt((mpsc::UnboundedSender, EncryptWork)), + Decrypt((Sender, Task, DecryptWork)), + Encrypt((Sender, Task, EncryptWork)), } pub struct EncryptWork { @@ -49,7 +47,7 @@ pub struct DecryptResult { /// Spawn a thread pool to efficiently process /// the CPU-intensive encryption/decryption. pub fn create() -> Sender { - let threads = num_cpus::get(); // One thread for I/O. + let threads = num_cpus::get() - 2; // One thread for I/O. let (sender, receiver) = bounded(4096); debug!("spinning up a crypto pool with {} threads", threads); @@ -62,55 +60,55 @@ pub fn create() -> Sender { } fn worker(receiver: Receiver) { - loop { - select_loop! { - recv(receiver, work) => { - match work { - Work::Decrypt((tx, element)) => { - let mut raw_packet = vec![0u8; element.packet.len()]; - let nonce = element.packet.nonce(); - let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap(); - if len > 0 { - let len = IpPacket::new(&raw_packet[..len]) - .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() - .length(); - raw_packet.truncate(len as usize); - } else { - raw_packet.truncate(0); - } + while let Some(work) = receiver.recv() { + match work { + Work::Decrypt((tx, task, element)) => { + let mut raw_packet = vec![0u8; element.packet.len()]; + let nonce = element.packet.nonce(); + let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap(); + if len > 0 { + let len = IpPacket::new(&raw_packet[..len]) + .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() + .length(); + raw_packet.truncate(len as usize); + } else { + raw_packet.truncate(0); + } - executor::spawn(tx.send(DecryptResult { - endpoint: element.endpoint, - orig_packet: element.packet, - out_packet: raw_packet, - session_type: element.session_type, - })).wait_future().unwrap(); - }, - Work::Encrypt((tx, mut element)) => { - let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 { - PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE) - } else { 0 }; - let padded_len = element.in_packet.len() + padding; - let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; + tx.send(DecryptResult { + endpoint: element.endpoint, + orig_packet: element.packet, + out_packet: raw_packet, + session_type: element.session_type, + }); - out_packet[0] = 4; - LittleEndian::write_u32(&mut out_packet[4..], element.their_index); - LittleEndian::write_u64(&mut out_packet[8..], element.nonce); + task.notify(); + }, + Work::Encrypt((tx, task, mut element)) => { + let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 { + PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE) + } else { 0 }; + let padded_len = element.in_packet.len() + padding; + let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; - element.in_packet.resize(padded_len, 0); - let len = element.transport.write_transport_message(element.nonce, - &element.in_packet, - &mut out_packet[16..]).unwrap(); - out_packet.truncate(TRANSPORT_HEADER_SIZE + len); + out_packet[0] = 4; + LittleEndian::write_u32(&mut out_packet[4..], element.their_index); + LittleEndian::write_u64(&mut out_packet[8..], element.nonce); - executor::spawn(tx.send(EncryptResult { - endpoint: element.endpoint, - our_index: element.our_index, - out_packet, - })).wait_future().unwrap(); - } + element.in_packet.resize(padded_len, 0); + let len = element.transport.write_transport_message(element.nonce, + &element.in_packet, + &mut out_packet[16..]).unwrap(); + out_packet.truncate(TRANSPORT_HEADER_SIZE + len); + + tx.send(EncryptResult { + endpoint: element.endpoint, + our_index: element.our_index, + out_packet, + }); + + task.notify(); } } } - } } \ No newline at end of file diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 8d1b8ba..aa69b72 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -13,7 +13,7 @@ use timer::{Timer, TimerMessage}; use byteorder::{ByteOrder, LittleEndian}; use crossbeam_channel as crossbeam; use failure::{Error, err_msg}; -use futures::{Async, Future, Stream, Poll, sync, unsync::mpsc, task}; +use futures::{Async, Future, Stream, Poll, unsync::mpsc, task}; use rand::{self, Rng, ThreadRng}; use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel}; use tokio_core::reactor::Handle; @@ -39,12 +39,12 @@ struct Channel { } struct SyncChannel { - tx: sync::mpsc::UnboundedSender, - rx: sync::mpsc::UnboundedReceiver, + tx: crossbeam::Sender, + rx: crossbeam::Receiver, } -impl From<(sync::mpsc::UnboundedSender, sync::mpsc::UnboundedReceiver)> for SyncChannel { - fn from(pair: (sync::mpsc::UnboundedSender, sync::mpsc::UnboundedReceiver)) -> Self { +impl From<(crossbeam::Sender, crossbeam::Receiver)> for SyncChannel { + fn from(pair: (crossbeam::Sender, crossbeam::Receiver)) -> Self { Self { tx: pair.0, rx: pair.1, @@ -96,8 +96,8 @@ impl PeerServer { under_load_until : Instant::now(), rng : rand::thread_rng(), crypto_pool : crypto_pool::create(), - decrypt_channel : sync::mpsc::unbounded().into(), - encrypt_channel : sync::mpsc::unbounded().into(), + decrypt_channel : crossbeam::unbounded().into(), + encrypt_channel : crossbeam::unbounded().into(), }) } @@ -314,16 +314,16 @@ impl PeerServer { let mut peer = peer_ref.borrow_mut(); let tx = self.decrypt_channel.tx.clone(); - let work = crypto_pool::Work::Decrypt((tx, peer.handle_incoming_transport(addr, packet)?)); - self.crypto_pool.send(work)?; + let work = crypto_pool::Work::Decrypt((tx, task::current(), peer.handle_incoming_transport(addr, packet)?)); + self.crypto_pool.send(work); Ok(()) } fn encrypt_and_send(&mut self, peer: &mut Peer, packet: Vec) -> Result<(), Error> { let tx = self.encrypt_channel.tx.clone(); - let work = crypto_pool::Work::Encrypt((tx, peer.handle_outgoing_transport(packet)?)); - self.crypto_pool.send(work)?; + let work = crypto_pool::Work::Encrypt((tx, task::current(), peer.handle_outgoing_transport(packet)?)); + self.crypto_pool.send(work); Ok(()) } @@ -651,27 +651,12 @@ impl Future for PeerServer { } } - loop { - match self.decrypt_channel.rx.poll() { - Ok(Async::Ready(Some(result))) => { - let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e)); - }, - Ok(Async::NotReady) => { break; }, - Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), - Err(e) => bail!("incoming udp stream error: {:?}", e) - } + while let Some(result) = self.decrypt_channel.rx.try_recv() { + let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e)); } - loop { - // Handle UDP packets from the outside world - match self.encrypt_channel.rx.poll() { - Ok(Async::Ready(Some(result))) => { - let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e)); - }, - Ok(Async::NotReady) => { break; }, - Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), - Err(e) => bail!("incoming udp stream error: {:?}", e) - } + while let Some(result) = self.encrypt_channel.rx.try_recv() { + let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e)); } loop { diff --git a/src/lib.rs b/src/lib.rs index 106124d..9ebaebb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,6 @@ #![cfg_attr(feature = "cargo-clippy", allow(unreadable_literal))] #![cfg_attr(feature = "cargo-clippy", allow(decimal_literal_representation))] -#[macro_use] extern crate crossbeam_channel; #[macro_use] extern crate derive_deref; #[macro_use] extern crate failure; #[macro_use] extern crate futures; @@ -22,6 +21,7 @@ extern crate byteorder; extern crate bytes; extern crate chacha20_poly1305_aead; extern crate crossbeam; +extern crate crossbeam_channel; extern crate hex; extern crate libc; extern crate mio; -- cgit v1.2.3-59-g8ed1b