From c8a8b6a568e2bb78bdd1470e0fba1fb360b769ee Mon Sep 17 00:00:00 2001 From: Jake McGinty Date: Thu, 17 May 2018 19:43:29 -0700 Subject: peer_server: use unbounded channels, ratelimiter wip unbounded channels are easier to deal with now, and bounded channels weren't actually doing anything useful. --- src/interface/peer_server.rs | 132 ++++++++++++++++++++++++++----------------- 1 file changed, 81 insertions(+), 51 deletions(-) (limited to 'src/interface/peer_server.rs') diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index ebcb01b..fe4797c 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -4,16 +4,18 @@ use cookie; use interface::{SharedPeer, SharedState, State, UtunPacket}; use message::{Message, Initiation, Response, CookieReply, Transport}; use peer::{Peer, SessionType, SessionTransition}; +use ratelimiter::RateLimiter; use timestamp::Timestamp; use timer::{Timer, TimerMessage}; use byteorder::{ByteOrder, LittleEndian}; use failure::{Error, err_msg}; -use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc}; +use futures::{Async, Future, Stream, Poll, unsync::mpsc, task}; use rand::{self, Rng, ThreadRng}; use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel}; use tokio_core::reactor::Handle; +use std::collections::VecDeque; use std::convert::TryInto; use std::rc::Rc; @@ -27,12 +29,12 @@ pub enum ChannelMessage { } struct Channel { - tx: mpsc::Sender, - rx: mpsc::Receiver, + tx: mpsc::UnboundedSender, + rx: mpsc::UnboundedReceiver, } -impl From<(mpsc::Sender, mpsc::Receiver)> for Channel { - fn from(pair: (mpsc::Sender, mpsc::Receiver)) -> Self { +impl From<(mpsc::UnboundedSender, mpsc::UnboundedReceiver)> for Channel { + fn from(pair: (mpsc::UnboundedSender, mpsc::UnboundedReceiver)) -> Self { Self { tx: pair.0, rx: pair.1, @@ -41,30 +43,36 @@ impl From<(mpsc::Sender, mpsc::Receiver)> for Channel { } pub struct PeerServer { - handle : Handle, - shared_state : SharedState, - udp : Option, - port : Option, - outgoing : Channel, - channel : Channel, - timer : Timer, - tunnel_tx : mpsc::Sender>, - cookie : cookie::Validator, - rng : ThreadRng, + handle : Handle, + shared_state : SharedState, + udp : Option, + port : Option, + outgoing : Channel, + channel : Channel, + handshakes : VecDeque<(Endpoint, Message)>, + timer : Timer, + tunnel_tx : mpsc ::UnboundedSender>, + cookie : cookie::Validator, + rate_limiter : RateLimiter, + under_load_until : Timestamp, + rng : ThreadRng, } impl PeerServer { - pub fn new(handle: Handle, shared_state: SharedState, tunnel_tx: mpsc::Sender>) -> Result { + pub fn new(handle: Handle, shared_state: SharedState, tunnel_tx: mpsc::UnboundedSender>) -> Result { Ok(PeerServer { shared_state, tunnel_tx, - handle : handle.clone(), - timer : Timer::new(handle), - udp : None, - port : None, - outgoing : mpsc::channel(1024).into(), - channel : mpsc::channel(1024).into(), - cookie : cookie::Validator::new(&[0u8; 32]), - rng : rand::thread_rng() + handle : handle.clone(), + timer : Timer::new(handle.clone()), + udp : None, + port : None, + outgoing : mpsc::unbounded().into(), + channel : mpsc::unbounded().into(), + handshakes : VecDeque::new(), + cookie : cookie::Validator::new(&[0u8; 32]), + rate_limiter : RateLimiter::new(&handle)?, + under_load_until : Timestamp::default(), + rng : rand::thread_rng() }) } @@ -77,8 +85,8 @@ impl PeerServer { return Ok(()); } - let port = interface.listen_port.unwrap_or(0); - let fwmark = interface.fwmark.unwrap_or(0); + let port = interface.listen_port.unwrap_or(0); + let fwmark = interface.fwmark.unwrap_or(0); if self.port.is_some() && self.port.unwrap() == port { debug!("skipping rebind, since we're already listening on the correct port."); @@ -100,11 +108,11 @@ impl PeerServer { Ok(()) } - pub fn tunnel_tx(&self) -> mpsc::Sender { + pub fn tunnel_tx(&self) -> mpsc::UnboundedSender { self.outgoing.tx.clone() } - pub fn tx(&self) -> mpsc::Sender { + pub fn tx(&self) -> mpsc::UnboundedSender { self.channel.tx.clone() } @@ -114,8 +122,8 @@ impl PeerServer { Ok(()) } - fn send_to_tunnel(&self, packet: Vec) { - self.handle.spawn(self.tunnel_tx.clone().send(packet).then(|_| Ok(()))); + fn send_to_tunnel(&self, packet: Vec) -> Result<(), Error> { + self.tunnel_tx.unbounded_send(packet).map_err(|e| e.into()) } fn unused_index(&mut self, state: &mut State) -> u32 { @@ -130,12 +138,29 @@ impl PeerServer { fn handle_ingress_packet(&mut self, addr: Endpoint, packet: Vec) -> Result<(), Error> { trace!("got a UDP packet from {:?} of length {}, packet type {}", &addr, packet.len(), packet[0]); - match packet.try_into()? { - Message::Initiation(packet) => self.handle_ingress_handshake_init(addr, &packet), - Message::Response(packet) => self.handle_ingress_handshake_resp(addr, &packet), - Message::CookieReply(packet) => self.handle_ingress_cookie_reply(addr, &packet), - Message::Transport(packet) => self.handle_ingress_transport(addr, &packet), + let message = packet.try_into()?; + if let Message::Transport(packet) = message { + self.handle_ingress_transport(addr, &packet)?; + } else { + self.queue_ingress_handshake(addr, message); } + Ok(()) + } + + fn queue_ingress_handshake(&mut self, addr: Endpoint, message: Message) { + // TODO: max queue size management + self.handshakes.push_back((addr, message)); + task::current().notify(); + } + + fn handle_ingress_handshake(&mut self, addr: Endpoint, message: &Message) -> Result<(), Error> { + match message { + Message::Initiation(ref packet) => self.handle_ingress_handshake_init(addr, packet)?, + Message::Response(ref packet) => self.handle_ingress_handshake_resp(addr, packet)?, + Message::CookieReply(ref packet) => self.handle_ingress_cookie_reply(addr, packet)?, + Message::Transport(_) => unreachable!("no transport packets allowed"), + } + Ok(()) } fn handle_ingress_handshake_init(&mut self, addr: Endpoint, packet: &Initiation) -> Result<(), Error> { @@ -253,7 +278,7 @@ impl PeerServer { self.shared_state.borrow_mut().router.validate_source(&raw_packet, &peer_ref)?; trace!("received transport packet"); - self.send_to_tunnel(raw_packet); + self.send_to_tunnel(raw_packet)?; Ok(()) } @@ -481,54 +506,59 @@ impl Future for PeerServer { type Error = Error; fn poll(&mut self) -> Poll { - // Handle config events + // Poll inner Futures until at least one of them has returned a NotReady. It's not + // safe to return NotReady yourself unless at least one future has returned a NotReady. loop { + let mut not_ready = false; + // Handle config events match self.channel.rx.poll() { Ok(Async::Ready(Some(event))) => { let _ = self.handle_incoming_event(event); }, - Ok(Async::NotReady) => break, + Ok(Async::NotReady) => { not_ready = true; }, Ok(Async::Ready(None)) => bail!("config stream ended unexpectedly"), Err(e) => bail!("config stream error: {:?}", e), } - } - // Handle pending state-changing timers - loop { + // Handle pending state-changing timers match self.timer.poll() { Ok(Async::Ready(Some(message))) => { let _ = self.handle_timer(message).map_err(|e| debug!("TIMER: {}", e)); }, - Ok(Async::NotReady) => break, + Ok(Async::NotReady) => { not_ready = true; }, Ok(Async::Ready(None)) => bail!("timer stream ended unexpectedly"), Err(e) => bail!("timer stream error: {:?}", e), } - } - // Handle UDP packets from the outside world - if self.udp.is_some() { - loop { + // Handle UDP packets from the outside world + if self.udp.is_some() { match self.udp.as_mut().unwrap().ingress.poll() { Ok(Async::Ready(Some((addr, packet)))) => { let _ = self.handle_ingress_packet(addr, packet).map_err(|e| warn!("UDP ERR: {:?}", e)); }, - Ok(Async::NotReady) => break, + Ok(Async::NotReady) => { not_ready = true; }, Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), Err(e) => bail!("incoming udp stream error: {:?}", e) } } - } - // Handle packets coming from the local tunnel - loop { + // Handle packets coming from the local tunnel match self.outgoing.rx.poll() { Ok(Async::Ready(Some(packet))) => { let _ = self.handle_egress_packet(packet).map_err(|e| warn!("UDP ERR: {:?}", e)); }, - Ok(Async::NotReady) => break, + Ok(Async::NotReady) => { not_ready = true; }, Ok(Async::Ready(None)) => bail!("outgoing udp stream ended unexpectedly"), Err(e) => bail!("outgoing udp stream error: {:?}", e), } + + if not_ready { + break; + } + } + + if let Some((addr, message)) = self.handshakes.pop_front() { + let _ = self.handle_ingress_handshake(addr, &message).map_err(|e| warn!("handshake err: {:?}", e)); } Ok(Async::NotReady) -- cgit v1.2.3-59-g8ed1b