diff options
author | Jake McGinty <me@jake.su> | 2017-12-30 14:43:23 -0800 |
---|---|---|
committer | Jake McGinty <me@jake.su> | 2017-12-30 14:43:23 -0800 |
commit | 4f30a52e5fcb52679afa2c70dcf9cccd5a12afe3 (patch) | |
tree | 5b333eab87ecf2842a2c56b885b7ed3245d801a3 /src | |
parent | fallback to previous noise hack (diff) | |
download | wireguard-rs-4f30a52e5fcb52679afa2c70dcf9cccd5a12afe3.tar.xz wireguard-rs-4f30a52e5fcb52679afa2c70dcf9cccd5a12afe3.zip |
rudimentary timers!
Diffstat (limited to 'src')
-rw-r--r-- | src/consts.rs | 2 | ||||
-rw-r--r-- | src/interface/peer_server.rs | 120 | ||||
-rw-r--r-- | src/main.rs | 1 |
3 files changed, 90 insertions, 33 deletions
diff --git a/src/consts.rs b/src/consts.rs new file mode 100644 index 0000000..ba9d0d0 --- /dev/null +++ b/src/consts.rs @@ -0,0 +1,2 @@ +pub const REKEY_AFTER_TIME: u64 = 120; +pub const KEEPALIVE_TIMEOUT: u64 = 10; diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index e1af433..b698f3c 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -1,4 +1,5 @@ use super::{SharedState, SharedPeer, debug_packet}; +use consts::{REKEY_AFTER_TIME, KEEPALIVE_TIMEOUT}; use std::io; use std::net::SocketAddr; @@ -30,23 +31,33 @@ impl UdpCodec for VecUdpCodec { } } +#[derive(Debug)] +pub enum TimerMessage { + KeepAlive(SharedPeer), + Rekey(SharedPeer), +} pub struct PeerServer { handle: Handle, shared_state: SharedState, + timer: Timer, udp_stream: stream::SplitStream<UdpFramed<VecUdpCodec>>, - rx: unsync::mpsc::Receiver<Vec<u8>>, + outgoing_tx: unsync::mpsc::Sender<Vec<u8>>, + outgoing_rx: unsync::mpsc::Receiver<Vec<u8>>, + timer_tx: unsync::mpsc::Sender<TimerMessage>, + timer_rx: unsync::mpsc::Receiver<TimerMessage>, udp_tx: unsync::mpsc::Sender<(SocketAddr, Vec<u8>)>, tunnel_tx: unsync::mpsc::Sender<Vec<u8>>, - pub tx: unsync::mpsc::Sender<Vec<u8>>, } impl PeerServer { pub fn bind(handle: Handle, shared_state: SharedState, tunnel_tx: unsync::mpsc::Sender<Vec<u8>>) -> Self { let socket = UdpSocket::bind(&([0,0,0,0], 0).into(), &handle.clone()).unwrap(); let (udp_sink, udp_stream) = socket.framed(VecUdpCodec{}).split(); + let (timer_tx, timer_rx) = unsync::mpsc::channel::<TimerMessage>(1024); let (udp_tx, udp_rx) = unsync::mpsc::channel::<(SocketAddr, Vec<u8>)>(1024); - let (tx, rx) = unsync::mpsc::channel::<Vec<u8>>(1024); + let (outgoing_tx, outgoing_rx) = unsync::mpsc::channel::<Vec<u8>>(1024); + let timer = Timer::default(); let udp_write_passthrough = udp_sink.sink_map_err(|_| ()).send_all( udp_rx.map(|(addr, packet)| { @@ -57,12 +68,12 @@ impl PeerServer { handle.spawn(udp_write_passthrough); PeerServer { - handle, shared_state, udp_stream, udp_tx, tunnel_tx, tx, rx + handle, shared_state, timer, udp_stream, udp_tx, tunnel_tx, timer_tx, timer_rx, outgoing_tx, outgoing_rx } } pub fn tx(&self) -> unsync::mpsc::Sender<Vec<u8>> { - self.tx.clone() + self.outgoing_tx.clone() } pub fn udp_tx(&self) -> unsync::mpsc::Sender<(SocketAddr, Vec<u8>)> { @@ -71,7 +82,7 @@ impl PeerServer { fn handle_incoming_packet(&mut self, addr: SocketAddr, packet: Vec<u8>) { debug!("got a UDP packet of length {}, packet type {}", packet.len(), packet[0]); - let mut state = self.shared_state.borrow_mut(); + let state = self.shared_state.borrow_mut(); match packet[0] { 1 => { info!("got handshake initialization."); @@ -88,34 +99,28 @@ impl PeerServer { peer.ratchet_session().unwrap(); info!("got handshake response, ratcheted session."); - let noise = NoiseBuilder::new("Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s".parse().unwrap()) - .local_private_key(&state.interface_info.private_key.expect("no private key!")) - .remote_public_key(&peer.info.pub_key) - .prologue("WireGuard v1 zx2c4 Jason@zx2c4.com".as_bytes()) - .psk(2, &peer.info.psk.expect("no psk!")) - .build_initiator().unwrap(); - peer.set_next_session(noise.into()); - - let _ = state.index_map.insert(peer.our_next_index().unwrap(), peer_ref.clone()); - - let init_packet = peer.get_handshake_packet(); - let endpoint = peer.info.endpoint.unwrap().clone(); + // TODO neither of these timers are to spec, but are simple functional placeholders + let rekey_timer = self.timer.sleep(Duration::from_secs(REKEY_AFTER_TIME)); + let rekey_future = rekey_timer.map_err(|_|()).and_then({ + let timer_tx = self.timer_tx.clone(); + let peer_ref = peer_ref.clone(); + move |_| { + timer_tx.clone().send(TimerMessage::Rekey(peer_ref)) + .then(|_| Ok(())) + } + }).then(|_| Ok(())); + self.handle.spawn(rekey_future); - let timer = Timer::default(); - let sleep = timer.sleep(Duration::from_secs(120)); - let boop = sleep.and_then({ - let handle = self.handle.clone(); - let tx = self.udp_tx.clone(); + let keepalive_interval = self.timer.interval(Duration::from_secs(KEEPALIVE_TIMEOUT)); + let keepalive_future = keepalive_interval.map_err(|_|()).for_each({ + let timer_tx = self.timer_tx.clone(); let peer_ref = peer_ref.clone(); move |_| { - info!("sending rekey!"); - handle.spawn(tx.clone().send((endpoint, init_packet)) - .map(|_| ()) - .map_err(|_| ())); - Ok(()) + timer_tx.clone().send(TimerMessage::KeepAlive(peer_ref.clone())) + .then(|_| Ok(())) } - }).map_err(|_|()); - self.handle.spawn(boop); + }); + self.handle.spawn(keepalive_future); }, 4 => { let our_index_received = LittleEndian::read_u32(&packet[4..]); @@ -127,6 +132,8 @@ impl PeerServer { let mut peer = peer.borrow_mut(); peer.rx_bytes += packet.len(); + + // TODO: map index not just to peer, but to specific session instead of guessing let res = { let noise = peer.current_noise().expect("current noise session"); noise.set_receiving_nonce(nonce).unwrap(); @@ -143,14 +150,52 @@ impl PeerServer { debug_packet("received TRANSPORT: ", &raw_packet[..payload_len]); self.handle.spawn(self.tunnel_tx.clone().send(raw_packet[..payload_len].to_owned()) - .map(|_| ()) - .map_err(|_| ())); + .then(|_| Ok(()))); } }, _ => unimplemented!() } } + fn handle_timer(&mut self, message: TimerMessage) { + let mut state = self.shared_state.borrow_mut(); + match message { + TimerMessage::Rekey(peer_ref) => { + let mut peer = peer_ref.borrow_mut(); + let noise = NoiseBuilder::new("Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s".parse().unwrap()) + .local_private_key(&state.interface_info.private_key.expect("no private key!")) + .remote_public_key(&peer.info.pub_key) + .prologue("WireGuard v1 zx2c4 Jason@zx2c4.com".as_bytes()) + .psk(2, &peer.info.psk.expect("no psk!")) + .build_initiator().unwrap(); + peer.set_next_session(noise.into()); + + let _ = state.index_map.insert(peer.our_next_index().unwrap(), peer_ref.clone()); + + let init_packet = peer.get_handshake_packet(); + let endpoint = peer.info.endpoint.unwrap().clone(); + + self.handle.spawn(self.udp_tx.clone().send((endpoint, init_packet)).then(|_| Ok(()))); + info!("sent rekey"); + }, + TimerMessage::KeepAlive(peer_ref) => { + let mut peer = peer_ref.borrow_mut(); + let mut packet = vec![0u8; 1500]; + packet[0] = 4; + let their_index = peer.their_current_index().expect("no current index for them"); + let endpoint = peer.info.endpoint.unwrap(); + peer.tx_bytes += packet.len(); + let noise = peer.current_noise().expect("current noise session"); + LittleEndian::write_u32(&mut packet[4..], their_index); + LittleEndian::write_u64(&mut packet[8..], noise.sending_nonce().unwrap()); + let len = noise.write_message(&[], &mut packet[16..]).expect("failed to encrypt outgoing keepalive"); + packet.truncate(len + 16); + self.handle.spawn(self.udp_tx.clone().send((endpoint, packet)).then(|_| Ok(()))); + info!("sent keepalive"); + } + } + } + fn handle_outgoing_packet(&mut self, packet: Vec<u8>) { debug!("handle_outgoing_packet()"); @@ -162,6 +207,15 @@ impl Future for PeerServer { type Error = (); fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + // Handle pending state-changing timers + loop { + match self.timer_rx.poll() { + Ok(Async::Ready(Some(message))) => self.handle_timer(message), + Ok(Async::NotReady) => break, + Ok(Async::Ready(None)) | Err(_) => return Err(()), + } + } + // Handle UDP packets from the outside world loop { match self.udp_stream.poll() { @@ -173,7 +227,7 @@ impl Future for PeerServer { // Handle packets coming from the local tunnel loop { - match self.rx.poll() { + match self.outgoing_rx.poll() { Ok(Async::Ready(Some(packet))) => self.handle_outgoing_packet(packet), Ok(Async::NotReady) => break, Ok(Async::Ready(None)) | Err(_) => return Err(()), diff --git a/src/main.rs b/src/main.rs index 240e6e6..7f159cc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,6 +26,7 @@ extern crate byteorder; extern crate crypto; extern crate pnet; +mod consts; mod error; mod interface; mod protocol; |