diff options
author | Jake McGinty <me@jake.su> | 2018-02-01 15:44:14 +0000 |
---|---|---|
committer | Jake McGinty <me@jake.su> | 2018-02-01 15:44:14 +0000 |
commit | e3816d9cd71209bdf7dc287ba1a12e89cf520195 (patch) | |
tree | 46d26d62fe9e896c1552ccb8620f7e3d47fcacb5 /src | |
parent | handle no-psk case (diff) | |
download | wireguard-rs-e3816d9cd71209bdf7dc287ba1a12e89cf520195.tar.xz wireguard-rs-e3816d9cd71209bdf7dc287ba1a12e89cf520195.zip |
properly leave packets queued when no route exists to peer
Diffstat (limited to 'src')
-rw-r--r-- | src/interface/peer_server.rs | 71 |
1 files changed, 45 insertions, 26 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index e4d442a..b7384e6 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -8,7 +8,7 @@ use std::time::Duration; use base64; use byteorder::{ByteOrder, BigEndian, LittleEndian}; -use futures::{Async, Future, Stream, Sink, Poll, future, unsync, sync, stream}; +use futures::{self, Async, Future, Stream, Sink, Poll, future, unsync, sync, stream}; use pnet::packet::ipv4::Ipv4Packet; use snow::{self, NoiseBuilder}; use tokio_core::net::{UdpSocket, UdpCodec, UdpFramed}; @@ -47,7 +47,7 @@ pub struct PeerServer { timer: Timer, udp_stream: stream::SplitStream<UdpFramed<VecUdpCodec>>, outgoing_tx: unsync::mpsc::Sender<Vec<u8>>, - outgoing_rx: unsync::mpsc::Receiver<Vec<u8>>, + outgoing_rx: futures::stream::Peekable<unsync::mpsc::Receiver<Vec<u8>>>, timer_tx: unsync::mpsc::Sender<TimerMessage>, timer_rx: unsync::mpsc::Receiver<TimerMessage>, udp_tx: unsync::mpsc::Sender<(SocketAddr, Vec<u8>)>, @@ -61,6 +61,7 @@ impl PeerServer { let (timer_tx, timer_rx) = unsync::mpsc::channel::<TimerMessage>(1024); let (udp_tx, udp_rx) = unsync::mpsc::channel::<(SocketAddr, Vec<u8>)>(1024); let (outgoing_tx, outgoing_rx) = unsync::mpsc::channel::<Vec<u8>>(1024); + let outgoing_rx = outgoing_rx.peekable(); let timer = Timer::default(); let udp_write_passthrough = udp_sink.sink_map_err(|_| ()).send_all( @@ -254,29 +255,47 @@ impl PeerServer { } } - fn handle_outgoing_packet(&mut self, packet: Vec<u8>) { - debug_packet("received UTUN packet: ", &packet); - let state = self.shared_state.borrow(); - let mut out_packet = vec![0u8; 1500]; - let destination = Ipv4Packet::new(&packet).unwrap().get_destination(); - if let Some((_, _, peer)) = state.ip4_map.longest_match(destination) { - let mut peer = peer.borrow_mut(); - out_packet[0] = 4; - if let Some(their_index) = peer.their_current_index() { - let endpoint = peer.info.endpoint.unwrap(); - peer.tx_bytes += packet.len() as u64; - let noise = peer.current_noise().expect("current noise session"); - LittleEndian::write_u32(&mut out_packet[4..], their_index); - LittleEndian::write_u64(&mut out_packet[8..], noise.sending_nonce().unwrap()); - let len = noise.write_message(&packet, &mut out_packet[16..]).expect("failed to encrypt outgoing UDP packet"); - out_packet.truncate(16 + len); - self.handle.spawn(self.udp_tx.clone().send((endpoint, out_packet)).then(|_| Ok(()))); + // Just this way to avoid a double-mutable-borrow while peeking. + fn peek_and_handle(&mut self) -> Result<bool,()> { + let routed = { + let packet = match self.outgoing_rx.peek() { + Ok(Async::Ready(Some(packet))) => packet, + Ok(Async::NotReady) => return Ok(false), + Ok(Async::Ready(None)) | Err(_) => return Err(()), + }; + + debug_packet("received UTUN packet: ", &packet); + let state = self.shared_state.borrow(); + let mut out_packet = vec![0u8; 1500]; + let destination = Ipv4Packet::new(&packet).unwrap().get_destination(); + + if let Some((_, _, peer)) = state.ip4_map.longest_match(destination) { + let mut peer = peer.borrow_mut(); + out_packet[0] = 4; + if let Some(their_index) = peer.their_current_index() { + let endpoint = peer.info.endpoint.unwrap(); + peer.tx_bytes += packet.len() as u64; + let noise = peer.current_noise().expect("current noise session"); + LittleEndian::write_u32(&mut out_packet[4..], their_index); + LittleEndian::write_u64(&mut out_packet[8..], noise.sending_nonce().unwrap()); + let len = noise.write_message(&packet, &mut out_packet[16..]).expect("failed to encrypt outgoing UDP packet"); + out_packet.truncate(16 + len); + self.handle.spawn(self.udp_tx.clone().send((endpoint, out_packet)).then(|_| Ok(()))); + true + } else { + debug!("got outgoing packet with no current session"); + false + } } else { - info!("got outgoing packet with no current session"); + // TODO return another error and generate ICMP "no route" packet + warn!("got packet with no available outgoing route"); + false } - } else { - warn!("got packet with no available outgoing route"); + }; + if routed { + let _ = self.outgoing_rx.poll(); } + return Ok(routed) } } @@ -305,10 +324,10 @@ impl Future for PeerServer { // Handle packets coming from the local tunnel loop { - match self.outgoing_rx.poll() { - Ok(Async::Ready(Some(packet))) => self.handle_outgoing_packet(packet), - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) | Err(_) => return Err(()), + match self.peek_and_handle() { + Ok(false) => break, + Err(_) => return Err(()), + _ => {} } } |