aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-02-01 15:44:14 +0000
committerJake McGinty <me@jake.su>2018-02-01 15:44:14 +0000
commite3816d9cd71209bdf7dc287ba1a12e89cf520195 (patch)
tree46d26d62fe9e896c1552ccb8620f7e3d47fcacb5 /src
parenthandle no-psk case (diff)
downloadwireguard-rs-e3816d9cd71209bdf7dc287ba1a12e89cf520195.tar.xz
wireguard-rs-e3816d9cd71209bdf7dc287ba1a12e89cf520195.zip
properly leave packets queued when no route exists to peer
Diffstat (limited to 'src')
-rw-r--r--src/interface/peer_server.rs71
1 files changed, 45 insertions, 26 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index e4d442a..b7384e6 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -8,7 +8,7 @@ use std::time::Duration;
use base64;
use byteorder::{ByteOrder, BigEndian, LittleEndian};
-use futures::{Async, Future, Stream, Sink, Poll, future, unsync, sync, stream};
+use futures::{self, Async, Future, Stream, Sink, Poll, future, unsync, sync, stream};
use pnet::packet::ipv4::Ipv4Packet;
use snow::{self, NoiseBuilder};
use tokio_core::net::{UdpSocket, UdpCodec, UdpFramed};
@@ -47,7 +47,7 @@ pub struct PeerServer {
timer: Timer,
udp_stream: stream::SplitStream<UdpFramed<VecUdpCodec>>,
outgoing_tx: unsync::mpsc::Sender<Vec<u8>>,
- outgoing_rx: unsync::mpsc::Receiver<Vec<u8>>,
+ outgoing_rx: futures::stream::Peekable<unsync::mpsc::Receiver<Vec<u8>>>,
timer_tx: unsync::mpsc::Sender<TimerMessage>,
timer_rx: unsync::mpsc::Receiver<TimerMessage>,
udp_tx: unsync::mpsc::Sender<(SocketAddr, Vec<u8>)>,
@@ -61,6 +61,7 @@ impl PeerServer {
let (timer_tx, timer_rx) = unsync::mpsc::channel::<TimerMessage>(1024);
let (udp_tx, udp_rx) = unsync::mpsc::channel::<(SocketAddr, Vec<u8>)>(1024);
let (outgoing_tx, outgoing_rx) = unsync::mpsc::channel::<Vec<u8>>(1024);
+ let outgoing_rx = outgoing_rx.peekable();
let timer = Timer::default();
let udp_write_passthrough = udp_sink.sink_map_err(|_| ()).send_all(
@@ -254,29 +255,47 @@ impl PeerServer {
}
}
- fn handle_outgoing_packet(&mut self, packet: Vec<u8>) {
- debug_packet("received UTUN packet: ", &packet);
- let state = self.shared_state.borrow();
- let mut out_packet = vec![0u8; 1500];
- let destination = Ipv4Packet::new(&packet).unwrap().get_destination();
- if let Some((_, _, peer)) = state.ip4_map.longest_match(destination) {
- let mut peer = peer.borrow_mut();
- out_packet[0] = 4;
- if let Some(their_index) = peer.their_current_index() {
- let endpoint = peer.info.endpoint.unwrap();
- peer.tx_bytes += packet.len() as u64;
- let noise = peer.current_noise().expect("current noise session");
- LittleEndian::write_u32(&mut out_packet[4..], their_index);
- LittleEndian::write_u64(&mut out_packet[8..], noise.sending_nonce().unwrap());
- let len = noise.write_message(&packet, &mut out_packet[16..]).expect("failed to encrypt outgoing UDP packet");
- out_packet.truncate(16 + len);
- self.handle.spawn(self.udp_tx.clone().send((endpoint, out_packet)).then(|_| Ok(())));
+ // Just this way to avoid a double-mutable-borrow while peeking.
+ fn peek_and_handle(&mut self) -> Result<bool,()> {
+ let routed = {
+ let packet = match self.outgoing_rx.peek() {
+ Ok(Async::Ready(Some(packet))) => packet,
+ Ok(Async::NotReady) => return Ok(false),
+ Ok(Async::Ready(None)) | Err(_) => return Err(()),
+ };
+
+ debug_packet("received UTUN packet: ", &packet);
+ let state = self.shared_state.borrow();
+ let mut out_packet = vec![0u8; 1500];
+ let destination = Ipv4Packet::new(&packet).unwrap().get_destination();
+
+ if let Some((_, _, peer)) = state.ip4_map.longest_match(destination) {
+ let mut peer = peer.borrow_mut();
+ out_packet[0] = 4;
+ if let Some(their_index) = peer.their_current_index() {
+ let endpoint = peer.info.endpoint.unwrap();
+ peer.tx_bytes += packet.len() as u64;
+ let noise = peer.current_noise().expect("current noise session");
+ LittleEndian::write_u32(&mut out_packet[4..], their_index);
+ LittleEndian::write_u64(&mut out_packet[8..], noise.sending_nonce().unwrap());
+ let len = noise.write_message(&packet, &mut out_packet[16..]).expect("failed to encrypt outgoing UDP packet");
+ out_packet.truncate(16 + len);
+ self.handle.spawn(self.udp_tx.clone().send((endpoint, out_packet)).then(|_| Ok(())));
+ true
+ } else {
+ debug!("got outgoing packet with no current session");
+ false
+ }
} else {
- info!("got outgoing packet with no current session");
+ // TODO return another error and generate ICMP "no route" packet
+ warn!("got packet with no available outgoing route");
+ false
}
- } else {
- warn!("got packet with no available outgoing route");
+ };
+ if routed {
+ let _ = self.outgoing_rx.poll();
}
+ return Ok(routed)
}
}
@@ -305,10 +324,10 @@ impl Future for PeerServer {
// Handle packets coming from the local tunnel
loop {
- match self.outgoing_rx.poll() {
- Ok(Async::Ready(Some(packet))) => self.handle_outgoing_packet(packet),
- Ok(Async::NotReady) => break,
- Ok(Async::Ready(None)) | Err(_) => return Err(()),
+ match self.peek_and_handle() {
+ Ok(false) => break,
+ Err(_) => return Err(()),
+ _ => {}
}
}