diff options
author | Jake McGinty <me@jake.su> | 2018-02-23 02:10:09 +0000 |
---|---|---|
committer | Jake McGinty <me@jake.su> | 2018-02-23 02:10:09 +0000 |
commit | 130d6d4ca31b62b31f4a6334b32cf011d3221d4a (patch) | |
tree | e687fa1c42effdd0b684a77acb3d3a9e0a4f2b8d /src/interface | |
parent | precompute cookie MAC keys (diff) | |
download | wireguard-rs-130d6d4ca31b62b31f4a6334b32cf011d3221d4a.tar.xz wireguard-rs-130d6d4ca31b62b31f4a6334b32cf011d3221d4a.zip |
support listen-port changes
Diffstat (limited to 'src/interface')
-rw-r--r-- | src/interface/mod.rs | 14 | ||||
-rw-r--r-- | src/interface/peer_server.rs | 162 |
2 files changed, 120 insertions, 56 deletions
diff --git a/src/interface/mod.rs b/src/interface/mod.rs index 22720e5..9316e80 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -101,7 +101,7 @@ impl Interface { let (utun_tx, utun_rx) = unsync::mpsc::channel::<Vec<u8>>(1024); - let peer_server = PeerServer::bind(core.handle(), self.state.clone(), utun_tx.clone()).unwrap(); + let peer_server = PeerServer::new(core.handle(), self.state.clone(), utun_tx.clone()).unwrap(); let utun_stream = UtunStream::connect(&self.name, &core.handle()).unwrap().framed(VecUtunCodec{}); let (utun_writer, utun_reader) = utun_stream.split(); @@ -145,6 +145,9 @@ impl Interface { if let Some(private_key) = info.private_key { s.push_str(&format!("private_key={}\n", hex::encode(private_key))); } + if let Some(port) = info.listen_port { + s.push_str(&format!("listen_port={}\n", port)); + } for (_, peer) in peers.iter() { s.push_str(&peer.borrow().to_config_string()); @@ -163,7 +166,7 @@ impl Interface { } }).map_err(|_| ()); - let config_fut = config_rx.for_each({ + let config_fut = config_rx.and_then({ let state = self.state.clone(); move |event| { let mut state = state.borrow_mut(); @@ -179,7 +182,7 @@ impl Interface { state.interface_info.listen_port = Some(port); info!("set listen port: {}", port); }, - UpdateEvent::UpdatePeer(info) => { + UpdateEvent::UpdatePeer(ref info) => { info!("added new peer: {}", info); let mut peer = Peer::new(info.clone()); @@ -203,11 +206,12 @@ impl Interface { }, _ => warn!("unhandled UpdateEvent received") } - - future::ok(()) + future::ok(event) } }).map_err(|e| { warn!("error {:?}", e); () }); + let config_fut = peer_server.config_tx().sink_map_err(|_|()).send_all(config_fut).map_err(|e| { warn!("error {:?}", e); () }); + core.run(peer_server.join(utun_fut.join(config_fut.join(config_server)))).unwrap(); } } diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 11f47b0..000da27 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -1,7 +1,7 @@ -use super::{SharedState, UtunPacket}; -use consts::{REKEY_TIMEOUT, REKEY_AFTER_TIME, REJECT_AFTER_TIME, REKEY_ATTEMPT_TIME, KEEPALIVE_TIMEOUT, MAX_CONTENT_SIZE, TIMER_TICK_DURATION}; +use consts::{REKEY_TIMEOUT, REKEY_AFTER_TIME, REJECT_AFTER_TIME, REKEY_ATTEMPT_TIME, + KEEPALIVE_TIMEOUT, MAX_CONTENT_SIZE, TIMER_TICK_DURATION}; use cookie; -use interface::SharedPeer; +use interface::{SharedPeer, SharedState, UtunPacket, config}; use peer::{Peer, SessionType}; use timer::{Timer, TimerMessage}; @@ -11,7 +11,7 @@ use std::time::{Duration, Instant}; use byteorder::{ByteOrder, LittleEndian}; use failure::{Error, err_msg}; -use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc, stream}; +use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc, stream, future}; use socket2::{Socket, Domain, Type, Protocol}; use tokio_core::net::{UdpSocket, UdpCodec, UdpFramed}; use tokio_core::reactor::Handle; @@ -49,51 +49,90 @@ impl UdpCodec for VecUdpCodec { } } +struct Channel<T> { + tx: mpsc::Sender<T>, + rx: mpsc::Receiver<T>, +} + +impl<T> From<(mpsc::Sender<T>, mpsc::Receiver<T>)> for Channel<T> { + fn from(pair: (mpsc::Sender<T>, mpsc::Receiver<T>)) -> Self { + Self { + tx: pair.0, + rx: pair.1, + } + } +} + pub struct PeerServer { handle : Handle, shared_state : SharedState, - udp_stream : stream::SplitStream<UdpFramed<VecUdpCodec>>, + ingress : Option<stream::SplitStream<UdpFramed<VecUdpCodec>>>, + egress_tx : Option<mpsc::Sender<PeerServerMessage>>, + port : Option<u16>, + outgoing : Channel<UtunPacket>, + config : Channel<config::UpdateEvent>, timer : Timer, - outgoing_tx : mpsc::Sender<UtunPacket>, - outgoing_rx : mpsc::Receiver<UtunPacket>, - udp_tx : mpsc::Sender<(SocketAddr, Vec<u8>)>, tunnel_tx : mpsc::Sender<Vec<u8>>, cookie : cookie::Validator, } impl PeerServer { - pub fn bind(handle: Handle, shared_state: SharedState, tunnel_tx: mpsc::Sender<Vec<u8>>) -> Result<Self, Error> { - let timer = Timer::default(); - let port = shared_state.borrow().interface_info.listen_port.unwrap_or(0); - let socket = Socket::new(Domain::ipv6(), Type::dgram(), Some(Protocol::udp()))?; + pub fn new(handle: Handle, shared_state: SharedState, tunnel_tx: mpsc::Sender<Vec<u8>>) -> Result<Self, Error> { + Ok(PeerServer { + handle, shared_state, tunnel_tx, + timer : Timer::default(), + ingress : None, + egress_tx : None, + port : None, + outgoing : mpsc::channel(1024).into(), + config : mpsc::channel(1024).into(), + cookie : cookie::Validator::new(&[0u8; 32]) + }) + } + + pub fn rebind(&mut self) -> Result<(), Error> { + let port = self.shared_state.borrow().interface_info.listen_port.unwrap_or(0); + let socket = Socket::new(Domain::ipv6(), Type::dgram(), Some(Protocol::udp()))?; + if self.port.is_some() && self.port.unwrap() == port { + debug!("skipping rebind, since we're already listening on the correct port."); + return Ok(()) + } socket.set_only_v6(false)?; socket.set_nonblocking(true)?; socket.bind(&SocketAddr::from((Ipv6Addr::unspecified(), port)).into())?; - let socket = UdpSocket::from_socket(socket.into_udp_socket(), &handle.clone())?; - let (udp_sink, udp_stream) = socket.framed(VecUdpCodec{}).split(); - let (udp_tx, udp_rx) = mpsc::channel::<(SocketAddr, Vec<u8>)>(1024); - let (outgoing_tx, outgoing_rx) = mpsc::channel::<UtunPacket>(1024); - let cookie = cookie::Validator::default(); - let udp_write_passthrough = udp_sink.sink_map_err(|_| ()).send_all( - udp_rx.map(|(addr, packet)| { + trace!("listening on {}", port); + + let socket = UdpSocket::from_socket(socket.into_udp_socket(), &self.handle)?; + let (udp_sink, udp_stream) = socket.framed(VecUdpCodec{}).split(); + let (egress_tx, egress_rx) = mpsc::channel(1024); + let udp_writethrough = udp_sink.sink_map_err(|_| ()).send_all( + egress_rx.and_then(|(addr, packet)| { trace!("sending UDP packet to {:?}", &addr); - (addr, packet) - }).map_err(|_| ())) + future::ok((addr, packet)) + }).map_err(|_| { info!("udp sink error"); () })) .then(|_| Ok(())); - handle.spawn(udp_write_passthrough); - Ok(PeerServer { - handle, shared_state, timer, udp_stream, udp_tx, tunnel_tx, outgoing_tx, outgoing_rx, cookie - }) + self.handle.spawn(udp_writethrough); + + self.port = Some(port); + self.ingress = Some(udp_stream); + self.egress_tx = Some(egress_tx); + Ok(()) } pub fn tx(&self) -> mpsc::Sender<UtunPacket> { - self.outgoing_tx.clone() + self.outgoing.tx.clone() } - fn send_to_peer(&self, payload: PeerServerMessage) { - self.handle.spawn(self.udp_tx.clone().send(payload).then(|_| Ok(()))); + pub fn config_tx(&self) -> mpsc::Sender<config::UpdateEvent> { + self.config.tx.clone() + } + + fn send_to_peer(&self, payload: PeerServerMessage) -> Result<(), Error> { + let tx = self.egress_tx.as_ref().ok_or_else(|| err_msg("no egress tx"))?.clone(); + self.handle.spawn(tx.send(payload).then(|_| Ok(()))); + Ok(()) } fn send_to_tunnel(&self, packet: Vec<u8>) { @@ -115,10 +154,8 @@ impl PeerServer { ensure!(packet.len() == 148, "handshake init packet length is incorrect"); let mut state = self.shared_state.borrow_mut(); { - let pubkey = state.interface_info.pub_key.as_ref() - .ok_or_else(|| err_msg("must have local interface key"))?; let (mac_in, mac_out) = packet.split_at(116); - self.cookie.verify_mac1(pubkey, mac_in, &mac_out[..16])?; + self.cookie.verify_mac1(mac_in, &mac_out[..16])?; } debug!("got handshake initiation request (0x01)"); @@ -134,7 +171,7 @@ impl PeerServer { let (response, next_index) = peer.complete_incoming_handshake(addr, handshake)?; let _ = state.index_map.insert(next_index, peer_ref.clone()); - self.send_to_peer((addr, response)); + self.send_to_peer((addr, response))?; info!("sent handshake response, ratcheted session (index {}).", next_index); Ok(()) @@ -145,10 +182,8 @@ impl PeerServer { ensure!(packet.len() == 92, "handshake resp packet length is incorrect"); let mut state = self.shared_state.borrow_mut(); { - let pubkey = state.interface_info.pub_key.as_ref() - .ok_or_else(|| err_msg("must have local interface key"))?; let (mac_in, mac_out) = packet.split_at(60); - self.cookie.verify_mac1(pubkey, mac_in, &mac_out[..16])?; + self.cookie.verify_mac1(mac_in, &mac_out[..16])?; } debug!("got handshake response (0x02)"); @@ -166,10 +201,10 @@ impl PeerServer { if !peer.outgoing_queue.is_empty() { debug!("sending {} queued egress packets", peer.outgoing_queue.len()); while let Some(packet) = peer.outgoing_queue.pop_front() { - self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?); + self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?; } } else { - self.send_to_peer(peer.handle_outgoing_transport(&[])?); + self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; } } else { error!("peer not ready for transport after processing handshake response. this shouldn't happen."); @@ -219,7 +254,7 @@ impl PeerServer { for packet in outgoing { match peer.handle_outgoing_transport(packet.payload()) { - Ok(message) => self.send_to_peer(message), + Ok(message) => self.send_to_peer(message)?, Err(e) => warn!("failed to encrypt packet: {}", e) } } @@ -251,7 +286,7 @@ impl PeerServer { let _ = state.index_map.remove(&index); } - self.send_to_peer((endpoint, init_packet)); + self.send_to_peer((endpoint, init_packet))?; peer.last_sent_init = Some(Instant::now()); peer.last_tun_queue = peer.last_tun_queue.or_else(|| Some(Instant::now())); let when = *REKEY_TIMEOUT + *TIMER_TICK_DURATION * 2; @@ -306,7 +341,6 @@ impl PeerServer { let new_index = self.send_handshake_init(&peer_ref)?; debug!("sent handshake init (Rekey timer) ({} -> {})", our_index, new_index); - }, TimerMessage::Reject(peer_ref, our_index) => { let mut peer = peer_ref.borrow_mut(); @@ -335,11 +369,13 @@ impl PeerServer { *KEEPALIVE_TIMEOUT - last_sent_packet + *TIMER_TICK_DURATION, TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_index)); bail!("passive keepalive tick (waiting {:?})", *KEEPALIVE_TIMEOUT - last_sent_packet); + } else { // if we're going to send a keepalive reset last_sent + session.last_sent = None; } } } - self.send_to_peer(peer.handle_outgoing_transport(&[])?); + self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; debug!("sent passive keepalive packet ({})", our_index); self.timer.spawn_delayed(&self.handle, @@ -353,7 +389,7 @@ impl PeerServer { ensure!(session_type == SessionType::Current, "expired session for persistent keepalive timer"); } - self.send_to_peer(peer.handle_outgoing_transport(&[])?); + self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; debug!("sent persistent keepalive packet ({})", our_index); if let Some(persistent_keepalive) = peer.info.keep_alive_interval { @@ -384,7 +420,7 @@ impl PeerServer { } while let Some(packet) = peer.outgoing_queue.pop_front() { - self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?); + self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?; } } peer.needs_new_handshake() @@ -403,6 +439,28 @@ impl Future for PeerServer { type Error = (); fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + // Handle config events + loop { + use self::config::UpdateEvent::*; + match self.config.rx.poll() { + Ok(Async::Ready(Some(event))) => { + match event { + PrivateKey(_) => { + let pub_key = &self.shared_state.borrow().interface_info.pub_key.unwrap(); + self.cookie = cookie::Validator::new(pub_key); + if self.egress_tx.is_none() { + self.rebind().unwrap(); + } + }, + ListenPort(_) => self.rebind().unwrap(), + _ => {} + } + }, + Ok(Async::NotReady) => break, + Ok(Async::Ready(None)) | Err(_) => return Err(()), + } + } + // Handle pending state-changing timers loop { match self.timer.poll() { @@ -415,19 +473,21 @@ impl Future for PeerServer { } // Handle UDP packets from the outside world - loop { - match self.udp_stream.poll() { - Ok(Async::Ready(Some((addr, packet)))) => { - let _ = self.handle_ingress_packet(addr, &packet).map_err(|e| warn!("UDP ERR: {:?}", e)); - }, - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) | Err(_) => return Err(()), + if self.ingress.is_some() { + loop { + match self.ingress.as_mut().unwrap().poll() { + Ok(Async::Ready(Some((addr, packet)))) => { + let _ = self.handle_ingress_packet(addr, &packet).map_err(|e| warn!("UDP ERR: {:?}", e)); + }, + Ok(Async::NotReady) => break, + Ok(Async::Ready(None)) | Err(_) => return Err(()), + } } } // Handle packets coming from the local tunnel loop { - match self.outgoing_rx.poll() { + match self.outgoing.rx.poll() { Ok(Async::Ready(Some(packet))) => { let _ = self.handle_egress_packet(packet).map_err(|e| warn!("UDP ERR: {:?}", e)); }, |