From ac2e9275b6c3d6ddd16e616d24f86a055a9ad461 Mon Sep 17 00:00:00 2001 From: Jake McGinty Date: Thu, 8 Mar 2018 20:20:37 +0000 Subject: messages: strongly typed messages --- Cargo.lock | 11 ++++ Cargo.toml | 1 + src/cookie.rs | 9 +-- src/interface/peer_server.rs | 76 ++++++++++----------- src/lib.rs | 31 +++++---- src/message.rs | 154 +++++++++++++++++++++++++++++++++++++++++++ src/peer.rs | 27 ++++---- src/timer.rs | 5 +- 8 files changed, 240 insertions(+), 74 deletions(-) create mode 100644 src/message.rs diff --git a/Cargo.lock b/Cargo.lock index 3505581..3bb3685 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -264,6 +264,15 @@ dependencies = [ "libc 0.2.37 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "derive_deref" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "digest" version = "0.6.2" @@ -1295,6 +1304,7 @@ dependencies = [ "colored 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "criterion 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "daemonize 0.2.3 (git+https://github.com/mcginty/daemonize)", + "derive_deref 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "fern 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1401,6 +1411,7 @@ dependencies = [ "checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9" "checksum curve25519-dalek 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4576702012648a8d7331c0ebb1a41a13723ef8d5bfc704a7ab4175a02e38906e" "checksum daemonize 0.2.3 (git+https://github.com/mcginty/daemonize)" = "" +"checksum derive_deref 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "75977fd13de1d8a2b0db58cb124e8f1e63cf233917fea33065e8b214d1eccdb8" "checksum digest 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e5b29bf156f3f4b3c4f610a25ff69370616ae6e0657d416de22645483e72af0a" "checksum dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "09c3753c3db574d215cba4ea76018483895d7bff25a31b49ba45db21c48e50ab" "checksum either 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "740178ddf48b1a9e878e6d6509a1442a2d42fd2928aae8e7a6f8a36fb01981b3" diff --git a/Cargo.toml b/Cargo.toml index f61f898..537314e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ blake2-rfc = "0.2" byteorder = "^1.2" bytes = "0.4" chacha20-poly1305-aead = "^0.1" +derive_deref = "^1.0" failure = "^0.1" futures = "^0.1" lazy_static = "^1" diff --git a/src/cookie.rs b/src/cookie.rs index cc28994..f6063ee 100644 --- a/src/cookie.rs +++ b/src/cookie.rs @@ -3,6 +3,7 @@ use blake2_rfc::blake2s::{blake2s, Blake2sResult}; use xchacha20poly1305; use consts::COOKIE_REFRESH_TIME; +use message::CookieReply; use failure::{Error, err_msg}; use rand::{self, Rng}; use subtle; @@ -97,14 +98,14 @@ impl Generator { } } - pub fn consume_reply(&mut self, reply: &[u8]) -> Result<(), Error> { + pub fn consume_reply(&mut self, reply: &CookieReply) -> Result<(), Error> { let last_mac1 = self.mac2.last_mac1.ok_or_else(|| err_msg("no last mac1"))?; xchacha20poly1305::decrypt(self.mac2.key.as_bytes(), - &reply[8..32], - &reply[32..48], + reply.nonce(), + reply.cookie(), last_mac1.as_bytes(), - &reply[48..], + reply.aead_tag(), &mut self.mac2.cookie)?; self.mac2.cookie_time = Some(Instant::now()); diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 3ab4a37..534eb1b 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -2,10 +2,12 @@ use consts::{REKEY_TIMEOUT, REKEY_ATTEMPT_TIME, KEEPALIVE_TIMEOUT, STALE_SESSION MAX_CONTENT_SIZE, WIPE_AFTER_TIME}; use cookie; use interface::{SharedPeer, SharedState, State, UtunPacket, config}; +use message::{Message, Initiation, Response, CookieReply, Transport}; use peer::{Peer, SessionType}; use time::Timestamp; use timer::{Timer, TimerMessage}; +use std::convert::TryInto; use std::net::{Ipv6Addr, SocketAddr}; use std::time::Duration; @@ -108,18 +110,18 @@ impl PeerServer { } } - fn handle_ingress_packet(&mut self, addr: SocketAddr, packet: &[u8]) -> Result<(), Error> { + fn handle_ingress_packet(&mut self, addr: SocketAddr, packet: Vec) -> Result<(), Error> { trace!("got a UDP packet from {:?} of length {}, packet type {}", &addr, packet.len(), packet[0]); - match packet[0] { - 1 => self.handle_ingress_handshake_init(addr, packet), - 2 => self.handle_ingress_handshake_resp(addr, packet), - 3 => self.handle_ingress_cookie_reply(addr, packet), - 4 => self.handle_ingress_transport(addr, packet), - _ => bail!("unknown wireguard message type") + + match packet.try_into()? { + Message::Initiation(packet) => self.handle_ingress_handshake_init(addr, packet), + Message::Response(packet) => self.handle_ingress_handshake_resp(addr, packet), + Message::CookieReply(packet) => self.handle_ingress_cookie_reply(addr, packet), + Message::Transport(packet) => self.handle_ingress_transport(addr, packet), } } - fn handle_ingress_handshake_init(&mut self, addr: SocketAddr, packet: &[u8]) -> Result<(), Error> { + fn handle_ingress_handshake_init(&mut self, addr: SocketAddr, packet: Initiation) -> Result<(), Error> { ensure!(packet.len() == 148, "handshake init packet length is incorrect"); let mut state = self.shared_state.borrow_mut(); { @@ -131,7 +133,7 @@ impl PeerServer { let handshake = Peer::process_incoming_handshake( &state.interface_info.private_key.ok_or_else(|| err_msg("no private key!"))?, - packet)?; + &packet)?; let peer_ref = state.pubkey_map.get(handshake.their_pubkey()) .ok_or_else(|| err_msg("unknown peer pubkey"))?.clone(); @@ -147,7 +149,7 @@ impl PeerServer { } // TODO use the address to update endpoint if it changes i suppose - fn handle_ingress_handshake_resp(&mut self, _addr: SocketAddr, packet: &[u8]) -> Result<(), Error> { + fn handle_ingress_handshake_resp(&mut self, _addr: SocketAddr, packet: Response) -> Result<(), Error> { ensure!(packet.len() == 92, "handshake resp packet length is incorrect"); let mut state = self.shared_state.borrow_mut(); { @@ -161,7 +163,7 @@ impl PeerServer { .ok_or_else(|| format_err!("unknown our_index ({})", our_index))? .clone(); let mut peer = peer_ref.borrow_mut(); - let dead_index = peer.process_incoming_handshake_response(packet)?; + let dead_index = peer.process_incoming_handshake_response(&packet)?; if let Some(index) = dead_index { let _ = state.index_map.remove(&index); } @@ -180,37 +182,37 @@ impl PeerServer { } info!("handshake response received, current session now {}", our_index); - self.timer.spawn_delayed(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone())); + self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone())); match peer.info.keepalive { Some(keepalive) if keepalive > 0 => { - self.timer.spawn_delayed(Duration::from_secs(u64::from(keepalive)), - TimerMessage::PersistentKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(Duration::from_secs(u64::from(keepalive)), + TimerMessage::PersistentKeepAlive(peer_ref.clone(), our_index)); }, _ => { - self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, - TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(*KEEPALIVE_TIMEOUT, + TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_index)); } } Ok(()) } - fn handle_ingress_cookie_reply(&mut self, _addr: SocketAddr, packet: &[u8]) -> Result<(), Error> { + fn handle_ingress_cookie_reply(&mut self, _addr: SocketAddr, packet: CookieReply) -> Result<(), Error> { let state = self.shared_state.borrow_mut(); - let our_index = LittleEndian::read_u32(&packet[4..]); - let peer_ref = state.index_map.get(&our_index).ok_or_else(|| err_msg("unknown our_index"))?.clone(); + let peer_ref = state.index_map.get(&packet.our_index()).ok_or_else(|| err_msg("unknown our_index"))?.clone(); let mut peer = peer_ref.borrow_mut(); - peer.consume_cookie_reply(packet) + peer.consume_cookie_reply(&packet) } - fn handle_ingress_transport(&mut self, addr: SocketAddr, packet: &[u8]) -> Result<(), Error> { - let our_index = LittleEndian::read_u32(&packet[4..]); - let peer_ref = self.shared_state.borrow().index_map.get(&our_index).ok_or_else(|| err_msg("unknown our_index"))?.clone(); - let (raw_packet, needs_handshake) = { + fn handle_ingress_transport(&mut self, addr: SocketAddr, packet: Transport) -> Result<(), Error> { + let peer_ref = self.shared_state.borrow().index_map.get(&packet.our_index()) + .ok_or_else(|| err_msg("unknown our_index"))?.clone(); + + let (raw_packet, needs_handshake) = { let mut peer = peer_ref.borrow_mut(); let mut state = self.shared_state.borrow_mut(); - let (raw_packet, transition) = peer.handle_incoming_transport(addr, packet)?; + let (raw_packet, transition) = peer.handle_incoming_transport(addr, &packet)?; // If a new session has been set to current (TODO make this more clear) if let Some(possible_dead_index) = transition { @@ -228,8 +230,8 @@ impl PeerServer { } let our_new_index = peer.sessions.current.as_ref().unwrap().our_index; - self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_new_index)); - self.timer.spawn_delayed(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone())); + self.timer.send_after(*KEEPALIVE_TIMEOUT, TimerMessage::PassiveKeepAlive(peer_ref.clone(), our_new_index)); + self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(peer_ref.clone())); } (raw_packet, peer.needs_new_handshake(false)) }; @@ -296,7 +298,7 @@ impl PeerServer { self.send_to_peer((endpoint, init_packet))?; peer.last_sent_init = Timestamp::now(); let when = *REKEY_TIMEOUT; - self.timer.spawn_delayed(when, TimerMessage::Rekey(peer_ref.clone(), new_index)); + self.timer.send_after(when, TimerMessage::Rekey(peer_ref.clone(), new_index)); Ok(new_index) } @@ -311,7 +313,7 @@ impl PeerServer { Some((_, SessionType::Next)) => { if peer.last_sent_init.elapsed() < *REKEY_TIMEOUT { let wait = *REKEY_TIMEOUT - peer.last_sent_init.elapsed(); - self.timer.spawn_delayed(wait, Rekey(peer_ref.clone(), our_index)); + self.timer.send_after(wait, Rekey(peer_ref.clone(), our_index)); bail!("too soon since last init sent, waiting {:?} ({})", wait, our_index); } else if peer.last_tun_queue.elapsed() > *REKEY_ATTEMPT_TIME { peer.sessions.next = None; @@ -322,7 +324,7 @@ impl PeerServer { let since_last_recv = peer.sessions.current.as_ref().unwrap().last_received.elapsed(); // gross if since_last_recv <= *STALE_SESSION_TIMEOUT { let wait = *STALE_SESSION_TIMEOUT - since_last_recv; - self.timer.spawn_delayed(wait, Rekey(peer_ref.clone(), our_index)); + self.timer.send_after(wait, Rekey(peer_ref.clone(), our_index)); bail!("rekey tick (waiting ~{}s due to stale session check)", wait.as_secs()); } }, @@ -343,14 +345,14 @@ impl PeerServer { let since_last_send = session.last_sent.elapsed(); if since_last_recv < *KEEPALIVE_TIMEOUT { let wait = *KEEPALIVE_TIMEOUT - since_last_recv; - self.timer.spawn_delayed(wait, PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(wait, PassiveKeepAlive(peer_ref.clone(), our_index)); bail!("passive keepalive tick (waiting ~{}s due to last recv time)", wait.as_secs()); } else if since_last_send < *KEEPALIVE_TIMEOUT { let wait = *KEEPALIVE_TIMEOUT - since_last_send; - self.timer.spawn_delayed(wait, PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(wait, PassiveKeepAlive(peer_ref.clone(), our_index)); bail!("passive keepalive tick (waiting ~{}s due to last send time)", wait.as_secs()); } else if session.keepalive_sent { - self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index)); bail!("passive keepalive already sent (waiting ~{}s to see if session survives)", KEEPALIVE_TIMEOUT.as_secs()); } else { session.keepalive_sent = true; @@ -360,7 +362,7 @@ impl PeerServer { self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; debug!("sent passive keepalive packet ({})", our_index); - self.timer.spawn_delayed(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone(), our_index)); }, PersistentKeepAlive(peer_ref, our_index) => { let mut peer = peer_ref.borrow_mut(); @@ -373,8 +375,8 @@ impl PeerServer { debug!("sent persistent keepalive packet ({})", our_index); if let Some(keepalive) = peer.info.keepalive { - self.timer.spawn_delayed(Duration::from_secs(u64::from(keepalive)), - PersistentKeepAlive(peer_ref.clone(), our_index)); + self.timer.send_after(Duration::from_secs(u64::from(keepalive)), + PersistentKeepAlive(peer_ref.clone(), our_index)); } }, @@ -444,7 +446,7 @@ impl Future for PeerServer { loop { match self.udp.as_mut().unwrap().ingress.poll() { Ok(Async::Ready(Some((addr, packet)))) => { - let _ = self.handle_ingress_packet(addr, &packet).map_err(|e| warn!("UDP ERR: {:?}", e)); + let _ = self.handle_ingress_packet(addr, packet).map_err(|e| warn!("UDP ERR: {:?}", e)); }, Ok(Async::NotReady) => break, Ok(Async::Ready(None)) | Err(_) => return Err(()), diff --git a/src/lib.rs b/src/lib.rs index 7ff0944..76304dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,13 @@ #![feature(ip_constructors)] #![feature(try_trait)] +#![feature(try_from)] #![feature(test)] #![cfg_attr(feature = "cargo-clippy", allow(doc_markdown))] #![cfg_attr(feature = "cargo-clippy", allow(unreadable_literal))] #![cfg_attr(feature = "cargo-clippy", allow(decimal_literal_representation))] +#[macro_use] extern crate derive_deref; #[macro_use] extern crate failure; #[macro_use] extern crate futures; #[macro_use] extern crate lazy_static; @@ -35,18 +37,19 @@ extern crate tokio_signal; extern crate treebitmap; extern crate x25519_dalek; -mod udp; - -pub mod consts; -pub mod cookie; -pub mod error; pub mod interface; -pub mod noise; -pub mod peer; -pub mod types; -pub mod anti_replay; -pub mod router; -pub mod time; -pub mod timer; -pub mod ip_packet; -pub mod xchacha20poly1305; + +mod udp; +mod message; +mod consts; +mod cookie; +mod error; +mod noise; +mod peer; +mod types; +mod anti_replay; +mod router; +mod time; +mod timer; +mod ip_packet; +mod xchacha20poly1305; diff --git a/src/message.rs b/src/message.rs new file mode 100644 index 0000000..94104c3 --- /dev/null +++ b/src/message.rs @@ -0,0 +1,154 @@ +#![allow(unused)] + +use failure::Error; +use std::convert::{TryFrom, TryInto}; +use byteorder::{ByteOrder, LittleEndian}; + +pub enum Message { + Initiation(Initiation), + Response(Response), + CookieReply(CookieReply), + Transport(Transport), +} + +impl TryFrom> for Message { + type Error = Error; + + fn try_from(packet: Vec) -> Result { + Ok(match packet[0] { + 1 => Message::Initiation(packet.try_into()?), + 2 => Message::Response(packet.try_into()?), + 3 => Message::CookieReply(packet.try_into()?), + 4 => Message::Transport(packet.try_into()?), + _ => bail!("unknown wireguard message type") + }) + } +} + +#[derive(Deref)] +pub struct Initiation(Vec); + +impl Initiation { + pub fn their_index(&self) -> u32 { + LittleEndian::read_u32(&self[4..]) + } + + pub fn noise_bytes(&self) -> &[u8] { + &self[8..116] + } + + pub fn as_bytes(&self) -> &[u8] { + &self + } +} + +impl TryFrom> for Initiation { + type Error = Error; + + fn try_from(packet: Vec) -> Result { + ensure!(packet.len() == 148, "incorrect handshake initiation packet length."); + Ok(Initiation(packet)) + } +} + +#[derive(Deref)] +pub struct Response(Vec); + +impl Response { + pub fn their_index(&self) -> u32 { + LittleEndian::read_u32(&self[4..]) + } + + pub fn our_index(&self) -> u32 { + LittleEndian::read_u32(&self[8..]) + } + + pub fn noise_bytes(&self) -> &[u8] { + &self[12..60] + } + + pub fn mac1(&self) -> &[u8] { + &self[60..76] + } + + pub fn mac2(&self) -> &[u8] { + &self[76..92] + } + + pub fn as_bytes(&self) -> &[u8] { + &self + } +} + +impl TryFrom> for Response { + type Error = Error; + + fn try_from(packet: Vec) -> Result { + ensure!(packet.len() == 92, "incorrect handshake response packet length."); + Ok(Response(packet)) + } +} + +#[derive(Deref)] +pub struct CookieReply(Vec); + +impl CookieReply { + pub fn our_index(&self) -> u32 { + LittleEndian::read_u32(&self[4..]) + } + + pub fn nonce(&self) -> &[u8] { + &self[8..32] + } + + pub fn cookie(&self) -> &[u8] { + &self[32..48] + } + + pub fn aead_tag(&self) -> &[u8] { + &self[48..64] + } + + pub fn as_bytes(&self) -> &[u8] { + &self + } +} + +impl TryFrom> for CookieReply { + type Error = Error; + + fn try_from(packet: Vec) -> Result { + ensure!(packet.len() == 64, "incorrect cookie reply packet length."); + Ok(CookieReply(packet)) + } +} + +#[derive(Deref)] +pub struct Transport(Vec); + +impl Transport { + pub fn our_index(&self) -> u32 { + LittleEndian::read_u32(&self[4..]) + } + + pub fn nonce(&self) -> u64 { + LittleEndian::read_u64(&self[8..16]) + } + + pub fn payload(&self) -> &[u8] { + &self[16..] + } + + pub fn as_bytes(&self) -> &[u8] { + &self + } +} + +impl TryFrom> for Transport { + type Error = Error; + + fn try_from(packet: Vec) -> Result { + ensure!(packet.len() >= 32, "transport message smaller than minimum length."); + Ok(Transport(packet)) + } +} diff --git a/src/peer.rs b/src/peer.rs index e329886..41a1e34 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -7,6 +7,7 @@ use failure::{Error, err_msg}; use interface::UtunPacket; use ip_packet::IpPacket; use noise; +use message::{Initiation, Response, CookieReply, Transport}; use std::{self, mem}; use std::collections::VecDeque; use std::fmt::{self, Debug, Display, Formatter}; @@ -229,16 +230,15 @@ impl Peer { Ok((endpoint, packet, dead_index)) } - pub fn process_incoming_handshake(private_key: &[u8], packet: &[u8]) -> Result { + pub fn process_incoming_handshake(private_key: &[u8], packet: &Initiation) -> Result { let mut timestamp = [0u8; 12]; let mut noise = noise::build_responder(private_key)?; - let their_index = LittleEndian::read_u32(&packet[4..]); - let len = noise.read_message(&packet[8..116], &mut timestamp)?; + let len = noise.read_message(packet.noise_bytes(), &mut timestamp)?; ensure!(len == 12, "incorrect handshake payload length"); let timestamp = timestamp.into(); - Ok(IncompleteIncomingHandshake { their_index, timestamp, noise }) + Ok(IncompleteIncomingHandshake { their_index: packet.their_index(), timestamp, noise }) } /// Takes a new handshake packet (type 0x01), updates the internal peer state, @@ -285,17 +285,15 @@ impl Peer { Ok(packet) } - pub fn consume_cookie_reply(&mut self, reply: &[u8]) -> Result<(), Error> { + pub fn consume_cookie_reply(&mut self, reply: &CookieReply) -> Result<(), Error> { self.cookie.consume_reply(reply) } - pub fn process_incoming_handshake_response(&mut self, packet: &[u8]) -> Result, Error> { - let their_index = LittleEndian::read_u32(&packet[4..]); + pub fn process_incoming_handshake_response(&mut self, packet: &Response) -> Result, Error> { let mut session = mem::replace(&mut self.sessions.next, None).ok_or_else(|| err_msg("no next session"))?; - let _ = session.noise.read_message(&packet[12..60], &mut [])?; - + let _ = session.noise.read_message(&packet.noise_bytes(), &mut [])?; session = session.into_transport_mode()?; - session.their_index = their_index; + session.their_index = packet.their_index(); session.birthday = Timestamp::now(); self.last_handshake = Timestamp::now(); @@ -305,22 +303,21 @@ impl Peer { Ok(dead.map(|session| session.our_index)) } - pub fn handle_incoming_transport(&mut self, addr: SocketAddr, packet: &[u8]) + pub fn handle_incoming_transport(&mut self, addr: SocketAddr, packet: &Transport) -> Result<(Vec, Option>), Error> { - let our_index = LittleEndian::read_u32(&packet[4..]); - let nonce = LittleEndian::read_u64(&packet[8..]); let mut raw_packet = vec![0u8; packet.len()]; + let nonce = packet.nonce(); let session_type = { - let (session, session_type) = self.find_session(our_index).ok_or_else(|| err_msg("no session with index"))?; + let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?; ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets"); ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES"); ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); session.anti_replay.update(nonce)?; session.noise.set_receiving_nonce(nonce)?; - let len = session.noise.read_message(&packet[16..], &mut raw_packet)?; + let len = session.noise.read_message(packet.payload(), &mut raw_packet)?; if len > 0 { let len = IpPacket::new(&raw_packet[..len]) .ok_or_else(||format_err!("invalid IP packet (len {})", len))? diff --git a/src/timer.rs b/src/timer.rs index 7db3b2e..bbf80f3 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -30,7 +30,7 @@ impl Timer { Self { handle, timer, tx, rx } } - pub fn spawn_delayed(&mut self, delay: Duration, message: TimerMessage) { + pub fn send_after(&mut self, delay: Duration, message: TimerMessage) { trace!("queuing timer message {:?}", &message); let timer = self.timer.sleep(delay + (*TIMER_RESOLUTION * 2)); let future = timer.and_then({ @@ -42,9 +42,6 @@ impl Timer { self.handle.spawn(future); } - pub fn spawn_immediately(&mut self, message: TimerMessage) { - self.handle.spawn(self.tx.clone().send(message).then(|_| Ok(()))); - } } impl Stream for Timer { -- cgit v1.2.3-59-g8ed1b