From 9f5b12d3b8967bee22491515731d950d8d5220e4 Mon Sep 17 00:00:00 2001 From: Jake McGinty Date: Thu, 3 May 2018 23:42:29 -0700 Subject: timers: more corrections to persistent keepalive --- src/interface/peer_server.rs | 75 +++++++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 32 deletions(-) (limited to 'src/interface/peer_server.rs') diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 7ad6d15..31b004b 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -14,14 +14,15 @@ use rand::{self, Rng}; use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel}; use tokio_core::reactor::Handle; -use std::{convert::TryInto, time::Duration}; +use std::convert::TryInto; pub enum ChannelMessage { ClearPrivateKey, NewPrivateKey, NewListenPort(u16), NewFwmark(u32), - NewPersistentKeepalive(u16), + NewPersistentKeepalive(SharedPeer), + NewPeer(SharedPeer), } struct Channel { @@ -165,7 +166,6 @@ impl PeerServer { Ok(()) } - // TODO use the address to update endpoint if it changes i suppose fn handle_ingress_handshake_resp(&mut self, addr: Endpoint, packet: &Response) -> Result<(), Error> { ensure!(packet.len() == 92, "handshake resp packet length is incorrect"); let mut state = self.shared_state.borrow_mut(); @@ -200,17 +200,6 @@ impl PeerServer { info!("handshake response received, current session now {}", our_index); self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone())); - - match peer.info.keepalive { - Some(keepalive) if keepalive > 0 => { - self.timer.send_after(Duration::from_secs(u64::from(keepalive)), - TimerMessage::PersistentKeepAlive(peer_ref.clone(), our_index)); - }, - _ => { - self.timer.send_after(*KEEPALIVE_TIMEOUT, - TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_index)); - } - } Ok(()) } @@ -245,8 +234,6 @@ impl PeerServer { } } - let our_new_index = peer.sessions.current.as_ref().unwrap().our_index; - self.timer.send_after(*KEEPALIVE_TIMEOUT, TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_new_index)); self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone())); } (raw_packet, peer.needs_new_handshake(false)) @@ -355,29 +342,29 @@ impl PeerServer { let new_index = self.send_handshake_init(&peer_ref)?; debug!("sent handshake init (Rekey timer) ({} -> {})", our_index, new_index); }, - PassiveKeepAlive(peer_ref, our_index) => { + PassiveKeepAlive(peer_ref) => { let mut peer = peer_ref.borrow_mut(); { if peer.sessions.current.is_none() { - self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index)); - bail!("no active session. waiting until there is one."); + self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone())); + bail!("passive keepalive skip: no active session. waiting until there is one."); } else if peer.info.keepalive.is_some() { - self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index)); - bail!("persistent keepalive set, no passive keepalive needed."); + self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone())); + bail!("passive keepalive skip: persistent keepalive set."); } let since_last_recv = peer.timers.data_received.elapsed(); let since_last_send = peer.timers.data_sent.elapsed(); if since_last_recv < *KEEPALIVE_TIMEOUT { let wait = *KEEPALIVE_TIMEOUT - since_last_recv; - self.timer.send_after(wait, PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(wait, PassiveKeepAlive(peer_ref.clone())); bail!("passive keepalive tick (waiting ~{}s due to last recv time)", wait.as_secs()); } else if since_last_send < *KEEPALIVE_TIMEOUT { let wait = *KEEPALIVE_TIMEOUT - since_last_send; - self.timer.send_after(wait, PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(wait, PassiveKeepAlive(peer_ref.clone())); bail!("passive keepalive tick (waiting ~{}s due to last send time)", wait.as_secs()); } else if peer.timers.keepalive_sent { - self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone())); bail!("passive keepalive already sent (waiting ~{}s to see if session survives)", KEEPALIVE_TIMEOUT.as_secs()); } else { peer.timers.keepalive_sent = true; @@ -385,24 +372,26 @@ impl PeerServer { } self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; - debug!("sent passive keepalive packet ({})", our_index); + debug!("sent passive keepalive packet"); - self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone())); }, - PersistentKeepAlive(peer_ref, our_index) => { + PersistentKeepAlive(peer_ref) => { let mut peer = peer_ref.borrow_mut(); if let Some(persistent_keepalive) = peer.info.persistent_keepalive() { let since_last_auth_any = peer.timers.authenticated_traversed.elapsed(); if since_last_auth_any < persistent_keepalive { let wait = persistent_keepalive - since_last_auth_any; - self.timer.send_after(wait, PersistentKeepAlive(peer_ref.clone(), our_index)); + let handle = self.timer.send_after(wait, PersistentKeepAlive(peer_ref.clone())); + peer.timers.persistent_timer = Some(handle); bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs()); } self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; - self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone(), our_index)); - debug!("sent persistent keepalive packet ({})", our_index); + let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone())); + peer.timers.persistent_timer = Some(handle); + debug!("sent persistent keepalive packet"); } else { bail!("no persistent keepalive set for peer (likely unset between the time the timer was started and now)."); } @@ -438,10 +427,32 @@ impl PeerServer { self.port = None; } }, - NewListenPort(_) => self.rebind().unwrap(), + NewPeer(peer_ref) => { + let mut peer = peer_ref.borrow_mut(); + self.timer.send_after(*KEEPALIVE_TIMEOUT, TimerMessage::PassiveKeepAlive(peer_ref.clone())); + if let Some(keepalive) = peer.info.persistent_keepalive() { + let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(peer_ref.clone())); + peer.timers.persistent_timer = Some(handle); + } + }, + NewPersistentKeepalive(peer_ref) => { + let mut peer = peer_ref.borrow_mut(); + if let Some(ref mut handle) = peer.timers.persistent_timer { + handle.cancel(); + debug!("sent cancel signal to old persistent_timer."); + } + + if let Some(keepalive) = peer.info.persistent_keepalive() { + let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(peer_ref.clone())); + peer.timers.persistent_timer = Some(handle); + self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + debug!("set new keepalive timer and immediately sent new keepalive packet."); + } + } + NewListenPort(_) => self.rebind()?, NewFwmark(mark) => { if let Some(ref udp) = self.udp { - udp.set_mark(mark).unwrap(); + udp.set_mark(mark)?; } } _ => {} -- cgit v1.2.3-59-g8ed1b