aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-03-08 20:20:37 +0000
committerJake McGinty <me@jake.su>2018-03-08 21:05:51 +0000
commitac2e9275b6c3d6ddd16e616d24f86a055a9ad461 (patch)
tree08a5e899d4453f595c27d0cfd9731bba84640cc7
parentpeer: send only one of persistent or passive keepalive, not both (diff)
downloadwireguard-rs-ac2e9275b6c3d6ddd16e616d24f86a055a9ad461.tar.xz
wireguard-rs-ac2e9275b6c3d6ddd16e616d24f86a055a9ad461.zip
messages: strongly typed messages
-rw-r--r--Cargo.lock11
-rw-r--r--Cargo.toml1
-rw-r--r--src/cookie.rs9
-rw-r--r--src/interface/peer_server.rs76
-rw-r--r--src/lib.rs31
-rw-r--r--src/message.rs154
-rw-r--r--src/peer.rs27
-rw-r--r--src/timer.rs5
8 files changed, 240 insertions, 74 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 3505581..3bb3685 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -265,6 +265,15 @@ dependencies = [
]
[[package]]
+name = "derive_deref"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
+ "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "digest"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1295,6 +1304,7 @@ dependencies = [
"colored 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"criterion 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"daemonize 0.2.3 (git+https://github.com/mcginty/daemonize)",
+ "derive_deref 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"fern 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1401,6 +1411,7 @@ dependencies = [
"checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9"
"checksum curve25519-dalek 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4576702012648a8d7331c0ebb1a41a13723ef8d5bfc704a7ab4175a02e38906e"
"checksum daemonize 0.2.3 (git+https://github.com/mcginty/daemonize)" = "<none>"
+"checksum derive_deref 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "75977fd13de1d8a2b0db58cb124e8f1e63cf233917fea33065e8b214d1eccdb8"
"checksum digest 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e5b29bf156f3f4b3c4f610a25ff69370616ae6e0657d416de22645483e72af0a"
"checksum dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "09c3753c3db574d215cba4ea76018483895d7bff25a31b49ba45db21c48e50ab"
"checksum either 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "740178ddf48b1a9e878e6d6509a1442a2d42fd2928aae8e7a6f8a36fb01981b3"
diff --git a/Cargo.toml b/Cargo.toml
index f61f898..537314e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,6 +34,7 @@ blake2-rfc = "0.2"
byteorder = "^1.2"
bytes = "0.4"
chacha20-poly1305-aead = "^0.1"
+derive_deref = "^1.0"
failure = "^0.1"
futures = "^0.1"
lazy_static = "^1"
diff --git a/src/cookie.rs b/src/cookie.rs
index cc28994..f6063ee 100644
--- a/src/cookie.rs
+++ b/src/cookie.rs
@@ -3,6 +3,7 @@
use blake2_rfc::blake2s::{blake2s, Blake2sResult};
use xchacha20poly1305;
use consts::COOKIE_REFRESH_TIME;
+use message::CookieReply;
use failure::{Error, err_msg};
use rand::{self, Rng};
use subtle;
@@ -97,14 +98,14 @@ impl Generator {
}
}
- pub fn consume_reply(&mut self, reply: &[u8]) -> Result<(), Error> {
+ pub fn consume_reply(&mut self, reply: &CookieReply) -> Result<(), Error> {
let last_mac1 = self.mac2.last_mac1.ok_or_else(|| err_msg("no last mac1"))?;
xchacha20poly1305::decrypt(self.mac2.key.as_bytes(),
- &reply[8..32],
- &reply[32..48],
+ reply.nonce(),
+ reply.cookie(),
last_mac1.as_bytes(),
- &reply[48..],
+ reply.aead_tag(),
&mut self.mac2.cookie)?;
self.mac2.cookie_time = Some(Instant::now());
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index 3ab4a37..534eb1b 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -2,10 +2,12 @@ use consts::{REKEY_TIMEOUT, REKEY_ATTEMPT_TIME, KEEPALIVE_TIMEOUT, STALE_SESSION
MAX_CONTENT_SIZE, WIPE_AFTER_TIME};
use cookie;
use interface::{SharedPeer, SharedState, State, UtunPacket, config};
+use message::{Message, Initiation, Response, CookieReply, Transport};
use peer::{Peer, SessionType};
use time::Timestamp;
use timer::{Timer, TimerMessage};
+use std::convert::TryInto;
use std::net::{Ipv6Addr, SocketAddr};
use std::time::Duration;
@@ -108,18 +110,18 @@ impl PeerServer {
}
}
- fn handle_ingress_packet(&mut self, addr: SocketAddr, packet: &[u8]) -> Result<(), Error> {
+ fn handle_ingress_packet(&mut self, addr: SocketAddr, packet: Vec<u8>) -> Result<(), Error> {
trace!("got a UDP packet from {:?} of length {}, packet type {}", &addr, packet.len(), packet[0]);
- match packet[0] {
- 1 => self.handle_ingress_handshake_init(addr, packet),
- 2 => self.handle_ingress_handshake_resp(addr, packet),
- 3 => self.handle_ingress_cookie_reply(addr, packet),
- 4 => self.handle_ingress_transport(addr, packet),
- _ => bail!("unknown wireguard message type")
+
+ match packet.try_into()? {
+ Message::Initiation(packet) => self.handle_ingress_handshake_init(addr, packet),
+ Message::Response(packet) => self.handle_ingress_handshake_resp(addr, packet),
+ Message::CookieReply(packet) => self.handle_ingress_cookie_reply(addr, packet),
+ Message::Transport(packet) => self.handle_ingress_transport(addr, packet),
}
}
- fn handle_ingress_handshake_init(&mut self, addr: SocketAddr, packet: &[u8]) -> Result<(), Error> {
+ fn handle_ingress_handshake_init(&mut self, addr: SocketAddr, packet: Initiation) -> Result<(), Error> {
ensure!(packet.len() == 148, "handshake init packet length is incorrect");
let mut state = self.shared_state.borrow_mut();
{
@@ -131,7 +133,7 @@ impl PeerServer {
let handshake = Peer::process_incoming_handshake(
&state.interface_info.private_key.ok_or_else(|| err_msg("no private key!"))?,
- packet)?;
+ &packet)?;
let peer_ref = state.pubkey_map.get(handshake.their_pubkey())
.ok_or_else(|| err_msg("unknown peer pubkey"))?.clone();
@@ -147,7 +149,7 @@ impl PeerServer {
}
// TODO use the address to update endpoint if it changes i suppose
- fn handle_ingress_handshake_resp(&mut self, _addr: SocketAddr, packet: &[u8]) -> Result<(), Error> {
+ fn handle_ingress_handshake_resp(&mut self, _addr: SocketAddr, packet: Response) -> Result<(), Error> {
ensure!(packet.len() == 92, "handshake resp packet length is incorrect");
let mut state = self.shared_state.borrow_mut();
{
@@ -161,7 +163,7 @@ impl PeerServer {
.ok_or_else(|| format_err!("unknown our_index ({})", our_index))?
.clone();
let mut peer = peer_ref.borrow_mut();
- let dead_index = peer.process_incoming_handshake_response(packet)?;
+ let dead_index = peer.process_incoming_handshake_response(&packet)?;
if let Some(index) = dead_index {
let _ = state.index_map.remove(&index);
}
@@ -180,37 +182,37 @@ impl PeerServer {
}
info!("handshake response received, current session now {}", our_index);
- self.timer.spawn_delayed(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone()));
+ self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone()));
match peer.info.keepalive {
Some(keepalive) if keepalive > 0 => {
- self.timer.spawn_delayed(Duration::from_secs(u64::from(keepalive)),
- TimerMessage::PersistentKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(Duration::from_secs(u64::from(keepalive)),
+ TimerMessage::PersistentKeepAlive(peer_ref.clone(), our_index));
},
_ => {
- self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT,
- TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(*KEEPALIVE_TIMEOUT,
+ TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_index));
}
}
Ok(())
}
- fn handle_ingress_cookie_reply(&mut self, _addr: SocketAddr, packet: &[u8]) -> Result<(), Error> {
+ fn handle_ingress_cookie_reply(&mut self, _addr: SocketAddr, packet: CookieReply) -> Result<(), Error> {
let state = self.shared_state.borrow_mut();
- let our_index = LittleEndian::read_u32(&packet[4..]);
- let peer_ref = state.index_map.get(&our_index).ok_or_else(|| err_msg("unknown our_index"))?.clone();
+ let peer_ref = state.index_map.get(&packet.our_index()).ok_or_else(|| err_msg("unknown our_index"))?.clone();
let mut peer = peer_ref.borrow_mut();
- peer.consume_cookie_reply(packet)
+ peer.consume_cookie_reply(&packet)
}
- fn handle_ingress_transport(&mut self, addr: SocketAddr, packet: &[u8]) -> Result<(), Error> {
- let our_index = LittleEndian::read_u32(&packet[4..]);
- let peer_ref = self.shared_state.borrow().index_map.get(&our_index).ok_or_else(|| err_msg("unknown our_index"))?.clone();
- let (raw_packet, needs_handshake) = {
+ fn handle_ingress_transport(&mut self, addr: SocketAddr, packet: Transport) -> Result<(), Error> {
+ let peer_ref = self.shared_state.borrow().index_map.get(&packet.our_index())
+ .ok_or_else(|| err_msg("unknown our_index"))?.clone();
+
+ let (raw_packet, needs_handshake) = {
let mut peer = peer_ref.borrow_mut();
let mut state = self.shared_state.borrow_mut();
- let (raw_packet, transition) = peer.handle_incoming_transport(addr, packet)?;
+ let (raw_packet, transition) = peer.handle_incoming_transport(addr, &packet)?;
// If a new session has been set to current (TODO make this more clear)
if let Some(possible_dead_index) = transition {
@@ -228,8 +230,8 @@ impl PeerServer {
}
let our_new_index = peer.sessions.current.as_ref().unwrap().our_index;
- self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_new_index));
- self.timer.spawn_delayed(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone()));
+ self.timer.send_after(*KEEPALIVE_TIMEOUT, TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_new_index));
+ self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone()));
}
(raw_packet, peer.needs_new_handshake(false))
};
@@ -296,7 +298,7 @@ impl PeerServer {
self.send_to_peer((endpoint, init_packet))?;
peer.last_sent_init = Timestamp::now();
let when = *REKEY_TIMEOUT;
- self.timer.spawn_delayed(when, TimerMessage::Rekey(peer_ref.clone(), new_index));
+ self.timer.send_after(when, TimerMessage::Rekey(peer_ref.clone(), new_index));
Ok(new_index)
}
@@ -311,7 +313,7 @@ impl PeerServer {
Some((_, SessionType::Next)) => {
if peer.last_sent_init.elapsed() < *REKEY_TIMEOUT {
let wait = *REKEY_TIMEOUT - peer.last_sent_init.elapsed();
- self.timer.spawn_delayed(wait, Rekey(peer_ref.clone(), our_index));
+ self.timer.send_after(wait, Rekey(peer_ref.clone(), our_index));
bail!("too soon since last init sent, waiting {:?} ({})", wait, our_index);
} else if peer.last_tun_queue.elapsed() > *REKEY_ATTEMPT_TIME {
peer.sessions.next = None;
@@ -322,7 +324,7 @@ impl PeerServer {
let since_last_recv = peer.sessions.current.as_ref().unwrap().last_received.elapsed(); // gross
if since_last_recv <= *STALE_SESSION_TIMEOUT {
let wait = *STALE_SESSION_TIMEOUT - since_last_recv;
- self.timer.spawn_delayed(wait, Rekey(peer_ref.clone(), our_index));
+ self.timer.send_after(wait, Rekey(peer_ref.clone(), our_index));
bail!("rekey tick (waiting ~{}s due to stale session check)", wait.as_secs());
}
},
@@ -343,14 +345,14 @@ impl PeerServer {
let since_last_send = session.last_sent.elapsed();
if since_last_recv < *KEEPALIVE_TIMEOUT {
let wait = *KEEPALIVE_TIMEOUT - since_last_recv;
- self.timer.spawn_delayed(wait, PassiveKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(wait, PassiveKeepAlive(peer_ref.clone(), our_index));
bail!("passive keepalive tick (waiting ~{}s due to last recv time)", wait.as_secs());
} else if since_last_send < *KEEPALIVE_TIMEOUT {
let wait = *KEEPALIVE_TIMEOUT - since_last_send;
- self.timer.spawn_delayed(wait, PassiveKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(wait, PassiveKeepAlive(peer_ref.clone(), our_index));
bail!("passive keepalive tick (waiting ~{}s due to last send time)", wait.as_secs());
} else if session.keepalive_sent {
- self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index));
bail!("passive keepalive already sent (waiting ~{}s to see if session survives)", KEEPALIVE_TIMEOUT.as_secs());
} else {
session.keepalive_sent = true;
@@ -360,7 +362,7 @@ impl PeerServer {
self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
debug!("sent passive keepalive packet ({})", our_index);
- self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index));
},
PersistentKeepAlive(peer_ref, our_index) => {
let mut peer = peer_ref.borrow_mut();
@@ -373,8 +375,8 @@ impl PeerServer {
debug!("sent persistent keepalive packet ({})", our_index);
if let Some(keepalive) = peer.info.keepalive {
- self.timer.spawn_delayed(Duration::from_secs(u64::from(keepalive)),
- PersistentKeepAlive(peer_ref.clone(), our_index));
+ self.timer.send_after(Duration::from_secs(u64::from(keepalive)),
+ PersistentKeepAlive(peer_ref.clone(), our_index));
}
},
@@ -444,7 +446,7 @@ impl Future for PeerServer {
loop {
match self.udp.as_mut().unwrap().ingress.poll() {
Ok(Async::Ready(Some((addr, packet)))) => {
- let _ = self.handle_ingress_packet(addr, &packet).map_err(|e| warn!("UDP ERR: {:?}", e));
+ let _ = self.handle_ingress_packet(addr, packet).map_err(|e| warn!("UDP ERR: {:?}", e));
},
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) | Err(_) => return Err(()),
diff --git a/src/lib.rs b/src/lib.rs
index 7ff0944..76304dc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,11 +1,13 @@
#![feature(ip_constructors)]
#![feature(try_trait)]
+#![feature(try_from)]
#![feature(test)]
#![cfg_attr(feature = "cargo-clippy", allow(doc_markdown))]
#![cfg_attr(feature = "cargo-clippy", allow(unreadable_literal))]
#![cfg_attr(feature = "cargo-clippy", allow(decimal_literal_representation))]
+#[macro_use] extern crate derive_deref;
#[macro_use] extern crate failure;
#[macro_use] extern crate futures;
#[macro_use] extern crate lazy_static;
@@ -35,18 +37,19 @@ extern crate tokio_signal;
extern crate treebitmap;
extern crate x25519_dalek;
-mod udp;
-
-pub mod consts;
-pub mod cookie;
-pub mod error;
pub mod interface;
-pub mod noise;
-pub mod peer;
-pub mod types;
-pub mod anti_replay;
-pub mod router;
-pub mod time;
-pub mod timer;
-pub mod ip_packet;
-pub mod xchacha20poly1305;
+
+mod udp;
+mod message;
+mod consts;
+mod cookie;
+mod error;
+mod noise;
+mod peer;
+mod types;
+mod anti_replay;
+mod router;
+mod time;
+mod timer;
+mod ip_packet;
+mod xchacha20poly1305;
diff --git a/src/message.rs b/src/message.rs
new file mode 100644
index 0000000..94104c3
--- /dev/null
+++ b/src/message.rs
@@ -0,0 +1,154 @@
+#![allow(unused)]
+
+use failure::Error;
+use std::convert::{TryFrom, TryInto};
+use byteorder::{ByteOrder, LittleEndian};
+
+pub enum Message {
+ Initiation(Initiation),
+ Response(Response),
+ CookieReply(CookieReply),
+ Transport(Transport),
+}
+
+impl TryFrom<Vec<u8>> for Message {
+ type Error = Error;
+
+ fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> {
+ Ok(match packet[0] {
+ 1 => Message::Initiation(packet.try_into()?),
+ 2 => Message::Response(packet.try_into()?),
+ 3 => Message::CookieReply(packet.try_into()?),
+ 4 => Message::Transport(packet.try_into()?),
+ _ => bail!("unknown wireguard message type")
+ })
+ }
+}
+
+#[derive(Deref)]
+pub struct Initiation(Vec<u8>);
+
+impl Initiation {
+ pub fn their_index(&self) -> u32 {
+ LittleEndian::read_u32(&self[4..])
+ }
+
+ pub fn noise_bytes(&self) -> &[u8] {
+ &self[8..116]
+ }
+
+ pub fn as_bytes(&self) -> &[u8] {
+ &self
+ }
+}
+
+impl TryFrom<Vec<u8>> for Initiation {
+ type Error = Error;
+
+ fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> {
+ ensure!(packet.len() == 148, "incorrect handshake initiation packet length.");
+ Ok(Initiation(packet))
+ }
+}
+
+#[derive(Deref)]
+pub struct Response(Vec<u8>);
+
+impl Response {
+ pub fn their_index(&self) -> u32 {
+ LittleEndian::read_u32(&self[4..])
+ }
+
+ pub fn our_index(&self) -> u32 {
+ LittleEndian::read_u32(&self[8..])
+ }
+
+ pub fn noise_bytes(&self) -> &[u8] {
+ &self[12..60]
+ }
+
+ pub fn mac1(&self) -> &[u8] {
+ &self[60..76]
+ }
+
+ pub fn mac2(&self) -> &[u8] {
+ &self[76..92]
+ }
+
+ pub fn as_bytes(&self) -> &[u8] {
+ &self
+ }
+}
+
+impl TryFrom<Vec<u8>> for Response {
+ type Error = Error;
+
+ fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> {
+ ensure!(packet.len() == 92, "incorrect handshake response packet length.");
+ Ok(Response(packet))
+ }
+}
+
+#[derive(Deref)]
+pub struct CookieReply(Vec<u8>);
+
+impl CookieReply {
+ pub fn our_index(&self) -> u32 {
+ LittleEndian::read_u32(&self[4..])
+ }
+
+ pub fn nonce(&self) -> &[u8] {
+ &self[8..32]
+ }
+
+ pub fn cookie(&self) -> &[u8] {
+ &self[32..48]
+ }
+
+ pub fn aead_tag(&self) -> &[u8] {
+ &self[48..64]
+ }
+
+ pub fn as_bytes(&self) -> &[u8] {
+ &self
+ }
+}
+
+impl TryFrom<Vec<u8>> for CookieReply {
+ type Error = Error;
+
+ fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> {
+ ensure!(packet.len() == 64, "incorrect cookie reply packet length.");
+ Ok(CookieReply(packet))
+ }
+}
+
+#[derive(Deref)]
+pub struct Transport(Vec<u8>);
+
+impl Transport {
+ pub fn our_index(&self) -> u32 {
+ LittleEndian::read_u32(&self[4..])
+ }
+
+ pub fn nonce(&self) -> u64 {
+ LittleEndian::read_u64(&self[8..16])
+ }
+
+ pub fn payload(&self) -> &[u8] {
+ &self[16..]
+ }
+
+ pub fn as_bytes(&self) -> &[u8] {
+ &self
+ }
+}
+
+impl TryFrom<Vec<u8>> for Transport {
+ type Error = Error;
+
+ fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> {
+ ensure!(packet.len() >= 32, "transport message smaller than minimum length.");
+ Ok(Transport(packet))
+ }
+}
diff --git a/src/peer.rs b/src/peer.rs
index e329886..41a1e34 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -7,6 +7,7 @@ use failure::{Error, err_msg};
use interface::UtunPacket;
use ip_packet::IpPacket;
use noise;
+use message::{Initiation, Response, CookieReply, Transport};
use std::{self, mem};
use std::collections::VecDeque;
use std::fmt::{self, Debug, Display, Formatter};
@@ -229,16 +230,15 @@ impl Peer {
Ok((endpoint, packet, dead_index))
}
- pub fn process_incoming_handshake(private_key: &[u8], packet: &[u8]) -> Result<IncompleteIncomingHandshake, Error> {
+ pub fn process_incoming_handshake(private_key: &[u8], packet: &Initiation) -> Result<IncompleteIncomingHandshake, Error> {
let mut timestamp = [0u8; 12];
let mut noise = noise::build_responder(private_key)?;
- let their_index = LittleEndian::read_u32(&packet[4..]);
- let len = noise.read_message(&packet[8..116], &mut timestamp)?;
+ let len = noise.read_message(packet.noise_bytes(), &mut timestamp)?;
ensure!(len == 12, "incorrect handshake payload length");
let timestamp = timestamp.into();
- Ok(IncompleteIncomingHandshake { their_index, timestamp, noise })
+ Ok(IncompleteIncomingHandshake { their_index: packet.their_index(), timestamp, noise })
}
/// Takes a new handshake packet (type 0x01), updates the internal peer state,
@@ -285,17 +285,15 @@ impl Peer {
Ok(packet)
}
- pub fn consume_cookie_reply(&mut self, reply: &[u8]) -> Result<(), Error> {
+ pub fn consume_cookie_reply(&mut self, reply: &CookieReply) -> Result<(), Error> {
self.cookie.consume_reply(reply)
}
- pub fn process_incoming_handshake_response(&mut self, packet: &[u8]) -> Result<Option<u32>, Error> {
- let their_index = LittleEndian::read_u32(&packet[4..]);
+ pub fn process_incoming_handshake_response(&mut self, packet: &Response) -> Result<Option<u32>, Error> {
let mut session = mem::replace(&mut self.sessions.next, None).ok_or_else(|| err_msg("no next session"))?;
- let _ = session.noise.read_message(&packet[12..60], &mut [])?;
-
+ let _ = session.noise.read_message(&packet.noise_bytes(), &mut [])?;
session = session.into_transport_mode()?;
- session.their_index = their_index;
+ session.their_index = packet.their_index();
session.birthday = Timestamp::now();
self.last_handshake = Timestamp::now();
@@ -305,22 +303,21 @@ impl Peer {
Ok(dead.map(|session| session.our_index))
}
- pub fn handle_incoming_transport(&mut self, addr: SocketAddr, packet: &[u8])
+ pub fn handle_incoming_transport(&mut self, addr: SocketAddr, packet: &Transport)
-> Result<(Vec<u8>, Option<Option<u32>>), Error> {
- let our_index = LittleEndian::read_u32(&packet[4..]);
- let nonce = LittleEndian::read_u64(&packet[8..]);
let mut raw_packet = vec![0u8; packet.len()];
+ let nonce = packet.nonce();
let session_type = {
- let (session, session_type) = self.find_session(our_index).ok_or_else(|| err_msg("no session with index"))?;
+ let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?;
ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets");
ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES");
ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME");
session.anti_replay.update(nonce)?;
session.noise.set_receiving_nonce(nonce)?;
- let len = session.noise.read_message(&packet[16..], &mut raw_packet)?;
+ let len = session.noise.read_message(packet.payload(), &mut raw_packet)?;
if len > 0 {
let len = IpPacket::new(&raw_packet[..len])
.ok_or_else(||format_err!("invalid IP packet (len {})", len))?
diff --git a/src/timer.rs b/src/timer.rs
index 7db3b2e..bbf80f3 100644
--- a/src/timer.rs
+++ b/src/timer.rs
@@ -30,7 +30,7 @@ impl Timer {
Self { handle, timer, tx, rx }
}
- pub fn spawn_delayed(&mut self, delay: Duration, message: TimerMessage) {
+ pub fn send_after(&mut self, delay: Duration, message: TimerMessage) {
trace!("queuing timer message {:?}", &message);
let timer = self.timer.sleep(delay + (*TIMER_RESOLUTION * 2));
let future = timer.and_then({
@@ -42,9 +42,6 @@ impl Timer {
self.handle.spawn(future);
}
- pub fn spawn_immediately(&mut self, message: TimerMessage) {
- self.handle.spawn(self.tx.clone().send(message).then(|_| Ok(())));
- }
}
impl Stream for Timer {