diff options
author | 2018-02-15 21:49:03 +0000 | |
---|---|---|
committer | 2018-02-15 21:49:03 +0000 | |
commit | 210f9274752a9ea828199de3a55c7275ef0129f1 (patch) | |
tree | a388841c476e196ccfbc33c1e3733d1f93b9144a /src/interface | |
parent | support persistent keepalive (diff) | |
download | wireguard-rs-210f9274752a9ea828199de3a55c7275ef0129f1.tar.xz wireguard-rs-210f9274752a9ea828199de3a55c7275ef0129f1.zip |
more complete rekey timer
still todo: reject-after timeouts
Diffstat (limited to 'src/interface')
-rw-r--r-- | src/interface/mod.rs | 10 | ||||
-rw-r--r-- | src/interface/peer_server.rs | 132 |
2 files changed, 95 insertions, 47 deletions
diff --git a/src/interface/mod.rs b/src/interface/mod.rs index 395f97b..e75c25c 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -177,8 +177,6 @@ impl Interface { }).map_err(|_| ()); let config_fut = config_rx.for_each({ - let tx = peer_server.udp_tx().clone(); - let handle = handle.clone(); let state = self.state.clone(); move |event| { let mut state = state.borrow_mut(); @@ -198,18 +196,10 @@ impl Interface { info!("added new peer: {}", info); let mut peer = Peer::new(info.clone()); - let private_key = &state.interface_info.private_key.expect("no private key!"); - let (init_packet, our_index) = peer.initiate_new_session(private_key).expect("initiate_new_session"); - let peer = Rc::new(RefCell::new(peer)); state.router.add_allowed_ips(&info.allowed_ips, &peer); - - let _ = state.index_map.insert(our_index, peer.clone()); let _ = state.pubkey_map.insert(info.pub_key, peer); - - handle.spawn(tx.clone().send((info.endpoint.unwrap(), init_packet)).then(|_| Ok(()))); - debug!("sent handshake packet to new peer"); }, UpdateEvent::RemovePeer(_pub_key) => { warn!("RemovePeer event not yet handled"); diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 177ad65..59f3386 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -1,5 +1,6 @@ use super::{SharedState, UtunPacket, trace_packet}; -use consts::{REKEY_TIMEOUT, REKEY_AFTER_TIME, KEEPALIVE_TIMEOUT, MAX_CONTENT_SIZE, TIMER_TICK_DURATION}; +use consts::{REKEY_TIMEOUT, REKEY_AFTER_TIME, REKEY_ATTEMPT_TIME, KEEPALIVE_TIMEOUT, MAX_CONTENT_SIZE, TIMER_TICK_DURATION}; +use interface::SharedPeer; use protocol::{Peer, SessionType}; use noise::Noise; use timer::{Timer, TimerMessage}; @@ -89,10 +90,6 @@ impl PeerServer { self.outgoing_tx.clone() } - pub fn udp_tx(&self) -> unsync::mpsc::Sender<(SocketAddr, Vec<u8>)> { - self.udp_tx.clone() - } - fn send_to_peer(&self, payload: PeerServerMessage) { self.handle.spawn(self.udp_tx.clone().send(payload).then(|_| Ok(()))); } @@ -151,11 +148,6 @@ impl PeerServer { } info!("handshake response received, current session now {}", our_index); - // Start the timers for this new session - self.timer.spawn_delayed(&self.handle, - *REKEY_AFTER_TIME, - TimerMessage::Rekey(peer_ref.clone(), our_index)); - self.timer.spawn_delayed(&self.handle, *KEEPALIVE_TIMEOUT, TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_index)); @@ -200,27 +192,69 @@ impl PeerServer { Ok(()) } + fn send_handshake_init(&mut self, peer_ref: SharedPeer) -> Result<u32, Error> { + let mut state = self.shared_state.borrow_mut(); + let mut peer = peer_ref.borrow_mut(); + let private_key = &state.interface_info.private_key.ok_or_else(|| err_msg("no private key!"))?; + + let (endpoint, init_packet, new_index, dead_index) = peer.initiate_new_session(private_key)?; + + let _ = state.index_map.insert(new_index, peer_ref.clone()); + if let Some(index) = dead_index { + trace!("removing abandoned 'next' session ({}) from index map", index); + let _ = state.index_map.remove(&index); + } + + self.send_to_peer((endpoint, init_packet)); + peer.last_rekey_init = Some(Instant::now()); + let when = *REKEY_TIMEOUT + *TIMER_TICK_DURATION * 2; + self.timer.spawn_delayed(&self.handle, + when, + TimerMessage::Rekey(peer_ref.clone(), new_index)); + Ok(new_index) + } + fn handle_timer(&mut self, message: TimerMessage) -> Result<(), Error> { - let mut state = self.shared_state.borrow_mut(); match message { - TimerMessage::Rekey(peer_ref, _our_index) => { - let mut peer = peer_ref.borrow_mut(); - - let now = Instant::now(); - if let Some(last_init) = peer.last_rekey_init { - if now.duration_since(last_init) < *REKEY_TIMEOUT { - debug!("too soon since last rekey attempt"); + TimerMessage::Rekey(peer_ref, our_index) => { + { + let mut peer = peer_ref.borrow_mut(); + let now = Instant::now(); + + match peer.find_session(our_index) { + Some((_, SessionType::Next)) => { + if let Some(last_init_sent) = peer.last_rekey_init { + let since_last_init = now.duration_since(last_init_sent); + if since_last_init < *REKEY_TIMEOUT { + let wait = *REKEY_TIMEOUT - since_last_init + *TIMER_TICK_DURATION * 2; + self.timer.spawn_delayed(&self.handle, + wait, + TimerMessage::Rekey(peer_ref.clone(), our_index)); + bail!("too soon since last init sent, waiting {:?} ({})", wait, our_index); + } else if since_last_init > *REKEY_ATTEMPT_TIME { + bail!("REKEY_ATTEMPT_TIME exceeded ({})", our_index); + } + } + }, + Some((_, SessionType::Current)) => { + if let Some(last_handshake) = peer.last_handshake_instant { + let since_last_handshake = now.duration_since(last_handshake); + if since_last_handshake <= *REKEY_AFTER_TIME { + let wait = *REKEY_AFTER_TIME - since_last_handshake + *TIMER_TICK_DURATION * 2; + self.timer.spawn_delayed(&self.handle, + wait, + TimerMessage::Rekey(peer_ref.clone(), our_index)); + bail!("recent last complete handshake - waiting {:?} ({})", wait, our_index); + } + } + }, + _ => bail!("index is linked to a dead session, bailing.") } } - let private_key = &state.interface_info.private_key.expect("no private key!"); - let (init_packet, our_index) = peer.initiate_new_session(private_key).unwrap(); - let _ = state.index_map.insert(our_index, peer_ref.clone()); - - let endpoint = peer.info.endpoint.ok_or_else(|| err_msg("no endpoint for peer"))?; + let new_index = self.send_handshake_init(peer_ref.clone())?; + debug!("sent handshake init (Rekey timer) ({} -> {})", our_index, new_index); - self.send_to_peer((endpoint, init_packet)); - debug!("sent rekey"); }, TimerMessage::PassiveKeepAlive(peer_ref, our_index) => { let mut peer = peer_ref.borrow_mut(); @@ -269,27 +303,51 @@ impl PeerServer { // Just this way to avoid a double-mutable-borrow while peeking. fn peek_from_tun_and_handle(&mut self) -> Result<bool, Error> { - let (endpoint, out_packet) = { + enum Decision { Drop, Wait, Handshake(SharedPeer), Transport((SocketAddr, Vec<u8>))} + let decision = { let packet = match self.outgoing_rx.peek() { Ok(Async::Ready(Some(packet))) => packet, Ok(Async::NotReady) => return Ok(false), Ok(Async::Ready(None)) | Err(_) => bail!("channel failure"), }; - - ensure!(!packet.payload().is_empty() && packet.payload().len() < MAX_CONTENT_SIZE, - "illegal packet size"); - trace_packet("received UTUN packet: ", packet.payload()); - let state = self.shared_state.borrow(); - let peer = state.router.route_to_peer(packet.payload()).ok_or_else(|| err_msg("no route to peer"))?; - let mut peer = peer.borrow_mut(); - peer.handle_outgoing_transport(packet.payload())? + let mut state = self.shared_state.borrow_mut(); + let peer_ref = state.router.route_to_peer(packet.payload()).ok_or_else(|| err_msg("no route to peer"))?; + let mut peer = peer_ref.borrow_mut(); + + if packet.payload().is_empty() || packet.payload().len() > MAX_CONTENT_SIZE { + Decision::Drop + } else if peer.sessions.current.is_none() { + if peer.sessions.next.is_some() { + Decision::Wait + } else { + Decision::Handshake(peer_ref.clone()) + } + } else { + Decision::Transport(peer.handle_outgoing_transport(packet.payload())?) + } }; - self.send_to_peer((endpoint, out_packet)); - let _ = self.outgoing_rx.poll(); // if we haven't short-circuited yet, take the packet out of the queue - Ok(true) + match decision { + Decision::Transport(outgoing) => { + self.send_to_peer(outgoing); + let _ = self.outgoing_rx.poll(); + Ok(true) + }, + Decision::Handshake(peer_ref) => { + debug!("kicking off handshake because there are pending outgoing packets"); + self.send_handshake_init(peer_ref)?; + Ok(false) + }, + Decision::Drop => { + let _ = self.outgoing_rx.poll(); + Ok(true) + }, + Decision::Wait => { + Ok(false) + } + } } } |