diff options
Diffstat (limited to 'src/interface/peer_server.rs')
-rw-r--r-- | src/interface/peer_server.rs | 45 |
1 files changed, 15 insertions, 30 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 8d1b8ba..aa69b72 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -13,7 +13,7 @@ use timer::{Timer, TimerMessage}; use byteorder::{ByteOrder, LittleEndian}; use crossbeam_channel as crossbeam; use failure::{Error, err_msg}; -use futures::{Async, Future, Stream, Poll, sync, unsync::mpsc, task}; +use futures::{Async, Future, Stream, Poll, unsync::mpsc, task}; use rand::{self, Rng, ThreadRng}; use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel}; use tokio_core::reactor::Handle; @@ -39,12 +39,12 @@ struct Channel<T> { } struct SyncChannel<T> { - tx: sync::mpsc::UnboundedSender<T>, - rx: sync::mpsc::UnboundedReceiver<T>, + tx: crossbeam::Sender<T>, + rx: crossbeam::Receiver<T>, } -impl<T> From<(sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)> for SyncChannel<T> { - fn from(pair: (sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)) -> Self { +impl<T> From<(crossbeam::Sender<T>, crossbeam::Receiver<T>)> for SyncChannel<T> { + fn from(pair: (crossbeam::Sender<T>, crossbeam::Receiver<T>)) -> Self { Self { tx: pair.0, rx: pair.1, @@ -96,8 +96,8 @@ impl PeerServer { under_load_until : Instant::now(), rng : rand::thread_rng(), crypto_pool : crypto_pool::create(), - decrypt_channel : sync::mpsc::unbounded().into(), - encrypt_channel : sync::mpsc::unbounded().into(), + decrypt_channel : crossbeam::unbounded().into(), + encrypt_channel : crossbeam::unbounded().into(), }) } @@ -314,16 +314,16 @@ impl PeerServer { let mut peer = peer_ref.borrow_mut(); let tx = self.decrypt_channel.tx.clone(); - let work = crypto_pool::Work::Decrypt((tx, peer.handle_incoming_transport(addr, packet)?)); - self.crypto_pool.send(work)?; + let work = crypto_pool::Work::Decrypt((tx, task::current(), peer.handle_incoming_transport(addr, packet)?)); + self.crypto_pool.send(work); Ok(()) } fn encrypt_and_send(&mut self, peer: &mut Peer, packet: Vec<u8>) -> Result<(), Error> { let tx = self.encrypt_channel.tx.clone(); - let work = crypto_pool::Work::Encrypt((tx, peer.handle_outgoing_transport(packet)?)); - self.crypto_pool.send(work)?; + let work = crypto_pool::Work::Encrypt((tx, task::current(), peer.handle_outgoing_transport(packet)?)); + self.crypto_pool.send(work); Ok(()) } @@ -651,27 +651,12 @@ impl Future for PeerServer { } } - loop { - match self.decrypt_channel.rx.poll() { - Ok(Async::Ready(Some(result))) => { - let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e)); - }, - Ok(Async::NotReady) => { break; }, - Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), - Err(e) => bail!("incoming udp stream error: {:?}", e) - } + while let Some(result) = self.decrypt_channel.rx.try_recv() { + let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e)); } - loop { - // Handle UDP packets from the outside world - match self.encrypt_channel.rx.poll() { - Ok(Async::Ready(Some(result))) => { - let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e)); - }, - Ok(Async::NotReady) => { break; }, - Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), - Err(e) => bail!("incoming udp stream error: {:?}", e) - } + while let Some(result) = self.encrypt_channel.rx.try_recv() { + let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e)); } loop { |