aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-06-01 23:37:15 -0500
committerJake McGinty <me@jake.su>2018-06-01 23:53:29 -0500
commitd0aaf28afcbc812c5a05b16da35f39afbc3fb047 (patch)
treeb8293f4becc0b724f3f715a8572aef6aa6c05b0a
parentfollow crossbeam directions to see if perf improves (diff)
downloadwireguard-rs-d0aaf28afcbc812c5a05b16da35f39afbc3fb047.tar.xz
wireguard-rs-d0aaf28afcbc812c5a05b16da35f39afbc3fb047.zip
well, this is interesting.
-rw-r--r--src/crypto_pool.rs17
-rw-r--r--src/interface/mod.rs3
-rw-r--r--src/interface/peer_server.rs24
-rw-r--r--src/peer.rs6
4 files changed, 28 insertions, 22 deletions
diff --git a/src/crypto_pool.rs b/src/crypto_pool.rs
index e0756e9..b8e4536 100644
--- a/src/crypto_pool.rs
+++ b/src/crypto_pool.rs
@@ -50,13 +50,14 @@ pub struct DecryptResult {
/// Spawn a thread pool to efficiently process
/// the CPU-intensive encryption/decryption.
pub fn create() -> Sender<Work> {
- let threads = num_cpus::get() - 1; // One thread for I/O.
+ let threads = num_cpus::get(); // One thread for I/O.
let (sender, receiver) = unbounded();
+ debug!("spinning up a crypto pool with {} threads", threads);
crossbeam::scope(|s| {
- for i in 0..threads {
+ for _ in 0..threads {
let rx = receiver.clone();
- s.spawn(move || worker(rx.clone()));
+ thread::spawn(move || worker(rx.clone()));
}
});
@@ -64,6 +65,7 @@ pub fn create() -> Sender<Work> {
}
fn worker(receiver: Receiver<Work>) {
+ loop {
select_loop! {
recv(receiver, work) => {
match work {
@@ -80,12 +82,12 @@ fn worker(receiver: Receiver<Work>) {
raw_packet.truncate(0);
}
- tx.unbounded_send(DecryptResult {
+ executor::spawn(tx.send(DecryptResult {
endpoint: element.endpoint,
orig_packet: element.packet,
out_packet: raw_packet,
session_type: element.session_type,
- }).unwrap();
+ })).wait_future().unwrap();
},
Work::Encrypt((tx, mut element)) => {
let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 {
@@ -104,13 +106,14 @@ fn worker(receiver: Receiver<Work>) {
&mut out_packet[16..]).unwrap();
out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
- tx.unbounded_send(EncryptResult {
+ executor::spawn(tx.send(EncryptResult {
endpoint: element.endpoint,
our_index: element.our_index,
out_packet,
- }).unwrap();
+ })).wait_future().unwrap();
}
}
}
}
+ }
} \ No newline at end of file
diff --git a/src/interface/mod.rs b/src/interface/mod.rs
index acfb9be..8c43c8d 100644
--- a/src/interface/mod.rs
+++ b/src/interface/mod.rs
@@ -108,11 +108,14 @@ impl Interface {
let (utun_tx, utun_rx) = unsync::mpsc::unbounded::<Vec<u8>>();
let peer_server = PeerServer::new(core.handle(), self.state.clone(), utun_tx.clone())?;
+ debug!("peer server... engaged.");
let utun_stream = UtunStream::connect(&self.name, &core.handle())?;
+ debug!("utun stream... engaged.");
let interface_name = utun_stream.name()?;
let utun_stream = utun_stream.framed(VecUtunCodec{});
let config_server = ConfigurationService::new(&interface_name, &self.state, peer_server.tx(), &core.handle())?.map_err(|_|());
self.name = interface_name;
+ debug!("config server... engaged.");
let (utun_writer, utun_reader) = utun_stream.split();
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index 39ba68d..8d1b8ba 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -284,11 +284,11 @@ impl PeerServer {
if !peer.outgoing_queue.is_empty() {
debug!("sending {} queued egress packets", peer.outgoing_queue.len());
while let Some(packet) = peer.outgoing_queue.pop_front() {
- self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?;
+ self.encrypt_and_send(&mut peer, packet)?;
}
} else {
debug!("sending empty keepalive");
- self.encrypt_and_send(peer_ref.clone(), &mut peer, vec![])?;
+ self.encrypt_and_send(&mut peer, vec![])?;
}
} else {
error!("peer not ready for transport after processing handshake response. this shouldn't happen.");
@@ -320,7 +320,7 @@ impl PeerServer {
Ok(())
}
- fn encrypt_and_send(&mut self, peer_ref: SharedPeer, peer: &mut Peer, packet: Vec<u8>) -> Result<(), Error> {
+ fn encrypt_and_send(&mut self, peer: &mut Peer, packet: Vec<u8>) -> Result<(), Error> {
let tx = self.encrypt_channel.tx.clone();
let work = crypto_pool::Work::Encrypt((tx, peer.handle_outgoing_transport(packet)?));
self.crypto_pool.send(work)?;
@@ -345,7 +345,7 @@ impl PeerServer {
let outgoing: Vec<Vec<u8>> = peer.outgoing_queue.drain(..).collect();
for packet in outgoing {
- self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?;
+ self.encrypt_and_send(&mut peer, packet)?;
}
self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(Rc::downgrade(&peer_ref)));
@@ -386,7 +386,7 @@ impl PeerServer {
}
while let Some(packet) = peer.outgoing_queue.pop_front() {
- self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?;
+ self.encrypt_and_send(&mut peer, packet)?;
}
}
@@ -514,7 +514,7 @@ impl PeerServer {
}
}
- self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, vec![])?;
+ self.encrypt_and_send(&mut peer, vec![])?;
debug!("sent passive keepalive packet");
self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone()));
@@ -532,7 +532,7 @@ impl PeerServer {
bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs());
}
- self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, vec![])?;
+ self.encrypt_and_send(&mut peer, vec![])?;
let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone()));
peer.timers.persistent_timer = Some(handle);
debug!("sent persistent keepalive packet");
@@ -590,7 +590,7 @@ impl PeerServer {
if let Some(keepalive) = peer.info.persistent_keepalive() {
let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(Rc::downgrade(&peer_ref)));
peer.timers.persistent_timer = Some(handle);
- self.encrypt_and_send(peer_ref.clone(), &mut peer, vec![])?;
+ self.encrypt_and_send(&mut peer, vec![])?;
debug!("set new keepalive timer and immediately sent new keepalive packet.");
}
}
@@ -642,7 +642,7 @@ impl Future for PeerServer {
// Handle UDP packets from the outside world
match self.udp.as_mut().unwrap().ingress.poll() {
Ok(Async::Ready(Some((addr, packet)))) => {
- let _ = self.handle_ingress_packet(addr, packet).map_err(|e| warn!("UDP ERR: {:?}", e));
+ let _ = self.handle_ingress_packet(addr, packet).map_err(|e| warn!("ingress: {:?}", e));
},
Ok(Async::NotReady) => { break; },
Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
@@ -654,7 +654,7 @@ impl Future for PeerServer {
loop {
match self.decrypt_channel.rx.poll() {
Ok(Async::Ready(Some(result))) => {
- let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("UDP ERR: {:?}", e));
+ let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("ingress decrypt: {:?}", e));
},
Ok(Async::NotReady) => { break; },
Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
@@ -666,7 +666,7 @@ impl Future for PeerServer {
// Handle UDP packets from the outside world
match self.encrypt_channel.rx.poll() {
Ok(Async::Ready(Some(result))) => {
- let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("UDP ERR: {:?}", e));
+ let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("egress encrypted: {:?}", e));
},
Ok(Async::NotReady) => { break; },
Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
@@ -678,7 +678,7 @@ impl Future for PeerServer {
// Handle packets coming from the local tunnel
match self.outgoing.rx.poll() {
Ok(Async::Ready(Some(packet))) => {
- let _ = self.handle_egress_packet(packet).map_err(|e| warn!("UDP ERR: {:?}", e));
+ let _ = self.handle_egress_packet(packet).map_err(|e| warn!("egress: {:?}", e));
},
Ok(Async::NotReady) => { break; },
Ok(Async::Ready(None)) => bail!("outgoing udp stream ended unexpectedly"),
diff --git a/src/peer.rs b/src/peer.rs
index aace50d..992f3c3 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -6,7 +6,7 @@ use consts::{REKEY_AFTER_MESSAGES, REKEY_AFTER_TIME,
use crypto_pool::{DecryptWork, EncryptWork};
use cookie;
use failure::{Error, err_msg};
-use futures::{Future, future};
+use futures::Future;
use interface::UtunPacket;
use noise;
use message::{Initiation, Response, CookieReply, Transport};
@@ -348,7 +348,7 @@ impl Peer {
ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME");
session.anti_replay.update(nonce)?;
- let mut transport = session.noise.get_async_transport_state()?.clone();
+ let transport = session.noise.get_async_transport_state()?.clone();
Ok(DecryptWork {
transport,
endpoint,
@@ -396,7 +396,7 @@ impl Peer {
session.nonce += 1;
let nonce = session.nonce - 1;
- let mut transport = session.noise.get_async_transport_state()?.clone();
+ let transport = session.noise.get_async_transport_state()?.clone();
Ok(EncryptWork {
transport,