From ac2e9275b6c3d6ddd16e616d24f86a055a9ad461 Mon Sep 17 00:00:00 2001 From: Jake McGinty Date: Thu, 8 Mar 2018 20:20:37 +0000 Subject: messages: strongly typed messages --- src/interface/peer_server.rs | 76 +++++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 37 deletions(-) (limited to 'src/interface/peer_server.rs') diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 3ab4a37..534eb1b 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -2,10 +2,12 @@ use consts::{REKEY_TIMEOUT, REKEY_ATTEMPT_TIME, KEEPALIVE_TIMEOUT, STALE_SESSION MAX_CONTENT_SIZE, WIPE_AFTER_TIME}; use cookie; use interface::{SharedPeer, SharedState, State, UtunPacket, config}; +use message::{Message, Initiation, Response, CookieReply, Transport}; use peer::{Peer, SessionType}; use time::Timestamp; use timer::{Timer, TimerMessage}; +use std::convert::TryInto; use std::net::{Ipv6Addr, SocketAddr}; use std::time::Duration; @@ -108,18 +110,18 @@ impl PeerServer { } } - fn handle_ingress_packet(&mut self, addr: SocketAddr, packet: &[u8]) -> Result<(), Error> { + fn handle_ingress_packet(&mut self, addr: SocketAddr, packet: Vec) -> Result<(), Error> { trace!("got a UDP packet from {:?} of length {}, packet type {}", &addr, packet.len(), packet[0]); - match packet[0] { - 1 => self.handle_ingress_handshake_init(addr, packet), - 2 => self.handle_ingress_handshake_resp(addr, packet), - 3 => self.handle_ingress_cookie_reply(addr, packet), - 4 => self.handle_ingress_transport(addr, packet), - _ => bail!("unknown wireguard message type") + + match packet.try_into()? { + Message::Initiation(packet) => self.handle_ingress_handshake_init(addr, packet), + Message::Response(packet) => self.handle_ingress_handshake_resp(addr, packet), + Message::CookieReply(packet) => self.handle_ingress_cookie_reply(addr, packet), + Message::Transport(packet) => self.handle_ingress_transport(addr, packet), } } - fn handle_ingress_handshake_init(&mut self, addr: SocketAddr, packet: &[u8]) -> Result<(), Error> { + fn handle_ingress_handshake_init(&mut self, addr: SocketAddr, packet: Initiation) -> Result<(), Error> { ensure!(packet.len() == 148, "handshake init packet length is incorrect"); let mut state = self.shared_state.borrow_mut(); { @@ -131,7 +133,7 @@ impl PeerServer { let handshake = Peer::process_incoming_handshake( &state.interface_info.private_key.ok_or_else(|| err_msg("no private key!"))?, - packet)?; + &packet)?; let peer_ref = state.pubkey_map.get(handshake.their_pubkey()) .ok_or_else(|| err_msg("unknown peer pubkey"))?.clone(); @@ -147,7 +149,7 @@ impl PeerServer { } // TODO use the address to update endpoint if it changes i suppose - fn handle_ingress_handshake_resp(&mut self, _addr: SocketAddr, packet: &[u8]) -> Result<(), Error> { + fn handle_ingress_handshake_resp(&mut self, _addr: SocketAddr, packet: Response) -> Result<(), Error> { ensure!(packet.len() == 92, "handshake resp packet length is incorrect"); let mut state = self.shared_state.borrow_mut(); { @@ -161,7 +163,7 @@ impl PeerServer { .ok_or_else(|| format_err!("unknown our_index ({})", our_index))? .clone(); let mut peer = peer_ref.borrow_mut(); - let dead_index = peer.process_incoming_handshake_response(packet)?; + let dead_index = peer.process_incoming_handshake_response(&packet)?; if let Some(index) = dead_index { let _ = state.index_map.remove(&index); } @@ -180,37 +182,37 @@ impl PeerServer { } info!("handshake response received, current session now {}", our_index); - self.timer.spawn_delayed(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone())); + self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone())); match peer.info.keepalive { Some(keepalive) if keepalive > 0 => { - self.timer.spawn_delayed(Duration::from_secs(u64::from(keepalive)), - TimerMessage::PersistentKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(Duration::from_secs(u64::from(keepalive)), + TimerMessage::PersistentKeepAlive(peer_ref.clone(), our_index)); }, _ => { - self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, - TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(*KEEPALIVE_TIMEOUT, + TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_index)); } } Ok(()) } - fn handle_ingress_cookie_reply(&mut self, _addr: SocketAddr, packet: &[u8]) -> Result<(), Error> { + fn handle_ingress_cookie_reply(&mut self, _addr: SocketAddr, packet: CookieReply) -> Result<(), Error> { let state = self.shared_state.borrow_mut(); - let our_index = LittleEndian::read_u32(&packet[4..]); - let peer_ref = state.index_map.get(&our_index).ok_or_else(|| err_msg("unknown our_index"))?.clone(); + let peer_ref = state.index_map.get(&packet.our_index()).ok_or_else(|| err_msg("unknown our_index"))?.clone(); let mut peer = peer_ref.borrow_mut(); - peer.consume_cookie_reply(packet) + peer.consume_cookie_reply(&packet) } - fn handle_ingress_transport(&mut self, addr: SocketAddr, packet: &[u8]) -> Result<(), Error> { - let our_index = LittleEndian::read_u32(&packet[4..]); - let peer_ref = self.shared_state.borrow().index_map.get(&our_index).ok_or_else(|| err_msg("unknown our_index"))?.clone(); - let (raw_packet, needs_handshake) = { + fn handle_ingress_transport(&mut self, addr: SocketAddr, packet: Transport) -> Result<(), Error> { + let peer_ref = self.shared_state.borrow().index_map.get(&packet.our_index()) + .ok_or_else(|| err_msg("unknown our_index"))?.clone(); + + let (raw_packet, needs_handshake) = { let mut peer = peer_ref.borrow_mut(); let mut state = self.shared_state.borrow_mut(); - let (raw_packet, transition) = peer.handle_incoming_transport(addr, packet)?; + let (raw_packet, transition) = peer.handle_incoming_transport(addr, &packet)?; // If a new session has been set to current (TODO make this more clear) if let Some(possible_dead_index) = transition { @@ -228,8 +230,8 @@ impl PeerServer { } let our_new_index = peer.sessions.current.as_ref().unwrap().our_index; - self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_new_index)); - self.timer.spawn_delayed(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone())); + self.timer.send_after(*KEEPALIVE_TIMEOUT, TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_new_index)); + self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone())); } (raw_packet, peer.needs_new_handshake(false)) }; @@ -296,7 +298,7 @@ impl PeerServer { self.send_to_peer((endpoint, init_packet))?; peer.last_sent_init = Timestamp::now(); let when = *REKEY_TIMEOUT; - self.timer.spawn_delayed(when, TimerMessage::Rekey(peer_ref.clone(), new_index)); + self.timer.send_after(when, TimerMessage::Rekey(peer_ref.clone(), new_index)); Ok(new_index) } @@ -311,7 +313,7 @@ impl PeerServer { Some((_, SessionType::Next)) => { if peer.last_sent_init.elapsed() < *REKEY_TIMEOUT { let wait = *REKEY_TIMEOUT - peer.last_sent_init.elapsed(); - self.timer.spawn_delayed(wait, Rekey(peer_ref.clone(), our_index)); + self.timer.send_after(wait, Rekey(peer_ref.clone(), our_index)); bail!("too soon since last init sent, waiting {:?} ({})", wait, our_index); } else if peer.last_tun_queue.elapsed() > *REKEY_ATTEMPT_TIME { peer.sessions.next = None; @@ -322,7 +324,7 @@ impl PeerServer { let since_last_recv = peer.sessions.current.as_ref().unwrap().last_received.elapsed(); // gross if since_last_recv <= *STALE_SESSION_TIMEOUT { let wait = *STALE_SESSION_TIMEOUT - since_last_recv; - self.timer.spawn_delayed(wait, Rekey(peer_ref.clone(), our_index)); + self.timer.send_after(wait, Rekey(peer_ref.clone(), our_index)); bail!("rekey tick (waiting ~{}s due to stale session check)", wait.as_secs()); } }, @@ -343,14 +345,14 @@ impl PeerServer { let since_last_send = session.last_sent.elapsed(); if since_last_recv < *KEEPALIVE_TIMEOUT { let wait = *KEEPALIVE_TIMEOUT - since_last_recv; - self.timer.spawn_delayed(wait, PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(wait, PassiveKeepAlive(peer_ref.clone(), our_index)); bail!("passive keepalive tick (waiting ~{}s due to last recv time)", wait.as_secs()); } else if since_last_send < *KEEPALIVE_TIMEOUT { let wait = *KEEPALIVE_TIMEOUT - since_last_send; - self.timer.spawn_delayed(wait, PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(wait, PassiveKeepAlive(peer_ref.clone(), our_index)); bail!("passive keepalive tick (waiting ~{}s due to last send time)", wait.as_secs()); } else if session.keepalive_sent { - self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index)); bail!("passive keepalive already sent (waiting ~{}s to see if session survives)", KEEPALIVE_TIMEOUT.as_secs()); } else { session.keepalive_sent = true; @@ -360,7 +362,7 @@ impl PeerServer { self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; debug!("sent passive keepalive packet ({})", our_index); - self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index)); }, PersistentKeepAlive(peer_ref, our_index) => { let mut peer = peer_ref.borrow_mut(); @@ -373,8 +375,8 @@ impl PeerServer { debug!("sent persistent keepalive packet ({})", our_index); if let Some(keepalive) = peer.info.keepalive { - self.timer.spawn_delayed(Duration::from_secs(u64::from(keepalive)), - PersistentKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(Duration::from_secs(u64::from(keepalive)), + PersistentKeepAlive(peer_ref.clone(), our_index)); } }, @@ -444,7 +446,7 @@ impl Future for PeerServer { loop { match self.udp.as_mut().unwrap().ingress.poll() { Ok(Async::Ready(Some((addr, packet)))) => { - let _ = self.handle_ingress_packet(addr, &packet).map_err(|e| warn!("UDP ERR: {:?}", e)); + let _ = self.handle_ingress_packet(addr, packet).map_err(|e| warn!("UDP ERR: {:?}", e)); }, Ok(Async::NotReady) => break, Ok(Async::Ready(None)) | Err(_) => return Err(()), -- cgit v1.2.3-59-g8ed1b