aboutsummaryrefslogtreecommitdiffstats
path: root/src/interface/peer_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/interface/peer_server.rs')
-rw-r--r--src/interface/peer_server.rs76
1 files changed, 39 insertions, 37 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index 3ab4a37..534eb1b 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -2,10 +2,12 @@ use consts::{REKEY_TIMEOUT, REKEY_ATTEMPT_TIME, KEEPALIVE_TIMEOUT, STALE_SESSION
MAX_CONTENT_SIZE, WIPE_AFTER_TIME};
use cookie;
use interface::{SharedPeer, SharedState, State, UtunPacket, config};
+use message::{Message, Initiation, Response, CookieReply, Transport};
use peer::{Peer, SessionType};
use time::Timestamp;
use timer::{Timer, TimerMessage};
+use std::convert::TryInto;
use std::net::{Ipv6Addr, SocketAddr};
use std::time::Duration;
@@ -108,18 +110,18 @@ impl PeerServer {
}
}
- fn handle_ingress_packet(&mut self, addr: SocketAddr, packet: &[u8]) -> Result<(), Error> {
+ fn handle_ingress_packet(&mut self, addr: SocketAddr, packet: Vec<u8>) -> Result<(), Error> {
trace!("got a UDP packet from {:?} of length {}, packet type {}", &addr, packet.len(), packet[0]);
- match packet[0] {
- 1 => self.handle_ingress_handshake_init(addr, packet),
- 2 => self.handle_ingress_handshake_resp(addr, packet),
- 3 => self.handle_ingress_cookie_reply(addr, packet),
- 4 => self.handle_ingress_transport(addr, packet),
- _ => bail!("unknown wireguard message type")
+
+ match packet.try_into()? {
+ Message::Initiation(packet) => self.handle_ingress_handshake_init(addr, packet),
+ Message::Response(packet) => self.handle_ingress_handshake_resp(addr, packet),
+ Message::CookieReply(packet) => self.handle_ingress_cookie_reply(addr, packet),
+ Message::Transport(packet) => self.handle_ingress_transport(addr, packet),
}
}
- fn handle_ingress_handshake_init(&mut self, addr: SocketAddr, packet: &[u8]) -> Result<(), Error> {
+ fn handle_ingress_handshake_init(&mut self, addr: SocketAddr, packet: Initiation) -> Result<(), Error> {
ensure!(packet.len() == 148, "handshake init packet length is incorrect");
let mut state = self.shared_state.borrow_mut();
{
@@ -131,7 +133,7 @@ impl PeerServer {
let handshake = Peer::process_incoming_handshake(
&state.interface_info.private_key.ok_or_else(|| err_msg("no private key!"))?,
- packet)?;
+ &packet)?;
let peer_ref = state.pubkey_map.get(handshake.their_pubkey())
.ok_or_else(|| err_msg("unknown peer pubkey"))?.clone();
@@ -147,7 +149,7 @@ impl PeerServer {
}
// TODO use the address to update endpoint if it changes i suppose
- fn handle_ingress_handshake_resp(&mut self, _addr: SocketAddr, packet: &[u8]) -> Result<(), Error> {
+ fn handle_ingress_handshake_resp(&mut self, _addr: SocketAddr, packet: Response) -> Result<(), Error> {
ensure!(packet.len() == 92, "handshake resp packet length is incorrect");
let mut state = self.shared_state.borrow_mut();
{
@@ -161,7 +163,7 @@ impl PeerServer {
.ok_or_else(|| format_err!("unknown our_index ({})", our_index))?
.clone();
let mut peer = peer_ref.borrow_mut();
- let dead_index = peer.process_incoming_handshake_response(packet)?;
+ let dead_index = peer.process_incoming_handshake_response(&packet)?;
if let Some(index) = dead_index {
let _ = state.index_map.remove(&index);
}
@@ -180,37 +182,37 @@ impl PeerServer {
}
info!("handshake response received, current session now {}", our_index);
- self.timer.spawn_delayed(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone()));
+ self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone()));
match peer.info.keepalive {
Some(keepalive) if keepalive > 0 => {
- self.timer.spawn_delayed(Duration::from_secs(u64::from(keepalive)),
- TimerMessage::PersistentKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(Duration::from_secs(u64::from(keepalive)),
+ TimerMessage::PersistentKeepAlive(peer_ref.clone(), our_index));
},
_ => {
- self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT,
- TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(*KEEPALIVE_TIMEOUT,
+ TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_index));
}
}
Ok(())
}
- fn handle_ingress_cookie_reply(&mut self, _addr: SocketAddr, packet: &[u8]) -> Result<(), Error> {
+ fn handle_ingress_cookie_reply(&mut self, _addr: SocketAddr, packet: CookieReply) -> Result<(), Error> {
let state = self.shared_state.borrow_mut();
- let our_index = LittleEndian::read_u32(&packet[4..]);
- let peer_ref = state.index_map.get(&our_index).ok_or_else(|| err_msg("unknown our_index"))?.clone();
+ let peer_ref = state.index_map.get(&packet.our_index()).ok_or_else(|| err_msg("unknown our_index"))?.clone();
let mut peer = peer_ref.borrow_mut();
- peer.consume_cookie_reply(packet)
+ peer.consume_cookie_reply(&packet)
}
- fn handle_ingress_transport(&mut self, addr: SocketAddr, packet: &[u8]) -> Result<(), Error> {
- let our_index = LittleEndian::read_u32(&packet[4..]);
- let peer_ref = self.shared_state.borrow().index_map.get(&our_index).ok_or_else(|| err_msg("unknown our_index"))?.clone();
- let (raw_packet, needs_handshake) = {
+ fn handle_ingress_transport(&mut self, addr: SocketAddr, packet: Transport) -> Result<(), Error> {
+ let peer_ref = self.shared_state.borrow().index_map.get(&packet.our_index())
+ .ok_or_else(|| err_msg("unknown our_index"))?.clone();
+
+ let (raw_packet, needs_handshake) = {
let mut peer = peer_ref.borrow_mut();
let mut state = self.shared_state.borrow_mut();
- let (raw_packet, transition) = peer.handle_incoming_transport(addr, packet)?;
+ let (raw_packet, transition) = peer.handle_incoming_transport(addr, &packet)?;
// If a new session has been set to current (TODO make this more clear)
if let Some(possible_dead_index) = transition {
@@ -228,8 +230,8 @@ impl PeerServer {
}
let our_new_index = peer.sessions.current.as_ref().unwrap().our_index;
- self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_new_index));
- self.timer.spawn_delayed(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone()));
+ self.timer.send_after(*KEEPALIVE_TIMEOUT, TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_new_index));
+ self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone()));
}
(raw_packet, peer.needs_new_handshake(false))
};
@@ -296,7 +298,7 @@ impl PeerServer {
self.send_to_peer((endpoint, init_packet))?;
peer.last_sent_init = Timestamp::now();
let when = *REKEY_TIMEOUT;
- self.timer.spawn_delayed(when, TimerMessage::Rekey(peer_ref.clone(), new_index));
+ self.timer.send_after(when, TimerMessage::Rekey(peer_ref.clone(), new_index));
Ok(new_index)
}
@@ -311,7 +313,7 @@ impl PeerServer {
Some((_, SessionType::Next)) => {
if peer.last_sent_init.elapsed() < *REKEY_TIMEOUT {
let wait = *REKEY_TIMEOUT - peer.last_sent_init.elapsed();
- self.timer.spawn_delayed(wait, Rekey(peer_ref.clone(), our_index));
+ self.timer.send_after(wait, Rekey(peer_ref.clone(), our_index));
bail!("too soon since last init sent, waiting {:?} ({})", wait, our_index);
} else if peer.last_tun_queue.elapsed() > *REKEY_ATTEMPT_TIME {
peer.sessions.next = None;
@@ -322,7 +324,7 @@ impl PeerServer {
let since_last_recv = peer.sessions.current.as_ref().unwrap().last_received.elapsed(); // gross
if since_last_recv <= *STALE_SESSION_TIMEOUT {
let wait = *STALE_SESSION_TIMEOUT - since_last_recv;
- self.timer.spawn_delayed(wait, Rekey(peer_ref.clone(), our_index));
+ self.timer.send_after(wait, Rekey(peer_ref.clone(), our_index));
bail!("rekey tick (waiting ~{}s due to stale session check)", wait.as_secs());
}
},
@@ -343,14 +345,14 @@ impl PeerServer {
let since_last_send = session.last_sent.elapsed();
if since_last_recv < *KEEPALIVE_TIMEOUT {
let wait = *KEEPALIVE_TIMEOUT - since_last_recv;
- self.timer.spawn_delayed(wait, PassiveKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(wait, PassiveKeepAlive(peer_ref.clone(), our_index));
bail!("passive keepalive tick (waiting ~{}s due to last recv time)", wait.as_secs());
} else if since_last_send < *KEEPALIVE_TIMEOUT {
let wait = *KEEPALIVE_TIMEOUT - since_last_send;
- self.timer.spawn_delayed(wait, PassiveKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(wait, PassiveKeepAlive(peer_ref.clone(), our_index));
bail!("passive keepalive tick (waiting ~{}s due to last send time)", wait.as_secs());
} else if session.keepalive_sent {
- self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index));
bail!("passive keepalive already sent (waiting ~{}s to see if session survives)", KEEPALIVE_TIMEOUT.as_secs());
} else {
session.keepalive_sent = true;
@@ -360,7 +362,7 @@ impl PeerServer {
self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
debug!("sent passive keepalive packet ({})", our_index);
- self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index));
},
PersistentKeepAlive(peer_ref, our_index) => {
let mut peer = peer_ref.borrow_mut();
@@ -373,8 +375,8 @@ impl PeerServer {
debug!("sent persistent keepalive packet ({})", our_index);
if let Some(keepalive) = peer.info.keepalive {
- self.timer.spawn_delayed(Duration::from_secs(u64::from(keepalive)),
- PersistentKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(Duration::from_secs(u64::from(keepalive)),
+ PersistentKeepAlive(peer_ref.clone(), our_index));
}
},
@@ -444,7 +446,7 @@ impl Future for PeerServer {
loop {
match self.udp.as_mut().unwrap().ingress.poll() {
Ok(Async::Ready(Some((addr, packet)))) => {
- let _ = self.handle_ingress_packet(addr, &packet).map_err(|e| warn!("UDP ERR: {:?}", e));
+ let _ = self.handle_ingress_packet(addr, packet).map_err(|e| warn!("UDP ERR: {:?}", e));
},
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) | Err(_) => return Err(()),