aboutsummaryrefslogtreecommitdiffstats
path: root/src/interface
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-02-23 02:10:09 +0000
committerJake McGinty <me@jake.su>2018-02-23 02:10:09 +0000
commit130d6d4ca31b62b31f4a6334b32cf011d3221d4a (patch)
treee687fa1c42effdd0b684a77acb3d3a9e0a4f2b8d /src/interface
parentprecompute cookie MAC keys (diff)
downloadwireguard-rs-130d6d4ca31b62b31f4a6334b32cf011d3221d4a.tar.xz
wireguard-rs-130d6d4ca31b62b31f4a6334b32cf011d3221d4a.zip
support listen-port changes
Diffstat (limited to 'src/interface')
-rw-r--r--src/interface/mod.rs14
-rw-r--r--src/interface/peer_server.rs162
2 files changed, 120 insertions, 56 deletions
diff --git a/src/interface/mod.rs b/src/interface/mod.rs
index 22720e5..9316e80 100644
--- a/src/interface/mod.rs
+++ b/src/interface/mod.rs
@@ -101,7 +101,7 @@ impl Interface {
let (utun_tx, utun_rx) = unsync::mpsc::channel::<Vec<u8>>(1024);
- let peer_server = PeerServer::bind(core.handle(), self.state.clone(), utun_tx.clone()).unwrap();
+ let peer_server = PeerServer::new(core.handle(), self.state.clone(), utun_tx.clone()).unwrap();
let utun_stream = UtunStream::connect(&self.name, &core.handle()).unwrap().framed(VecUtunCodec{});
let (utun_writer, utun_reader) = utun_stream.split();
@@ -145,6 +145,9 @@ impl Interface {
if let Some(private_key) = info.private_key {
s.push_str(&format!("private_key={}\n", hex::encode(private_key)));
}
+ if let Some(port) = info.listen_port {
+ s.push_str(&format!("listen_port={}\n", port));
+ }
for (_, peer) in peers.iter() {
s.push_str(&peer.borrow().to_config_string());
@@ -163,7 +166,7 @@ impl Interface {
}
}).map_err(|_| ());
- let config_fut = config_rx.for_each({
+ let config_fut = config_rx.and_then({
let state = self.state.clone();
move |event| {
let mut state = state.borrow_mut();
@@ -179,7 +182,7 @@ impl Interface {
state.interface_info.listen_port = Some(port);
info!("set listen port: {}", port);
},
- UpdateEvent::UpdatePeer(info) => {
+ UpdateEvent::UpdatePeer(ref info) => {
info!("added new peer: {}", info);
let mut peer = Peer::new(info.clone());
@@ -203,11 +206,12 @@ impl Interface {
},
_ => warn!("unhandled UpdateEvent received")
}
-
- future::ok(())
+ future::ok(event)
}
}).map_err(|e| { warn!("error {:?}", e); () });
+ let config_fut = peer_server.config_tx().sink_map_err(|_|()).send_all(config_fut).map_err(|e| { warn!("error {:?}", e); () });
+
core.run(peer_server.join(utun_fut.join(config_fut.join(config_server)))).unwrap();
}
}
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index 11f47b0..000da27 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -1,7 +1,7 @@
-use super::{SharedState, UtunPacket};
-use consts::{REKEY_TIMEOUT, REKEY_AFTER_TIME, REJECT_AFTER_TIME, REKEY_ATTEMPT_TIME, KEEPALIVE_TIMEOUT, MAX_CONTENT_SIZE, TIMER_TICK_DURATION};
+use consts::{REKEY_TIMEOUT, REKEY_AFTER_TIME, REJECT_AFTER_TIME, REKEY_ATTEMPT_TIME,
+ KEEPALIVE_TIMEOUT, MAX_CONTENT_SIZE, TIMER_TICK_DURATION};
use cookie;
-use interface::SharedPeer;
+use interface::{SharedPeer, SharedState, UtunPacket, config};
use peer::{Peer, SessionType};
use timer::{Timer, TimerMessage};
@@ -11,7 +11,7 @@ use std::time::{Duration, Instant};
use byteorder::{ByteOrder, LittleEndian};
use failure::{Error, err_msg};
-use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc, stream};
+use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc, stream, future};
use socket2::{Socket, Domain, Type, Protocol};
use tokio_core::net::{UdpSocket, UdpCodec, UdpFramed};
use tokio_core::reactor::Handle;
@@ -49,51 +49,90 @@ impl UdpCodec for VecUdpCodec {
}
}
+struct Channel<T> {
+ tx: mpsc::Sender<T>,
+ rx: mpsc::Receiver<T>,
+}
+
+impl<T> From<(mpsc::Sender<T>, mpsc::Receiver<T>)> for Channel<T> {
+ fn from(pair: (mpsc::Sender<T>, mpsc::Receiver<T>)) -> Self {
+ Self {
+ tx: pair.0,
+ rx: pair.1,
+ }
+ }
+}
+
pub struct PeerServer {
handle : Handle,
shared_state : SharedState,
- udp_stream : stream::SplitStream<UdpFramed<VecUdpCodec>>,
+ ingress : Option<stream::SplitStream<UdpFramed<VecUdpCodec>>>,
+ egress_tx : Option<mpsc::Sender<PeerServerMessage>>,
+ port : Option<u16>,
+ outgoing : Channel<UtunPacket>,
+ config : Channel<config::UpdateEvent>,
timer : Timer,
- outgoing_tx : mpsc::Sender<UtunPacket>,
- outgoing_rx : mpsc::Receiver<UtunPacket>,
- udp_tx : mpsc::Sender<(SocketAddr, Vec<u8>)>,
tunnel_tx : mpsc::Sender<Vec<u8>>,
cookie : cookie::Validator,
}
impl PeerServer {
- pub fn bind(handle: Handle, shared_state: SharedState, tunnel_tx: mpsc::Sender<Vec<u8>>) -> Result<Self, Error> {
- let timer = Timer::default();
- let port = shared_state.borrow().interface_info.listen_port.unwrap_or(0);
- let socket = Socket::new(Domain::ipv6(), Type::dgram(), Some(Protocol::udp()))?;
+ pub fn new(handle: Handle, shared_state: SharedState, tunnel_tx: mpsc::Sender<Vec<u8>>) -> Result<Self, Error> {
+ Ok(PeerServer {
+ handle, shared_state, tunnel_tx,
+ timer : Timer::default(),
+ ingress : None,
+ egress_tx : None,
+ port : None,
+ outgoing : mpsc::channel(1024).into(),
+ config : mpsc::channel(1024).into(),
+ cookie : cookie::Validator::new(&[0u8; 32])
+ })
+ }
+
+ pub fn rebind(&mut self) -> Result<(), Error> {
+ let port = self.shared_state.borrow().interface_info.listen_port.unwrap_or(0);
+ let socket = Socket::new(Domain::ipv6(), Type::dgram(), Some(Protocol::udp()))?;
+ if self.port.is_some() && self.port.unwrap() == port {
+ debug!("skipping rebind, since we're already listening on the correct port.");
+ return Ok(())
+ }
socket.set_only_v6(false)?;
socket.set_nonblocking(true)?;
socket.bind(&SocketAddr::from((Ipv6Addr::unspecified(), port)).into())?;
- let socket = UdpSocket::from_socket(socket.into_udp_socket(), &handle.clone())?;
- let (udp_sink, udp_stream) = socket.framed(VecUdpCodec{}).split();
- let (udp_tx, udp_rx) = mpsc::channel::<(SocketAddr, Vec<u8>)>(1024);
- let (outgoing_tx, outgoing_rx) = mpsc::channel::<UtunPacket>(1024);
- let cookie = cookie::Validator::default();
- let udp_write_passthrough = udp_sink.sink_map_err(|_| ()).send_all(
- udp_rx.map(|(addr, packet)| {
+ trace!("listening on {}", port);
+
+ let socket = UdpSocket::from_socket(socket.into_udp_socket(), &self.handle)?;
+ let (udp_sink, udp_stream) = socket.framed(VecUdpCodec{}).split();
+ let (egress_tx, egress_rx) = mpsc::channel(1024);
+ let udp_writethrough = udp_sink.sink_map_err(|_| ()).send_all(
+ egress_rx.and_then(|(addr, packet)| {
trace!("sending UDP packet to {:?}", &addr);
- (addr, packet)
- }).map_err(|_| ()))
+ future::ok((addr, packet))
+ }).map_err(|_| { info!("udp sink error"); () }))
.then(|_| Ok(()));
- handle.spawn(udp_write_passthrough);
- Ok(PeerServer {
- handle, shared_state, timer, udp_stream, udp_tx, tunnel_tx, outgoing_tx, outgoing_rx, cookie
- })
+ self.handle.spawn(udp_writethrough);
+
+ self.port = Some(port);
+ self.ingress = Some(udp_stream);
+ self.egress_tx = Some(egress_tx);
+ Ok(())
}
pub fn tx(&self) -> mpsc::Sender<UtunPacket> {
- self.outgoing_tx.clone()
+ self.outgoing.tx.clone()
}
- fn send_to_peer(&self, payload: PeerServerMessage) {
- self.handle.spawn(self.udp_tx.clone().send(payload).then(|_| Ok(())));
+ pub fn config_tx(&self) -> mpsc::Sender<config::UpdateEvent> {
+ self.config.tx.clone()
+ }
+
+ fn send_to_peer(&self, payload: PeerServerMessage) -> Result<(), Error> {
+ let tx = self.egress_tx.as_ref().ok_or_else(|| err_msg("no egress tx"))?.clone();
+ self.handle.spawn(tx.send(payload).then(|_| Ok(())));
+ Ok(())
}
fn send_to_tunnel(&self, packet: Vec<u8>) {
@@ -115,10 +154,8 @@ impl PeerServer {
ensure!(packet.len() == 148, "handshake init packet length is incorrect");
let mut state = self.shared_state.borrow_mut();
{
- let pubkey = state.interface_info.pub_key.as_ref()
- .ok_or_else(|| err_msg("must have local interface key"))?;
let (mac_in, mac_out) = packet.split_at(116);
- self.cookie.verify_mac1(pubkey, mac_in, &mac_out[..16])?;
+ self.cookie.verify_mac1(mac_in, &mac_out[..16])?;
}
debug!("got handshake initiation request (0x01)");
@@ -134,7 +171,7 @@ impl PeerServer {
let (response, next_index) = peer.complete_incoming_handshake(addr, handshake)?;
let _ = state.index_map.insert(next_index, peer_ref.clone());
- self.send_to_peer((addr, response));
+ self.send_to_peer((addr, response))?;
info!("sent handshake response, ratcheted session (index {}).", next_index);
Ok(())
@@ -145,10 +182,8 @@ impl PeerServer {
ensure!(packet.len() == 92, "handshake resp packet length is incorrect");
let mut state = self.shared_state.borrow_mut();
{
- let pubkey = state.interface_info.pub_key.as_ref()
- .ok_or_else(|| err_msg("must have local interface key"))?;
let (mac_in, mac_out) = packet.split_at(60);
- self.cookie.verify_mac1(pubkey, mac_in, &mac_out[..16])?;
+ self.cookie.verify_mac1(mac_in, &mac_out[..16])?;
}
debug!("got handshake response (0x02)");
@@ -166,10 +201,10 @@ impl PeerServer {
if !peer.outgoing_queue.is_empty() {
debug!("sending {} queued egress packets", peer.outgoing_queue.len());
while let Some(packet) = peer.outgoing_queue.pop_front() {
- self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?);
+ self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?;
}
} else {
- self.send_to_peer(peer.handle_outgoing_transport(&[])?);
+ self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
}
} else {
error!("peer not ready for transport after processing handshake response. this shouldn't happen.");
@@ -219,7 +254,7 @@ impl PeerServer {
for packet in outgoing {
match peer.handle_outgoing_transport(packet.payload()) {
- Ok(message) => self.send_to_peer(message),
+ Ok(message) => self.send_to_peer(message)?,
Err(e) => warn!("failed to encrypt packet: {}", e)
}
}
@@ -251,7 +286,7 @@ impl PeerServer {
let _ = state.index_map.remove(&index);
}
- self.send_to_peer((endpoint, init_packet));
+ self.send_to_peer((endpoint, init_packet))?;
peer.last_sent_init = Some(Instant::now());
peer.last_tun_queue = peer.last_tun_queue.or_else(|| Some(Instant::now()));
let when = *REKEY_TIMEOUT + *TIMER_TICK_DURATION * 2;
@@ -306,7 +341,6 @@ impl PeerServer {
let new_index = self.send_handshake_init(&peer_ref)?;
debug!("sent handshake init (Rekey timer) ({} -> {})", our_index, new_index);
-
},
TimerMessage::Reject(peer_ref, our_index) => {
let mut peer = peer_ref.borrow_mut();
@@ -335,11 +369,13 @@ impl PeerServer {
*KEEPALIVE_TIMEOUT - last_sent_packet + *TIMER_TICK_DURATION,
TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_index));
bail!("passive keepalive tick (waiting {:?})", *KEEPALIVE_TIMEOUT - last_sent_packet);
+ } else { // if we're going to send a keepalive reset last_sent
+ session.last_sent = None;
}
}
}
- self.send_to_peer(peer.handle_outgoing_transport(&[])?);
+ self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
debug!("sent passive keepalive packet ({})", our_index);
self.timer.spawn_delayed(&self.handle,
@@ -353,7 +389,7 @@ impl PeerServer {
ensure!(session_type == SessionType::Current, "expired session for persistent keepalive timer");
}
- self.send_to_peer(peer.handle_outgoing_transport(&[])?);
+ self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
debug!("sent persistent keepalive packet ({})", our_index);
if let Some(persistent_keepalive) = peer.info.keep_alive_interval {
@@ -384,7 +420,7 @@ impl PeerServer {
}
while let Some(packet) = peer.outgoing_queue.pop_front() {
- self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?);
+ self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?;
}
}
peer.needs_new_handshake()
@@ -403,6 +439,28 @@ impl Future for PeerServer {
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ // Handle config events
+ loop {
+ use self::config::UpdateEvent::*;
+ match self.config.rx.poll() {
+ Ok(Async::Ready(Some(event))) => {
+ match event {
+ PrivateKey(_) => {
+ let pub_key = &self.shared_state.borrow().interface_info.pub_key.unwrap();
+ self.cookie = cookie::Validator::new(pub_key);
+ if self.egress_tx.is_none() {
+ self.rebind().unwrap();
+ }
+ },
+ ListenPort(_) => self.rebind().unwrap(),
+ _ => {}
+ }
+ },
+ Ok(Async::NotReady) => break,
+ Ok(Async::Ready(None)) | Err(_) => return Err(()),
+ }
+ }
+
// Handle pending state-changing timers
loop {
match self.timer.poll() {
@@ -415,19 +473,21 @@ impl Future for PeerServer {
}
// Handle UDP packets from the outside world
- loop {
- match self.udp_stream.poll() {
- Ok(Async::Ready(Some((addr, packet)))) => {
- let _ = self.handle_ingress_packet(addr, &packet).map_err(|e| warn!("UDP ERR: {:?}", e));
- },
- Ok(Async::NotReady) => break,
- Ok(Async::Ready(None)) | Err(_) => return Err(()),
+ if self.ingress.is_some() {
+ loop {
+ match self.ingress.as_mut().unwrap().poll() {
+ Ok(Async::Ready(Some((addr, packet)))) => {
+ let _ = self.handle_ingress_packet(addr, &packet).map_err(|e| warn!("UDP ERR: {:?}", e));
+ },
+ Ok(Async::NotReady) => break,
+ Ok(Async::Ready(None)) | Err(_) => return Err(()),
+ }
}
}
// Handle packets coming from the local tunnel
loop {
- match self.outgoing_rx.poll() {
+ match self.outgoing.rx.poll() {
Ok(Async::Ready(Some(packet))) => {
let _ = self.handle_egress_packet(packet).map_err(|e| warn!("UDP ERR: {:?}", e));
},