diff options
Diffstat (limited to 'src/interface/peer_server.rs')
-rw-r--r-- | src/interface/peer_server.rs | 93 |
1 files changed, 49 insertions, 44 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index aa82fb7..39ba68d 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -2,6 +2,7 @@ use consts::{REKEY_TIMEOUT, KEEPALIVE_TIMEOUT, STALE_SESSION_TIMEOUT, MAX_CONTENT_SIZE, WIPE_AFTER_TIME, MAX_HANDSHAKE_ATTEMPTS, UNDER_LOAD_QUEUE_SIZE, UNDER_LOAD_TIME}; use cookie; +use crypto_pool::{self, DecryptResult, EncryptResult}; use interface::{SharedPeer, SharedState, State, UtunPacket}; use message::{Message, Initiation, Response, CookieReply, Transport}; use peer::{Peer, SessionType, SessionTransition}; @@ -10,9 +11,9 @@ use timestamp::Timestamp; use timer::{Timer, TimerMessage}; use byteorder::{ByteOrder, LittleEndian}; +use crossbeam_channel as crossbeam; use failure::{Error, err_msg}; -use futures::{Async, Future, Stream, Poll, unsync::mpsc, task}; -use futures_cpupool::CpuPool; +use futures::{Async, Future, Stream, Poll, sync, unsync::mpsc, task}; use rand::{self, Rng, ThreadRng}; use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel}; use tokio_core::reactor::Handle; @@ -37,6 +38,20 @@ struct Channel<T> { rx: mpsc::UnboundedReceiver<T>, } +struct SyncChannel<T> { + tx: sync::mpsc::UnboundedSender<T>, + rx: sync::mpsc::UnboundedReceiver<T>, +} + +impl<T> From<(sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)> for SyncChannel<T> { + fn from(pair: (sync::mpsc::UnboundedSender<T>, sync::mpsc::UnboundedReceiver<T>)) -> Self { + Self { + tx: pair.0, + rx: pair.1, + } + } +} + impl<T> From<(mpsc::UnboundedSender<T>, mpsc::UnboundedReceiver<T>)> for Channel<T> { fn from(pair: (mpsc::UnboundedSender<T>, mpsc::UnboundedReceiver<T>)) -> Self { Self { @@ -60,9 +75,9 @@ pub struct PeerServer { rate_limiter : RateLimiter, under_load_until : Instant, rng : ThreadRng, - cpu_pool : CpuPool, - decrypt_channel : Channel<(Endpoint, Transport, Vec<u8>, SessionType)>, - encrypt_channel : Channel<(SharedPeer, (Endpoint, Vec<u8>))>, + crypto_pool : crossbeam::Sender<crypto_pool::Work>, + decrypt_channel : SyncChannel<crypto_pool::DecryptResult>, + encrypt_channel : SyncChannel<crypto_pool::EncryptResult>, } impl PeerServer { @@ -80,9 +95,9 @@ impl PeerServer { rate_limiter : RateLimiter::new(&handle)?, under_load_until : Instant::now(), rng : rand::thread_rng(), - cpu_pool : CpuPool::new_num_cpus(), - decrypt_channel : mpsc::unbounded().into(), - encrypt_channel : mpsc::unbounded().into(), + crypto_pool : crypto_pool::create(), + decrypt_channel : sync::mpsc::unbounded().into(), + encrypt_channel : sync::mpsc::unbounded().into(), }) } @@ -273,7 +288,7 @@ impl PeerServer { } } else { debug!("sending empty keepalive"); - self.encrypt_and_send(peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?; + self.encrypt_and_send(peer_ref.clone(), &mut peer, vec![])?; } } else { error!("peer not ready for transport after processing handshake response. this shouldn't happen."); @@ -299,38 +314,27 @@ impl PeerServer { let mut peer = peer_ref.borrow_mut(); let tx = self.decrypt_channel.tx.clone(); - let f = self.cpu_pool.spawn(peer.handle_incoming_transport(addr, packet)?) - .and_then(move |result| { - tx.unbounded_send(result).expect("broken decrypt channel"); - Ok(()) - }) - .map_err(|e| warn!("{:?}", e)); - self.handle.spawn(f); + let work = crypto_pool::Work::Decrypt((tx, peer.handle_incoming_transport(addr, packet)?)); + self.crypto_pool.send(work)?; Ok(()) } - fn encrypt_and_send(&mut self, peer_ref: SharedPeer, peer: &mut Peer, packet: UtunPacket) -> Result<(), Error> { + fn encrypt_and_send(&mut self, peer_ref: SharedPeer, peer: &mut Peer, packet: Vec<u8>) -> Result<(), Error> { let tx = self.encrypt_channel.tx.clone(); - let f = self.cpu_pool.spawn(peer.handle_outgoing_transport(packet)?) - .and_then(move |result| { - tx.unbounded_send((peer_ref, result)).expect("broken decrypt channel"); - Ok(()) - }) - .map_err(|e| warn!("{:?}", e)); - self.handle.spawn(f); + let work = crypto_pool::Work::Encrypt((tx, peer.handle_outgoing_transport(packet)?)); + self.crypto_pool.send(work)?; Ok(()) } - fn handle_ingress_decrypted_transport(&mut self, addr: Endpoint, orig_packet: Transport, raw_packet: Vec<u8>, session_type: SessionType) - -> Result<(), Error> + fn handle_ingress_decrypted_transport(&mut self, result: DecryptResult) -> Result<(), Error> { - let peer_ref = self.shared_state.borrow().index_map.get(&orig_packet.our_index()) + let peer_ref = self.shared_state.borrow().index_map.get(&result.orig_packet.our_index()) .ok_or_else(|| err_msg("unknown our_index"))?.clone(); let needs_handshake = { let mut peer = peer_ref.borrow_mut(); - let transition = peer.handle_incoming_decrypted_transport(addr, &raw_packet, session_type)?; + let transition = peer.handle_incoming_decrypted_transport(result.endpoint, &result.out_packet, result.session_type)?; let shared_state = self.shared_state.clone(); let mut state = shared_state.borrow_mut(); if let SessionTransition::Transition(possible_dead_index) = transition { @@ -338,7 +342,7 @@ impl PeerServer { let _ = state.index_map.remove(&index); } - let outgoing: Vec<UtunPacket> = peer.outgoing_queue.drain(..).collect(); + let outgoing: Vec<Vec<u8>> = peer.outgoing_queue.drain(..).collect(); for packet in outgoing { self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?; @@ -354,14 +358,14 @@ impl PeerServer { self.send_handshake_init(&peer_ref)?; } - if raw_packet.is_empty() { + if result.out_packet.is_empty() { debug!("received keepalive."); return Ok(()) // short-circuit on keep-alives } - self.shared_state.borrow_mut().router.validate_source(&raw_packet, &peer_ref)?; + self.shared_state.borrow_mut().router.validate_source(&result.out_packet, &peer_ref)?; trace!("received transport packet"); - self.send_to_tunnel(raw_packet)?; + self.send_to_tunnel(result.out_packet)?; Ok(()) } @@ -374,7 +378,7 @@ impl PeerServer { let needs_handshake = { let mut peer = peer_ref.borrow_mut(); let needs_handshake = peer.needs_new_handshake(true); - peer.queue_egress(packet); + peer.queue_egress(packet.into()); if peer.ready_for_transport() { if peer.outgoing_queue.len() > 1 { @@ -396,11 +400,13 @@ impl PeerServer { Ok(()) } - fn handle_egress_encrypted_packet(&mut self, peer_ref: SharedPeer, endpoint: Endpoint, packet: Vec<u8>) -> Result<(), Error> { + fn handle_egress_encrypted_packet(&mut self, result: EncryptResult) -> Result<(), Error> { + let peer_ref = self.shared_state.borrow().index_map.get(&result.our_index) + .ok_or_else(|| err_msg("unknown our_index"))?.clone(); let mut peer = peer_ref.borrow_mut(); - peer.handle_outgoing_encrypted_transport(&packet); + peer.handle_outgoing_encrypted_transport(&result.out_packet); - self.send_to_peer((endpoint, packet)) + self.send_to_peer((result.endpoint, result.out_packet)) } fn send_cookie_reply(&mut self, addr: Endpoint, mac1: &[u8], index: u32) -> Result<(), Error> { @@ -508,7 +514,7 @@ impl PeerServer { } } - self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?; + self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, vec![])?; debug!("sent passive keepalive packet"); self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone())); @@ -526,7 +532,7 @@ impl PeerServer { bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs()); } - self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?; + self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, vec![])?; let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone())); peer.timers.persistent_timer = Some(handle); debug!("sent persistent keepalive packet"); @@ -584,7 +590,7 @@ impl PeerServer { if let Some(keepalive) = peer.info.persistent_keepalive() { let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(Rc::downgrade(&peer_ref))); peer.timers.persistent_timer = Some(handle); - self.encrypt_and_send(peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?; + self.encrypt_and_send(peer_ref.clone(), &mut peer, vec![])?; debug!("set new keepalive timer and immediately sent new keepalive packet."); } } @@ -646,10 +652,9 @@ impl Future for PeerServer { } loop { - // Handle UDP packets from the outside world match self.decrypt_channel.rx.poll() { - Ok(Async::Ready(Some((addr, orig_packet, decrypted, session_type)))) => { - let _ = self.handle_ingress_decrypted_transport(addr, orig_packet, decrypted, session_type).map_err(|e| warn!("UDP ERR: {:?}", e)); + Ok(Async::Ready(Some(result))) => { + let _ = self.handle_ingress_decrypted_transport(result).map_err(|e| warn!("UDP ERR: {:?}", e)); }, Ok(Async::NotReady) => { break; }, Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), @@ -660,8 +665,8 @@ impl Future for PeerServer { loop { // Handle UDP packets from the outside world match self.encrypt_channel.rx.poll() { - Ok(Async::Ready(Some((peer_ref, (endpoint, packet))))) => { - let _ = self.handle_egress_encrypted_packet(peer_ref, endpoint, packet).map_err(|e| warn!("UDP ERR: {:?}", e)); + Ok(Async::Ready(Some(result))) => { + let _ = self.handle_egress_encrypted_packet(result).map_err(|e| warn!("UDP ERR: {:?}", e)); }, Ok(Async::NotReady) => { break; }, Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), |