aboutsummaryrefslogtreecommitdiffstats
path: root/src/interface/peer_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/interface/peer_server.rs')
-rw-r--r--src/interface/peer_server.rs93
1 files changed, 49 insertions, 44 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index aa82fb7..39ba68d 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -2,6 +2,7 @@ use consts::{REKEY_TIMEOUT, KEEPALIVE_TIMEOUT, STALE_SESSION_TIMEOUT,
MAX_CONTENT_SIZE, WIPE_AFTER_TIME, MAX_HANDSHAKE_ATTEMPTS,
UNDER_LOAD_QUEUE_SIZE, UNDER_LOAD_TIME};
use cookie;
+use crypto_pool::{self, DecryptResult, EncryptResult};
use interface::{SharedPeer, SharedState, State, UtunPacket};
use message::{Message, Initiation, Response, CookieReply, Transport};
use peer::{Peer, SessionType, SessionTransition};
@@ -10,9 +11,9 @@ use timestamp::Timestamp;
use timer::{Timer, TimerMessage};
use byteorder::{ByteOrder, LittleEndian};
+use crossbeam_channel as crossbeam;
use failure::{Error, err_msg};
-use futures::{Async, Future, Stream, Poll, unsync::mpsc, task};
-use futures_cpupool::CpuPool;
+use futures::{Async, Future, Stream, Poll, sync, unsync::mpsc, task};
use rand::{self, Rng, ThreadRng};
use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel};
use tokio_core::reactor::Handle;
@@ -37,6 +38,20 @@ struct Channel<T> {
rx: mpsc::UnboundedReceiver<T>,
}
+struct SyncChannel<T> {
+ tx: sync::mpsc::UnboundedSender<T>,
+ rx: sync::mpsc::UnboundedReceiver<T>,
+}
+
+impl<T> From<(sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)> for SyncChannel<T> {
+ fn from(pair: (sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)) -> Self {
+ Self {
+ tx: pair.0,
+ rx: pair.1,
+ }
+ }
+}
+
impl<T> From<(mpsc::UnboundedSender<T>, mpsc::UnboundedReceiver<T>)> for Channel<T> {
fn from(pair: (mpsc::UnboundedSender<T>, mpsc::UnboundedReceiver<T>)) -> Self {
Self {
@@ -60,9 +75,9 @@ pub struct PeerServer {
rate_limiter : RateLimiter,
under_load_until : Instant,
rng : ThreadRng,
- cpu_pool : CpuPool,
- decrypt_channel : Channel<(Endpoint, Transport, Vec<u8>, SessionType)>,
- encrypt_channel : Channel<(SharedPeer, (Endpoint, Vec<u8>))>,
+ crypto_pool : crossbeam::Sender<crypto_pool::Work>,
+ decrypt_channel : SyncChannel<crypto_pool::DecryptResult>,
+ encrypt_channel : SyncChannel<crypto_pool::EncryptResult>,
}
impl PeerServer {
@@ -80,9 +95,9 @@ impl PeerServer {
rate_limiter : RateLimiter::new(&handle)?,
under_load_until : Instant::now(),
rng : rand::thread_rng(),
- cpu_pool : CpuPool::new_num_cpus(),
- decrypt_channel : mpsc::unbounded().into(),
- encrypt_channel : mpsc::unbounded().into(),
+ crypto_pool : crypto_pool::create(),
+ decrypt_channel : sync::mpsc::unbounded().into(),
+ encrypt_channel : sync::mpsc::unbounded().into(),
})
}
@@ -273,7 +288,7 @@ impl PeerServer {
}
} else {
debug!("sending empty keepalive");
- self.encrypt_and_send(peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?;
+ self.encrypt_and_send(peer_ref.clone(), &mut peer, vec![])?;
}
} else {
error!("peer not ready for transport after processing handshake response. this shouldn't happen.");
@@ -299,38 +314,27 @@ impl PeerServer {
let mut peer = peer_ref.borrow_mut();
let tx = self.decrypt_channel.tx.clone();
- let f = self.cpu_pool.spawn(peer.handle_incoming_transport(addr, packet)?)
- .and_then(move |result| {
- tx.unbounded_send(result).expect("broken decrypt channel");
- Ok(())
- })
- .map_err(|e| warn!("{:?}", e));
- self.handle.spawn(f);
+ let work = crypto_pool::Work::Decrypt((tx, peer.handle_incoming_transport(addr, packet)?));
+ self.crypto_pool.send(work)?;
Ok(())
}
- fn encrypt_and_send(&mut self, peer_ref: SharedPeer, peer: &mut Peer, packet: UtunPacket) -> Result<(), Error> {
+ fn encrypt_and_send(&mut self, peer_ref: SharedPeer, peer: &mut Peer, packet: Vec<u8>) -> Result<(), Error> {
let tx = self.encrypt_channel.tx.clone();
- let f = self.cpu_pool.spawn(peer.handle_outgoing_transport(packet)?)
- .and_then(move |result| {
- tx.unbounded_send((peer_ref, result)).expect("broken decrypt channel");
- Ok(())
- })
- .map_err(|e| warn!("{:?}", e));
- self.handle.spawn(f);
+ let work = crypto_pool::Work::Encrypt((tx, peer.handle_outgoing_transport(packet)?));
+ self.crypto_pool.send(work)?;
Ok(())
}
- fn handle_ingress_decrypted_transport(&mut self, addr: Endpoint, orig_packet: Transport, raw_packet: Vec<u8>, session_type: SessionType)
- -> Result<(), Error>
+ fn handle_ingress_decrypted_transport(&mut self, result: DecryptResult) -> Result<(), Error>
{
- let peer_ref = self.shared_state.borrow().index_map.get(&orig_packet.our_index())
+ let peer_ref = self.shared_state.borrow().index_map.get(&result.orig_packet.our_index())
.ok_or_else(|| err_msg("unknown our_index"))?.clone();
let needs_handshake = {
let mut peer = peer_ref.borrow_mut();
- let transition = peer.handle_incoming_decrypted_transport(addr, &raw_packet, session_type)?;
+ let transition = peer.handle_incoming_decrypted_transport(result.endpoint, &result.out_packet, result.session_type)?;
let shared_state = self.shared_state.clone();
let mut state = shared_state.borrow_mut();
if let SessionTransition::Transition(possible_dead_index) = transition {
@@ -338,7 +342,7 @@ impl PeerServer {
let _ = state.index_map.remove(&index);
}
- let outgoing: Vec<UtunPacket> = peer.outgoing_queue.drain(..).collect();
+ let outgoing: Vec<Vec<u8>> = peer.outgoing_queue.drain(..).collect();
for packet in outgoing {
self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?;
@@ -354,14 +358,14 @@ impl PeerServer {
self.send_handshake_init(&peer_ref)?;
}
- if raw_packet.is_empty() {
+ if result.out_packet.is_empty() {
debug!("received keepalive.");
return Ok(()) // short-circuit on keep-alives
}
- self.shared_state.borrow_mut().router.validate_source(&raw_packet, &peer_ref)?;
+ self.shared_state.borrow_mut().router.validate_source(&result.out_packet, &peer_ref)?;
trace!("received transport packet");
- self.send_to_tunnel(raw_packet)?;
+ self.send_to_tunnel(result.out_packet)?;
Ok(())
}
@@ -374,7 +378,7 @@ impl PeerServer {
let needs_handshake = {
let mut peer = peer_ref.borrow_mut();
let needs_handshake = peer.needs_new_handshake(true);
- peer.queue_egress(packet);
+ peer.queue_egress(packet.into());
if peer.ready_for_transport() {
if peer.outgoing_queue.len() > 1 {
@@ -396,11 +400,13 @@ impl PeerServer {
Ok(())
}
- fn handle_egress_encrypted_packet(&mut self, peer_ref: SharedPeer, endpoint: Endpoint, packet: Vec<u8>) -> Result<(), Error> {
+ fn handle_egress_encrypted_packet(&mut self, result: EncryptResult) -> Result<(), Error> {
+ let peer_ref = self.shared_state.borrow().index_map.get(&result.our_index)
+ .ok_or_else(|| err_msg("unknown our_index"))?.clone();
let mut peer = peer_ref.borrow_mut();
- peer.handle_outgoing_encrypted_transport(&packet);
+ peer.handle_outgoing_encrypted_transport(&result.out_packet);
- self.send_to_peer((endpoint, packet))
+ self.send_to_peer((result.endpoint, result.out_packet))
}
fn send_cookie_reply(&mut self, addr: Endpoint, mac1: &[u8], index: u32) -> Result<(), Error> {
@@ -508,7 +514,7 @@ impl PeerServer {
}
}
- self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?;
+ self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, vec![])?;
debug!("sent passive keepalive packet");
self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone()));
@@ -526,7 +532,7 @@ impl PeerServer {
bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs());
}
- self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?;
+ self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, vec![])?;
let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone()));
peer.timers.persistent_timer = Some(handle);
debug!("sent persistent keepalive packet");
@@ -584,7 +590,7 @@ impl PeerServer {
if let Some(keepalive) = peer.info.persistent_keepalive() {
let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(Rc::downgrade(&peer_ref)));
peer.timers.persistent_timer = Some(handle);
- self.encrypt_and_send(peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?;
+ self.encrypt_and_send(peer_ref.clone(), &mut peer, vec![])?;
debug!("set new keepalive timer and immediately sent new keepalive packet.");
}
}
@@ -646,10 +652,9 @@ impl Future for PeerServer {
}
loop {
- // Handle UDP packets from the outside world
match self.decrypt_channel.rx.poll() {
- Ok(Async::Ready(Some((addr, orig_packet, decrypted, session_type)))) => {
- let _ = self.handle_ingress_decrypted_transport(addr, orig_packet, decrypted, session_type).map_err(|e| warn!("UDP ERR: {:?}", e));
+ Ok(Async::Ready(Some(result))) => {
+ let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("UDP ERR: {:?}", e));
},
Ok(Async::NotReady) => { break; },
Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
@@ -660,8 +665,8 @@ impl Future for PeerServer {
loop {
// Handle UDP packets from the outside world
match self.encrypt_channel.rx.poll() {
- Ok(Async::Ready(Some((peer_ref, (endpoint, packet))))) => {
- let _ = self.handle_egress_encrypted_packet(peer_ref, endpoint, packet).map_err(|e| warn!("UDP ERR: {:?}", e));
+ Ok(Async::Ready(Some(result))) => {
+ let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("UDP ERR: {:?}", e));
},
Ok(Async::NotReady) => { break; },
Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),