From d0aaf28afcbc812c5a05b16da35f39afbc3fb047 Mon Sep 17 00:00:00 2001 From: Jake McGinty Date: Fri, 1 Jun 2018 23:37:15 -0500 Subject: well, this is interesting. --- src/crypto_pool.rs | 17 ++++++++++------- src/interface/mod.rs | 3 +++ src/interface/peer_server.rs | 24 ++++++++++++------------ src/peer.rs | 6 +++--- 4 files changed, 28 insertions(+), 22 deletions(-) diff --git a/src/crypto_pool.rs b/src/crypto_pool.rs index e0756e9..b8e4536 100644 --- a/src/crypto_pool.rs +++ b/src/crypto_pool.rs @@ -50,13 +50,14 @@ pub struct DecryptResult { /// Spawn a thread pool to efficiently process /// the CPU-intensive encryption/decryption. pub fn create() -> Sender { - let threads = num_cpus::get() - 1; // One thread for I/O. + let threads = num_cpus::get(); // One thread for I/O. let (sender, receiver) = unbounded(); + debug!("spinning up a crypto pool with {} threads", threads); crossbeam::scope(|s| { - for i in 0..threads { + for _ in 0..threads { let rx = receiver.clone(); - s.spawn(move || worker(rx.clone())); + thread::spawn(move || worker(rx.clone())); } }); @@ -64,6 +65,7 @@ pub fn create() -> Sender { } fn worker(receiver: Receiver) { + loop { select_loop! { recv(receiver, work) => { match work { @@ -80,12 +82,12 @@ fn worker(receiver: Receiver) { raw_packet.truncate(0); } - tx.unbounded_send(DecryptResult { + executor::spawn(tx.send(DecryptResult { endpoint: element.endpoint, orig_packet: element.packet, out_packet: raw_packet, session_type: element.session_type, - }).unwrap(); + })).wait_future().unwrap(); }, Work::Encrypt((tx, mut element)) => { let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 { @@ -104,13 +106,14 @@ fn worker(receiver: Receiver) { &mut out_packet[16..]).unwrap(); out_packet.truncate(TRANSPORT_HEADER_SIZE + len); - tx.unbounded_send(EncryptResult { + executor::spawn(tx.send(EncryptResult { endpoint: element.endpoint, our_index: element.our_index, out_packet, - }).unwrap(); + })).wait_future().unwrap(); } } } } + } } \ No newline at end of file diff --git a/src/interface/mod.rs b/src/interface/mod.rs index acfb9be..8c43c8d 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -108,11 +108,14 @@ impl Interface { let (utun_tx, utun_rx) = unsync::mpsc::unbounded::>(); let peer_server = PeerServer::new(core.handle(), self.state.clone(), utun_tx.clone())?; + debug!("peer server... engaged."); let utun_stream = UtunStream::connect(&self.name, &core.handle())?; + debug!("utun stream... engaged."); let interface_name = utun_stream.name()?; let utun_stream = utun_stream.framed(VecUtunCodec{}); let config_server = ConfigurationService::new(&interface_name, &self.state, peer_server.tx(), &core.handle())?.map_err(|_|()); self.name = interface_name; + debug!("config server... engaged."); let (utun_writer, utun_reader) = utun_stream.split(); diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 39ba68d..8d1b8ba 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -284,11 +284,11 @@ impl PeerServer { if !peer.outgoing_queue.is_empty() { debug!("sending {} queued egress packets", peer.outgoing_queue.len()); while let Some(packet) = peer.outgoing_queue.pop_front() { - self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?; + self.encrypt_and_send(&mut peer, packet)?; } } else { debug!("sending empty keepalive"); - self.encrypt_and_send(peer_ref.clone(), &mut peer, vec![])?; + self.encrypt_and_send(&mut peer, vec![])?; } } else { error!("peer not ready for transport after processing handshake response. this shouldn't happen."); @@ -320,7 +320,7 @@ impl PeerServer { Ok(()) } - fn encrypt_and_send(&mut self, peer_ref: SharedPeer, peer: &mut Peer, packet: Vec) -> Result<(), Error> { + fn encrypt_and_send(&mut self, peer: &mut Peer, packet: Vec) -> Result<(), Error> { let tx = self.encrypt_channel.tx.clone(); let work = crypto_pool::Work::Encrypt((tx, peer.handle_outgoing_transport(packet)?)); self.crypto_pool.send(work)?; @@ -345,7 +345,7 @@ impl PeerServer { let outgoing: Vec> = peer.outgoing_queue.drain(..).collect(); for packet in outgoing { - self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?; + self.encrypt_and_send(&mut peer, packet)?; } self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(Rc::downgrade(&peer_ref))); @@ -386,7 +386,7 @@ impl PeerServer { } while let Some(packet) = peer.outgoing_queue.pop_front() { - self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?; + self.encrypt_and_send(&mut peer, packet)?; } } @@ -514,7 +514,7 @@ impl PeerServer { } } - self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, vec![])?; + self.encrypt_and_send(&mut peer, vec![])?; debug!("sent passive keepalive packet"); self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone())); @@ -532,7 +532,7 @@ impl PeerServer { bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs()); } - self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, vec![])?; + self.encrypt_and_send(&mut peer, vec![])?; let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone())); peer.timers.persistent_timer = Some(handle); debug!("sent persistent keepalive packet"); @@ -590,7 +590,7 @@ impl PeerServer { if let Some(keepalive) = peer.info.persistent_keepalive() { let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(Rc::downgrade(&peer_ref))); peer.timers.persistent_timer = Some(handle); - self.encrypt_and_send(peer_ref.clone(), &mut peer, vec![])?; + self.encrypt_and_send(&mut peer, vec![])?; debug!("set new keepalive timer and immediately sent new keepalive packet."); } } @@ -642,7 +642,7 @@ impl Future for PeerServer { // Handle UDP packets from the outside world match self.udp.as_mut().unwrap().ingress.poll() { Ok(Async::Ready(Some((addr, packet)))) => { - let _ = self.handle_ingress_packet(addr, packet).map_err(|e| warn!("UDP ERR: {:?}", e)); + let _ = self.handle_ingress_packet(addr, packet).map_err(|e| warn!("ingress: {:?}", e)); }, Ok(Async::NotReady) => { break; }, Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), @@ -654,7 +654,7 @@ impl Future for PeerServer { loop { match self.decrypt_channel.rx.poll() { Ok(Async::Ready(Some(result))) => { - let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("UDP ERR: {:?}", e)); + let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e)); }, Ok(Async::NotReady) => { break; }, Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), @@ -666,7 +666,7 @@ impl Future for PeerServer { // Handle UDP packets from the outside world match self.encrypt_channel.rx.poll() { Ok(Async::Ready(Some(result))) => { - let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("UDP ERR: {:?}", e)); + let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e)); }, Ok(Async::NotReady) => { break; }, Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), @@ -678,7 +678,7 @@ impl Future for PeerServer { // Handle packets coming from the local tunnel match self.outgoing.rx.poll() { Ok(Async::Ready(Some(packet))) => { - let _ = self.handle_egress_packet(packet).map_err(|e| warn!("UDP ERR: {:?}", e)); + let _ = self.handle_egress_packet(packet).map_err(|e| warn!("egress: {:?}", e)); }, Ok(Async::NotReady) => { break; }, Ok(Async::Ready(None)) => bail!("outgoing udp stream ended unexpectedly"), diff --git a/src/peer.rs b/src/peer.rs index aace50d..992f3c3 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -6,7 +6,7 @@ use consts::{REKEY_AFTER_MESSAGES, REKEY_AFTER_TIME, use crypto_pool::{DecryptWork, EncryptWork}; use cookie; use failure::{Error, err_msg}; -use futures::{Future, future}; +use futures::Future; use interface::UtunPacket; use noise; use message::{Initiation, Response, CookieReply, Transport}; @@ -348,7 +348,7 @@ impl Peer { ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); session.anti_replay.update(nonce)?; - let mut transport = session.noise.get_async_transport_state()?.clone(); + let transport = session.noise.get_async_transport_state()?.clone(); Ok(DecryptWork { transport, endpoint, @@ -396,7 +396,7 @@ impl Peer { session.nonce += 1; let nonce = session.nonce - 1; - let mut transport = session.noise.get_async_transport_state()?.clone(); + let transport = session.noise.get_async_transport_state()?.clone(); Ok(EncryptWork { transport, -- cgit v1.2.3-59-g8ed1b