aboutsummaryrefslogtreecommitdiffstats
path: root/src/interface
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-02-15 21:49:03 +0000
committerJake McGinty <me@jake.su>2018-02-15 21:49:03 +0000
commit210f9274752a9ea828199de3a55c7275ef0129f1 (patch)
treea388841c476e196ccfbc33c1e3733d1f93b9144a /src/interface
parentsupport persistent keepalive (diff)
downloadwireguard-rs-210f9274752a9ea828199de3a55c7275ef0129f1.tar.xz
wireguard-rs-210f9274752a9ea828199de3a55c7275ef0129f1.zip
more complete rekey timer
still todo: reject-after timeouts
Diffstat (limited to 'src/interface')
-rw-r--r--src/interface/mod.rs10
-rw-r--r--src/interface/peer_server.rs132
2 files changed, 95 insertions, 47 deletions
diff --git a/src/interface/mod.rs b/src/interface/mod.rs
index 395f97b..e75c25c 100644
--- a/src/interface/mod.rs
+++ b/src/interface/mod.rs
@@ -177,8 +177,6 @@ impl Interface {
}).map_err(|_| ());
let config_fut = config_rx.for_each({
- let tx = peer_server.udp_tx().clone();
- let handle = handle.clone();
let state = self.state.clone();
move |event| {
let mut state = state.borrow_mut();
@@ -198,18 +196,10 @@ impl Interface {
info!("added new peer: {}", info);
let mut peer = Peer::new(info.clone());
- let private_key = &state.interface_info.private_key.expect("no private key!");
- let (init_packet, our_index) = peer.initiate_new_session(private_key).expect("initiate_new_session");
-
let peer = Rc::new(RefCell::new(peer));
state.router.add_allowed_ips(&info.allowed_ips, &peer);
-
- let _ = state.index_map.insert(our_index, peer.clone());
let _ = state.pubkey_map.insert(info.pub_key, peer);
-
- handle.spawn(tx.clone().send((info.endpoint.unwrap(), init_packet)).then(|_| Ok(())));
- debug!("sent handshake packet to new peer");
},
UpdateEvent::RemovePeer(_pub_key) => {
warn!("RemovePeer event not yet handled");
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index 177ad65..59f3386 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -1,5 +1,6 @@
use super::{SharedState, UtunPacket, trace_packet};
-use consts::{REKEY_TIMEOUT, REKEY_AFTER_TIME, KEEPALIVE_TIMEOUT, MAX_CONTENT_SIZE, TIMER_TICK_DURATION};
+use consts::{REKEY_TIMEOUT, REKEY_AFTER_TIME, REKEY_ATTEMPT_TIME, KEEPALIVE_TIMEOUT, MAX_CONTENT_SIZE, TIMER_TICK_DURATION};
+use interface::SharedPeer;
use protocol::{Peer, SessionType};
use noise::Noise;
use timer::{Timer, TimerMessage};
@@ -89,10 +90,6 @@ impl PeerServer {
self.outgoing_tx.clone()
}
- pub fn udp_tx(&self) -> unsync::mpsc::Sender<(SocketAddr, Vec<u8>)> {
- self.udp_tx.clone()
- }
-
fn send_to_peer(&self, payload: PeerServerMessage) {
self.handle.spawn(self.udp_tx.clone().send(payload).then(|_| Ok(())));
}
@@ -151,11 +148,6 @@ impl PeerServer {
}
info!("handshake response received, current session now {}", our_index);
- // Start the timers for this new session
- self.timer.spawn_delayed(&self.handle,
- *REKEY_AFTER_TIME,
- TimerMessage::Rekey(peer_ref.clone(), our_index));
-
self.timer.spawn_delayed(&self.handle,
*KEEPALIVE_TIMEOUT,
TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_index));
@@ -200,27 +192,69 @@ impl PeerServer {
Ok(())
}
+ fn send_handshake_init(&mut self, peer_ref: SharedPeer) -> Result<u32, Error> {
+ let mut state = self.shared_state.borrow_mut();
+ let mut peer = peer_ref.borrow_mut();
+ let private_key = &state.interface_info.private_key.ok_or_else(|| err_msg("no private key!"))?;
+
+ let (endpoint, init_packet, new_index, dead_index) = peer.initiate_new_session(private_key)?;
+
+ let _ = state.index_map.insert(new_index, peer_ref.clone());
+ if let Some(index) = dead_index {
+ trace!("removing abandoned 'next' session ({}) from index map", index);
+ let _ = state.index_map.remove(&index);
+ }
+
+ self.send_to_peer((endpoint, init_packet));
+ peer.last_rekey_init = Some(Instant::now());
+ let when = *REKEY_TIMEOUT + *TIMER_TICK_DURATION * 2;
+ self.timer.spawn_delayed(&self.handle,
+ when,
+ TimerMessage::Rekey(peer_ref.clone(), new_index));
+ Ok(new_index)
+ }
+
fn handle_timer(&mut self, message: TimerMessage) -> Result<(), Error> {
- let mut state = self.shared_state.borrow_mut();
match message {
- TimerMessage::Rekey(peer_ref, _our_index) => {
- let mut peer = peer_ref.borrow_mut();
-
- let now = Instant::now();
- if let Some(last_init) = peer.last_rekey_init {
- if now.duration_since(last_init) < *REKEY_TIMEOUT {
- debug!("too soon since last rekey attempt");
+ TimerMessage::Rekey(peer_ref, our_index) => {
+ {
+ let mut peer = peer_ref.borrow_mut();
+ let now = Instant::now();
+
+ match peer.find_session(our_index) {
+ Some((_, SessionType::Next)) => {
+ if let Some(last_init_sent) = peer.last_rekey_init {
+ let since_last_init = now.duration_since(last_init_sent);
+ if since_last_init < *REKEY_TIMEOUT {
+ let wait = *REKEY_TIMEOUT - since_last_init + *TIMER_TICK_DURATION * 2;
+ self.timer.spawn_delayed(&self.handle,
+ wait,
+ TimerMessage::Rekey(peer_ref.clone(), our_index));
+ bail!("too soon since last init sent, waiting {:?} ({})", wait, our_index);
+ } else if since_last_init > *REKEY_ATTEMPT_TIME {
+ bail!("REKEY_ATTEMPT_TIME exceeded ({})", our_index);
+ }
+ }
+ },
+ Some((_, SessionType::Current)) => {
+ if let Some(last_handshake) = peer.last_handshake_instant {
+ let since_last_handshake = now.duration_since(last_handshake);
+ if since_last_handshake <= *REKEY_AFTER_TIME {
+ let wait = *REKEY_AFTER_TIME - since_last_handshake + *TIMER_TICK_DURATION * 2;
+ self.timer.spawn_delayed(&self.handle,
+ wait,
+ TimerMessage::Rekey(peer_ref.clone(), our_index));
+ bail!("recent last complete handshake - waiting {:?} ({})", wait, our_index);
+ }
+ }
+ },
+ _ => bail!("index is linked to a dead session, bailing.")
}
}
- let private_key = &state.interface_info.private_key.expect("no private key!");
- let (init_packet, our_index) = peer.initiate_new_session(private_key).unwrap();
- let _ = state.index_map.insert(our_index, peer_ref.clone());
-
- let endpoint = peer.info.endpoint.ok_or_else(|| err_msg("no endpoint for peer"))?;
+ let new_index = self.send_handshake_init(peer_ref.clone())?;
+ debug!("sent handshake init (Rekey timer) ({} -> {})", our_index, new_index);
- self.send_to_peer((endpoint, init_packet));
- debug!("sent rekey");
},
TimerMessage::PassiveKeepAlive(peer_ref, our_index) => {
let mut peer = peer_ref.borrow_mut();
@@ -269,27 +303,51 @@ impl PeerServer {
// Just this way to avoid a double-mutable-borrow while peeking.
fn peek_from_tun_and_handle(&mut self) -> Result<bool, Error> {
- let (endpoint, out_packet) = {
+ enum Decision { Drop, Wait, Handshake(SharedPeer), Transport((SocketAddr, Vec<u8>))}
+ let decision = {
let packet = match self.outgoing_rx.peek() {
Ok(Async::Ready(Some(packet))) => packet,
Ok(Async::NotReady) => return Ok(false),
Ok(Async::Ready(None)) | Err(_) => bail!("channel failure"),
};
-
- ensure!(!packet.payload().is_empty() && packet.payload().len() < MAX_CONTENT_SIZE,
- "illegal packet size");
-
trace_packet("received UTUN packet: ", packet.payload());
- let state = self.shared_state.borrow();
- let peer = state.router.route_to_peer(packet.payload()).ok_or_else(|| err_msg("no route to peer"))?;
- let mut peer = peer.borrow_mut();
- peer.handle_outgoing_transport(packet.payload())?
+ let mut state = self.shared_state.borrow_mut();
+ let peer_ref = state.router.route_to_peer(packet.payload()).ok_or_else(|| err_msg("no route to peer"))?;
+ let mut peer = peer_ref.borrow_mut();
+
+ if packet.payload().is_empty() || packet.payload().len() > MAX_CONTENT_SIZE {
+ Decision::Drop
+ } else if peer.sessions.current.is_none() {
+ if peer.sessions.next.is_some() {
+ Decision::Wait
+ } else {
+ Decision::Handshake(peer_ref.clone())
+ }
+ } else {
+ Decision::Transport(peer.handle_outgoing_transport(packet.payload())?)
+ }
};
- self.send_to_peer((endpoint, out_packet));
- let _ = self.outgoing_rx.poll(); // if we haven't short-circuited yet, take the packet out of the queue
- Ok(true)
+ match decision {
+ Decision::Transport(outgoing) => {
+ self.send_to_peer(outgoing);
+ let _ = self.outgoing_rx.poll();
+ Ok(true)
+ },
+ Decision::Handshake(peer_ref) => {
+ debug!("kicking off handshake because there are pending outgoing packets");
+ self.send_handshake_init(peer_ref)?;
+ Ok(false)
+ },
+ Decision::Drop => {
+ let _ = self.outgoing_rx.poll();
+ Ok(true)
+ },
+ Decision::Wait => {
+ Ok(false)
+ }
+ }
}
}