diff options
Diffstat (limited to 'src/interface/peer_server.rs')
-rw-r--r-- | src/interface/peer_server.rs | 75 |
1 files changed, 20 insertions, 55 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index d03fa58..ee40061 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -2,7 +2,7 @@ use consts::{REKEY_TIMEOUT, KEEPALIVE_TIMEOUT, STALE_SESSION_TIMEOUT, MAX_CONTENT_SIZE, WIPE_AFTER_TIME, MAX_HANDSHAKE_ATTEMPTS, UNDER_LOAD_QUEUE_SIZE, UNDER_LOAD_TIME}; use cookie; -use interface::{SharedPeer, SharedState, State, UtunPacket}; +use interface::{SharedPeer, SharedState, State}; use message::{Message, Initiation, Response, CookieReply, Transport}; use peer::{Peer, SessionType, SessionTransition}; use ratelimiter::RateLimiter; @@ -12,10 +12,10 @@ use timer::{Timer, TimerMessage}; use byteorder::{ByteOrder, LittleEndian}; use failure::{Error, err_msg}; use futures::{Async, Future, Stream, Poll, unsync::mpsc, task}; -use futures_cpupool::CpuPool; use rand::{self, Rng, ThreadRng}; use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel}; use tokio_core::reactor::Handle; +use types::PacketVec; use std::collections::VecDeque; use std::convert::TryInto; @@ -51,17 +51,15 @@ pub struct PeerServer { shared_state : SharedState, udp : Option<UdpChannel>, port : Option<u16>, - outgoing : Channel<UtunPacket>, + outgoing : Channel<PacketVec>, channel : Channel<ChannelMessage>, handshakes : VecDeque<(Endpoint, Message)>, timer : Timer, - tunnel_tx : mpsc::UnboundedSender<Vec<u8>>, + tunnel_tx : mpsc::UnboundedSender<PacketVec>, cookie : cookie::Validator, rate_limiter : RateLimiter, under_load_until : Instant, rng : ThreadRng, - cpu_pool : CpuPool, - decrypt_channel : Channel<(Endpoint, Transport, Vec<u8>, SessionType)>, } impl PeerServer { @@ -79,8 +77,6 @@ impl PeerServer { rate_limiter : RateLimiter::new(&handle)?, under_load_until : Instant::now(), rng : rand::thread_rng(), - cpu_pool : CpuPool::new_num_cpus(), - decrypt_channel : mpsc::unbounded().into(), }) } @@ -116,7 +112,7 @@ impl PeerServer { Ok(()) } - pub fn tunnel_tx(&self) -> mpsc::UnboundedSender<UtunPacket> { + pub fn tunnel_tx(&self) -> mpsc::UnboundedSender<PacketVec> { self.outgoing.tx.clone() } @@ -266,11 +262,11 @@ impl PeerServer { if !peer.outgoing_queue.is_empty() { debug!("sending {} queued egress packets", peer.outgoing_queue.len()); while let Some(packet) = peer.outgoing_queue.pop_front() { - self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?; + self.send_to_peer(peer.handle_outgoing_transport(packet)?)?; } } else { debug!("sending empty keepalive"); - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?; } } else { error!("peer not ready for transport after processing handshake response. this shouldn't happen."); @@ -290,42 +286,23 @@ impl PeerServer { } fn handle_ingress_transport(&mut self, addr: Endpoint, packet: Transport) -> Result<(), Error> { - let peer_ref = self.shared_state.borrow().index_map.get(&packet.our_index()) .ok_or_else(|| err_msg("unknown our_index"))?.clone(); - let mut peer = peer_ref.borrow_mut(); - let tx = self.decrypt_channel.tx.clone(); - let f = self.cpu_pool.spawn(peer.handle_incoming_transport(addr, packet)?) - .and_then(move |result| { - tx.unbounded_send(result).expect("broken decrypt channel"); - Ok(()) - }) - .map_err(|e| warn!("{:?}", e)); - self.handle.spawn(f); - - Ok(()) - } - - fn handle_ingress_decrypted_transport(&mut self, addr: Endpoint, orig_packet: Transport, raw_packet: Vec<u8>, session_type: SessionType) - -> Result<(), Error> - { - let peer_ref = self.shared_state.borrow().index_map.get(&orig_packet.our_index()) - .ok_or_else(|| err_msg("unknown our_index"))?.clone(); - - let needs_handshake = { + let (raw_packet, needs_handshake) = { let mut peer = peer_ref.borrow_mut(); - let transition = peer.handle_incoming_decrypted_transport(addr, &raw_packet, session_type)?; let mut state = self.shared_state.borrow_mut(); + let (raw_packet, transition) = peer.handle_incoming_transport(addr, &packet)?; + if let SessionTransition::Transition(possible_dead_index) = transition { if let Some(index) = possible_dead_index { let _ = state.index_map.remove(&index); } - let outgoing: Vec<UtunPacket> = peer.outgoing_queue.drain(..).collect(); + let outgoing: Vec<PacketVec> = peer.outgoing_queue.drain(..).collect(); for packet in outgoing { - match peer.handle_outgoing_transport(packet.payload()) { + match peer.handle_outgoing_transport(packet) { Ok(message) => self.send_to_peer(message)?, Err(e) => warn!("failed to encrypt packet: {}", e) } @@ -333,7 +310,7 @@ impl PeerServer { self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(Rc::downgrade(&peer_ref))); } - peer.needs_new_handshake(false) + (raw_packet, peer.needs_new_handshake(false)) }; if needs_handshake { @@ -352,10 +329,10 @@ impl PeerServer { Ok(()) } - fn handle_egress_packet(&mut self, packet: UtunPacket) -> Result<(), Error> { - ensure!(!packet.payload().is_empty() && packet.payload().len() <= MAX_CONTENT_SIZE, "egress packet outside of size bounds"); + fn handle_egress_packet(&mut self, packet: PacketVec) -> Result<(), Error> { + ensure!(!packet.is_empty() && packet.len() <= MAX_CONTENT_SIZE, "egress packet outside of size bounds"); - let peer_ref = self.shared_state.borrow_mut().router.route_to_peer(packet.payload()) + let peer_ref = self.shared_state.borrow_mut().router.route_to_peer(&packet) .ok_or_else(|| err_msg("no route to peer"))?; let needs_handshake = { @@ -369,7 +346,7 @@ impl PeerServer { } while let Some(packet) = peer.outgoing_queue.pop_front() { - self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?; + self.send_to_peer(peer.handle_outgoing_transport(packet)?)?; } } @@ -488,7 +465,7 @@ impl PeerServer { } } - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?; debug!("sent passive keepalive packet"); self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone())); @@ -506,7 +483,7 @@ impl PeerServer { bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs()); } - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?; let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone())); peer.timers.persistent_timer = Some(handle); debug!("sent persistent keepalive packet"); @@ -564,7 +541,7 @@ impl PeerServer { if let Some(keepalive) = peer.info.persistent_keepalive() { let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(Rc::downgrade(&peer_ref))); peer.timers.persistent_timer = Some(handle); - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?; debug!("set new keepalive timer and immediately sent new keepalive packet."); } } @@ -626,18 +603,6 @@ impl Future for PeerServer { } loop { - // Handle UDP packets from the outside world - match self.decrypt_channel.rx.poll() { - Ok(Async::Ready(Some((addr, orig_packet, decrypted, session_type)))) => { - let _ = self.handle_ingress_decrypted_transport(addr, orig_packet, decrypted, session_type).map_err(|e| warn!("UDP ERR: {:?}", e)); - }, - Ok(Async::NotReady) => { break; }, - Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), - Err(e) => bail!("incoming udp stream error: {:?}", e) - } - } - - loop { // Handle packets coming from the local tunnel match self.outgoing.rx.poll() { Ok(Async::Ready(Some(packet))) => { |