aboutsummaryrefslogtreecommitdiffstats
path: root/src/interface/peer_server.rs
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-06-24 00:37:46 +0800
committerJake McGinty <me@jake.su>2018-06-24 00:37:46 +0800
commita2caa612a3c983da76cd6ef080c7c13c9724ff82 (patch)
treee137b96db26f24694d1130268158e2b01586be28 /src/interface/peer_server.rs
parentclean stuff up a bit (diff)
downloadwireguard-rs-a2caa612a3c983da76cd6ef080c7c13c9724ff82.tar.xz
wireguard-rs-a2caa612a3c983da76cd6ef080c7c13c9724ff82.zip
update dependencies and strategyjm/multithread-crossbeam
Diffstat (limited to 'src/interface/peer_server.rs')
-rw-r--r--src/interface/peer_server.rs45
1 files changed, 15 insertions, 30 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index 8d1b8ba..aa69b72 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -13,7 +13,7 @@ use timer::{Timer, TimerMessage};
use byteorder::{ByteOrder, LittleEndian};
use crossbeam_channel as crossbeam;
use failure::{Error, err_msg};
-use futures::{Async, Future, Stream, Poll, sync, unsync::mpsc, task};
+use futures::{Async, Future, Stream, Poll, unsync::mpsc, task};
use rand::{self, Rng, ThreadRng};
use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel};
use tokio_core::reactor::Handle;
@@ -39,12 +39,12 @@ struct Channel<T> {
}
struct SyncChannel<T> {
- tx: sync::mpsc::UnboundedSender<T>,
- rx: sync::mpsc::UnboundedReceiver<T>,
+ tx: crossbeam::Sender<T>,
+ rx: crossbeam::Receiver<T>,
}
-impl<T> From<(sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)> for SyncChannel<T> {
- fn from(pair: (sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)) -> Self {
+impl<T> From<(crossbeam::Sender<T>, crossbeam::Receiver<T>)> for SyncChannel<T> {
+ fn from(pair: (crossbeam::Sender<T>, crossbeam::Receiver<T>)) -> Self {
Self {
tx: pair.0,
rx: pair.1,
@@ -96,8 +96,8 @@ impl PeerServer {
under_load_until : Instant::now(),
rng : rand::thread_rng(),
crypto_pool : crypto_pool::create(),
- decrypt_channel : sync::mpsc::unbounded().into(),
- encrypt_channel : sync::mpsc::unbounded().into(),
+ decrypt_channel : crossbeam::unbounded().into(),
+ encrypt_channel : crossbeam::unbounded().into(),
})
}
@@ -314,16 +314,16 @@ impl PeerServer {
let mut peer = peer_ref.borrow_mut();
let tx = self.decrypt_channel.tx.clone();
- let work = crypto_pool::Work::Decrypt((tx, peer.handle_incoming_transport(addr, packet)?));
- self.crypto_pool.send(work)?;
+ let work = crypto_pool::Work::Decrypt((tx, task::current(), peer.handle_incoming_transport(addr, packet)?));
+ self.crypto_pool.send(work);
Ok(())
}
fn encrypt_and_send(&mut self, peer: &mut Peer, packet: Vec<u8>) -> Result<(), Error> {
let tx = self.encrypt_channel.tx.clone();
- let work = crypto_pool::Work::Encrypt((tx, peer.handle_outgoing_transport(packet)?));
- self.crypto_pool.send(work)?;
+ let work = crypto_pool::Work::Encrypt((tx, task::current(), peer.handle_outgoing_transport(packet)?));
+ self.crypto_pool.send(work);
Ok(())
}
@@ -651,27 +651,12 @@ impl Future for PeerServer {
}
}
- loop {
- match self.decrypt_channel.rx.poll() {
- Ok(Async::Ready(Some(result))) => {
- let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e));
- },
- Ok(Async::NotReady) => { break; },
- Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
- Err(e) => bail!("incoming udp stream error: {:?}", e)
- }
+ while let Some(result) = self.decrypt_channel.rx.try_recv() {
+ let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e));
}
- loop {
- // Handle UDP packets from the outside world
- match self.encrypt_channel.rx.poll() {
- Ok(Async::Ready(Some(result))) => {
- let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e));
- },
- Ok(Async::NotReady) => { break; },
- Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
- Err(e) => bail!("incoming udp stream error: {:?}", e)
- }
+ while let Some(result) = self.encrypt_channel.rx.try_recv() {
+ let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e));
}
loop {