diff options
author | Jake McGinty <me@jake.su> | 2018-02-14 13:36:41 +0000 |
---|---|---|
committer | Jake McGinty <me@jake.su> | 2018-02-14 13:36:41 +0000 |
commit | 59687b82503f30ccfc169a83dc699177b0baba85 (patch) | |
tree | 12a4886c59e7b00c4df857655610bd3b8230e108 /src/interface/peer_server.rs | |
parent | use constant time comparison for mac (diff) | |
download | wireguard-rs-59687b82503f30ccfc169a83dc699177b0baba85.tar.xz wireguard-rs-59687b82503f30ccfc169a83dc699177b0baba85.zip |
timer module
Diffstat (limited to 'src/interface/peer_server.rs')
-rw-r--r-- | src/interface/peer_server.rs | 103 |
1 files changed, 43 insertions, 60 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index c53ecc4..9e3ab63 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -1,11 +1,12 @@ use super::{SharedState, SharedPeer, UtunPacket, trace_packet}; -use consts::{REKEY_AFTER_TIME, KEEPALIVE_TIMEOUT, MAX_CONTENT_SIZE, TRANSPORT_HEADER_SIZE, TRANSPORT_OVERHEAD}; -use protocol::Session; +use consts::{REKEY_TIMEOUT, REKEY_AFTER_TIME, KEEPALIVE_TIMEOUT, MAX_CONTENT_SIZE, TRANSPORT_HEADER_SIZE, TRANSPORT_OVERHEAD}; +use protocol::{Session, SessionType}; use noise::Noise; +use timer::{Timer, TimerMessage}; use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use base64; use byteorder::{ByteOrder, BigEndian, LittleEndian}; @@ -19,7 +20,6 @@ use snow; use tokio_core::net::{UdpSocket, UdpCodec, UdpFramed}; use tokio_core::reactor::Handle; use tokio_io::codec::Framed; -use tokio_timer::{Interval, Timer}; use treebitmap::{IpLookupTable, IpLookupTableOps}; @@ -55,21 +55,13 @@ impl UdpCodec for VecUdpCodec { } } -#[derive(Debug)] -pub enum TimerMessage { - KeepAlive(SharedPeer, u32), - Rekey(SharedPeer, u32), -} - pub struct PeerServer { handle: Handle, shared_state: SharedState, - timer: Timer, udp_stream: stream::SplitStream<UdpFramed<VecUdpCodec>>, + timer: Timer, outgoing_tx: unsync::mpsc::Sender<UtunPacket>, outgoing_rx: futures::stream::Peekable<unsync::mpsc::Receiver<UtunPacket>>, - timer_tx: unsync::mpsc::Sender<TimerMessage>, - timer_rx: unsync::mpsc::Receiver<TimerMessage>, udp_tx: unsync::mpsc::Sender<(SocketAddr, Vec<u8>)>, tunnel_tx: unsync::mpsc::Sender<UtunPacket>, } @@ -80,13 +72,12 @@ impl PeerServer { socket.set_only_v6(false)?; socket.set_nonblocking(true)?; socket.bind(&SocketAddr::from((Ipv6Addr::unspecified(), 0)).into())?; + let timer = Timer::new(); let socket = UdpSocket::from_socket(socket.into_udp_socket(), &handle.clone())?; let (udp_sink, udp_stream) = socket.framed(VecUdpCodec{}).split(); - let (timer_tx, timer_rx) = unsync::mpsc::channel::<TimerMessage>(1024); let (udp_tx, udp_rx) = unsync::mpsc::channel::<(SocketAddr, Vec<u8>)>(1024); let (outgoing_tx, outgoing_rx) = unsync::mpsc::channel::<UtunPacket>(1024); let outgoing_rx = outgoing_rx.peekable(); - let timer = Timer::default(); let udp_write_passthrough = udp_sink.sink_map_err(|_| ()).send_all( udp_rx.map(|(addr, packet)| { @@ -97,7 +88,7 @@ impl PeerServer { handle.spawn(udp_write_passthrough); Ok(PeerServer { - handle, shared_state, timer, udp_stream, udp_tx, tunnel_tx, timer_tx, timer_rx, outgoing_tx, outgoing_rx + handle, shared_state, timer, udp_stream, udp_tx, tunnel_tx, outgoing_tx, outgoing_rx }) } @@ -118,7 +109,7 @@ impl PeerServer { } fn handle_incoming_packet(&mut self, addr: SocketAddr, packet: Vec<u8>) -> Result<(), Error> { - debug!("got a UDP packet from {:?} of length {}, packet type {}", &addr, packet.len(), packet[0]); + trace!("got a UDP packet from {:?} of length {}, packet type {}", &addr, packet.len(), packet[0]); let mut state = self.shared_state.borrow_mut(); match packet[0] { 1 => { @@ -130,7 +121,7 @@ impl PeerServer { Noise::verify_mac1(pubkey, mac_in, &mac_out[..16])?; } - info!("got handshake initiation request"); + info!("got handshake initiation request (0x01)"); let their_index = LittleEndian::read_u32(&packet[4..]); @@ -165,6 +156,8 @@ impl PeerServer { let (mac_in, mac_out) = packet.split_at(60); Noise::verify_mac1(pubkey, mac_in, &mac_out[..16])?; } + info!("got handshake response (0x02)"); + let our_index = LittleEndian::read_u32(&packet[8..]); let peer_ref = state.index_map.get(&our_index) .ok_or_else(|| format_err!("unknown our_index ({})", our_index))? @@ -174,35 +167,16 @@ impl PeerServer { if let Some(index) = dead_index { let _ = state.index_map.remove(&index); } - info!("got handshake response, ratcheted session."); - - // TODO neither of these timers are to spec, but are simple functional placeholders - let rekey_timer = self.timer.sleep(Duration::from_secs(REKEY_AFTER_TIME)); - let rekey_future = rekey_timer.map_err(|_|()).and_then({ - let timer_tx = self.timer_tx.clone(); - let peer_ref = peer_ref.clone(); - move |_| { - timer_tx.clone().send(TimerMessage::Rekey(peer_ref, our_index)) - .then(|_| Ok(())) - } - }).then(|_| Ok(())); - self.handle.spawn(rekey_future); - - let keepalive_interval = self.timer.interval(Duration::from_secs(KEEPALIVE_TIMEOUT)); - let keepalive_future = keepalive_interval.map_err(|_|()).for_each({ - let timer_tx = self.timer_tx.clone(); - let peer_ref = peer_ref.clone(); - move |_| -> Box<Future<Item = _, Error = _>> { - if peer_ref.borrow().our_current_index().unwrap() != our_index { - debug!("cancelling old keepalive_timer"); - Box::new(future::err(())) - } else { - Box::new(timer_tx.clone().send(TimerMessage::KeepAlive(peer_ref.clone(), our_index)) - .then(|_| Ok(()))) - } - } - }); - self.handle.spawn(keepalive_future); + info!("handshake response processed, current session now {}", our_index); + + // Start the timers for this new session + self.timer.spawn_delayed(&self.handle, + REKEY_AFTER_TIME, + TimerMessage::Rekey(peer_ref.clone(), our_index)); + + self.timer.spawn_delayed(&self.handle, + KEEPALIVE_TIMEOUT, + TimerMessage::KeepAlive(peer_ref.clone(), our_index)); }, 3 => { warn!("cookie messages not yet implemented."); @@ -244,6 +218,13 @@ impl PeerServer { TimerMessage::Rekey(peer_ref, _our_index) => { let mut peer = peer_ref.borrow_mut(); + let now = SystemTime::now(); + if let Some(last_init) = peer.last_rekey_init { + if now.duration_since(last_init).unwrap() < Duration::from_secs(REKEY_TIMEOUT) { + debug!("too soon since last rekey attempt"); + } + } + let private_key = &state.interface_info.private_key.expect("no private key!"); let (init_packet, our_index) = peer.initiate_new_session(private_key).unwrap(); let _ = state.index_map.insert(our_index, peer_ref.clone()); @@ -253,19 +234,21 @@ impl PeerServer { self.send_to_peer((endpoint, init_packet)); info!("sent rekey"); }, - TimerMessage::KeepAlive(peer_ref, _our_index) => { + TimerMessage::KeepAlive(peer_ref, our_index) => { let mut peer = peer_ref.borrow_mut(); - let mut packet = vec![0u8; TRANSPORT_OVERHEAD]; - packet[0] = 4; - let their_index = peer.their_current_index().expect("no current index for them"); - let endpoint = peer.info.endpoint.unwrap(); - peer.tx_bytes += packet.len() as u64; - let noise = peer.current_noise().expect("current noise session"); - LittleEndian::write_u32(&mut packet[4..], their_index); - LittleEndian::write_u64(&mut packet[8..], noise.sending_nonce().unwrap()); - let _ = noise.write_message(&[], &mut packet[16..]).map_err(SyncFailure::new)?; - self.send_to_peer((endpoint, packet)); - debug!("sent keepalive"); + match peer.find_session(our_index) { + Some((_, SessionType::Current)) => { + self.send_to_peer(peer.handle_outgoing_transport(&[])?); + debug!("sent keepalive packet ({})", our_index); + + self.timer.spawn_delayed(&self.handle, + KEEPALIVE_TIMEOUT, + TimerMessage::KeepAlive(peer_ref.clone(), our_index)); + } + _ => { + debug!("keepalive timer received for non-current session, ignoring."); + } + } } } Ok(()) @@ -304,7 +287,7 @@ impl Future for PeerServer { fn poll(&mut self) -> Poll<Self::Item, Self::Error> { // Handle pending state-changing timers loop { - match self.timer_rx.poll() { + match self.timer.poll() { Ok(Async::Ready(Some(message))) => { let _ = self.handle_timer(message).map_err(|e| warn!("TIMER ERR: {:?}", e)); }, |