diff options
author | 2018-06-06 04:42:08 -0500 | |
---|---|---|
committer | 2018-06-06 04:42:08 -0500 | |
commit | b0dfc59e6959ff73b289fd48674b872c8c612d68 (patch) | |
tree | 1be67a3c0d37a01cad9e587d6d8d9e2b21fd17ed /src | |
parent | finish up nonce rework (diff) | |
download | wireguard-rs-jm/multithread-rayon.tar.xz wireguard-rs-jm/multithread-rayon.zip |
baby stepsjm/multithread-rayon
Diffstat (limited to 'src')
-rw-r--r-- | src/interface/mod.rs | 34 | ||||
-rw-r--r-- | src/interface/peer_server.rs | 75 | ||||
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/message.rs | 35 | ||||
-rw-r--r-- | src/peer.rs | 162 | ||||
-rw-r--r-- | src/types.rs | 2 |
6 files changed, 154 insertions, 156 deletions
diff --git a/src/interface/mod.rs b/src/interface/mod.rs index 7036a09..6d0fa85 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -12,7 +12,7 @@ use std::io; use std::rc::{Rc, Weak}; use std::cell::RefCell; use std::collections::HashMap; -use types::{InterfaceInfo}; +use types::{InterfaceInfo, PacketVec}; use rips_packets::ipv4::Ipv4Packet; @@ -44,39 +44,13 @@ pub struct Interface { } struct VecUtunCodec; -pub enum UtunPacket { - Inet4(Vec<u8>), - Inet6(Vec<u8>), -} - -impl UtunPacket { - pub fn payload(&self) -> &[u8] { - use self::UtunPacket::*; - match *self { - Inet4(ref payload) | Inet6(ref payload) => payload, - } - } - - pub fn from(raw_packet: Vec<u8>) -> Result<UtunPacket, Error> { - match raw_packet[0] >> 4 { - 4 => Ok(UtunPacket::Inet4(raw_packet)), - 6 => Ok(UtunPacket::Inet6(raw_packet)), - _ => bail!("unrecognized IP version") - } - } -} impl UtunCodec for VecUtunCodec { - type In = UtunPacket; - type Out = Vec<u8>; + type In = PacketVec; + type Out = PacketVec; fn decode(&mut self, buf: &[u8]) -> io::Result<Self::In> { - trace!("utun packet type {}", buf[3]); - match buf[4] >> 4 { - 4 => Ok(UtunPacket::Inet4(buf[4..].to_vec())), - 6 => Ok(UtunPacket::Inet6(buf[4..].to_vec())), - _ => Err(io::ErrorKind::InvalidData.into()) - } + Ok(buf[4..].into()) } fn encode(&mut self, mut msg: Self::Out, buf: &mut Vec<u8>) { diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index d03fa58..ee40061 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -2,7 +2,7 @@ use consts::{REKEY_TIMEOUT, KEEPALIVE_TIMEOUT, STALE_SESSION_TIMEOUT, MAX_CONTENT_SIZE, WIPE_AFTER_TIME, MAX_HANDSHAKE_ATTEMPTS, UNDER_LOAD_QUEUE_SIZE, UNDER_LOAD_TIME}; use cookie; -use interface::{SharedPeer, SharedState, State, UtunPacket}; +use interface::{SharedPeer, SharedState, State}; use message::{Message, Initiation, Response, CookieReply, Transport}; use peer::{Peer, SessionType, SessionTransition}; use ratelimiter::RateLimiter; @@ -12,10 +12,10 @@ use timer::{Timer, TimerMessage}; use byteorder::{ByteOrder, LittleEndian}; use failure::{Error, err_msg}; use futures::{Async, Future, Stream, Poll, unsync::mpsc, task}; -use futures_cpupool::CpuPool; use rand::{self, Rng, ThreadRng}; use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel}; use tokio_core::reactor::Handle; +use types::PacketVec; use std::collections::VecDeque; use std::convert::TryInto; @@ -51,17 +51,15 @@ pub struct PeerServer { shared_state : SharedState, udp : Option<UdpChannel>, port : Option<u16>, - outgoing : Channel<UtunPacket>, + outgoing : Channel<PacketVec>, channel : Channel<ChannelMessage>, handshakes : VecDeque<(Endpoint, Message)>, timer : Timer, - tunnel_tx : mpsc::UnboundedSender<Vec<u8>>, + tunnel_tx : mpsc::UnboundedSender<PacketVec>, cookie : cookie::Validator, rate_limiter : RateLimiter, under_load_until : Instant, rng : ThreadRng, - cpu_pool : CpuPool, - decrypt_channel : Channel<(Endpoint, Transport, Vec<u8>, SessionType)>, } impl PeerServer { @@ -79,8 +77,6 @@ impl PeerServer { rate_limiter : RateLimiter::new(&handle)?, under_load_until : Instant::now(), rng : rand::thread_rng(), - cpu_pool : CpuPool::new_num_cpus(), - decrypt_channel : mpsc::unbounded().into(), }) } @@ -116,7 +112,7 @@ impl PeerServer { Ok(()) } - pub fn tunnel_tx(&self) -> mpsc::UnboundedSender<UtunPacket> { + pub fn tunnel_tx(&self) -> mpsc::UnboundedSender<PacketVec> { self.outgoing.tx.clone() } @@ -266,11 +262,11 @@ impl PeerServer { if !peer.outgoing_queue.is_empty() { debug!("sending {} queued egress packets", peer.outgoing_queue.len()); while let Some(packet) = peer.outgoing_queue.pop_front() { - self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?; + self.send_to_peer(peer.handle_outgoing_transport(packet)?)?; } } else { debug!("sending empty keepalive"); - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?; } } else { error!("peer not ready for transport after processing handshake response. this shouldn't happen."); @@ -290,42 +286,23 @@ impl PeerServer { } fn handle_ingress_transport(&mut self, addr: Endpoint, packet: Transport) -> Result<(), Error> { - let peer_ref = self.shared_state.borrow().index_map.get(&packet.our_index()) .ok_or_else(|| err_msg("unknown our_index"))?.clone(); - let mut peer = peer_ref.borrow_mut(); - let tx = self.decrypt_channel.tx.clone(); - let f = self.cpu_pool.spawn(peer.handle_incoming_transport(addr, packet)?) - .and_then(move |result| { - tx.unbounded_send(result).expect("broken decrypt channel"); - Ok(()) - }) - .map_err(|e| warn!("{:?}", e)); - self.handle.spawn(f); - - Ok(()) - } - - fn handle_ingress_decrypted_transport(&mut self, addr: Endpoint, orig_packet: Transport, raw_packet: Vec<u8>, session_type: SessionType) - -> Result<(), Error> - { - let peer_ref = self.shared_state.borrow().index_map.get(&orig_packet.our_index()) - .ok_or_else(|| err_msg("unknown our_index"))?.clone(); - - let needs_handshake = { + let (raw_packet, needs_handshake) = { let mut peer = peer_ref.borrow_mut(); - let transition = peer.handle_incoming_decrypted_transport(addr, &raw_packet, session_type)?; let mut state = self.shared_state.borrow_mut(); + let (raw_packet, transition) = peer.handle_incoming_transport(addr, &packet)?; + if let SessionTransition::Transition(possible_dead_index) = transition { if let Some(index) = possible_dead_index { let _ = state.index_map.remove(&index); } - let outgoing: Vec<UtunPacket> = peer.outgoing_queue.drain(..).collect(); + let outgoing: Vec<PacketVec> = peer.outgoing_queue.drain(..).collect(); for packet in outgoing { - match peer.handle_outgoing_transport(packet.payload()) { + match peer.handle_outgoing_transport(packet) { Ok(message) => self.send_to_peer(message)?, Err(e) => warn!("failed to encrypt packet: {}", e) } @@ -333,7 +310,7 @@ impl PeerServer { self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(Rc::downgrade(&peer_ref))); } - peer.needs_new_handshake(false) + (raw_packet, peer.needs_new_handshake(false)) }; if needs_handshake { @@ -352,10 +329,10 @@ impl PeerServer { Ok(()) } - fn handle_egress_packet(&mut self, packet: UtunPacket) -> Result<(), Error> { - ensure!(!packet.payload().is_empty() && packet.payload().len() <= MAX_CONTENT_SIZE, "egress packet outside of size bounds"); + fn handle_egress_packet(&mut self, packet: PacketVec) -> Result<(), Error> { + ensure!(!packet.is_empty() && packet.len() <= MAX_CONTENT_SIZE, "egress packet outside of size bounds"); - let peer_ref = self.shared_state.borrow_mut().router.route_to_peer(packet.payload()) + let peer_ref = self.shared_state.borrow_mut().router.route_to_peer(&packet) .ok_or_else(|| err_msg("no route to peer"))?; let needs_handshake = { @@ -369,7 +346,7 @@ impl PeerServer { } while let Some(packet) = peer.outgoing_queue.pop_front() { - self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?; + self.send_to_peer(peer.handle_outgoing_transport(packet)?)?; } } @@ -488,7 +465,7 @@ impl PeerServer { } } - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?; debug!("sent passive keepalive packet"); self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone())); @@ -506,7 +483,7 @@ impl PeerServer { bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs()); } - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?; let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone())); peer.timers.persistent_timer = Some(handle); debug!("sent persistent keepalive packet"); @@ -564,7 +541,7 @@ impl PeerServer { if let Some(keepalive) = peer.info.persistent_keepalive() { let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(Rc::downgrade(&peer_ref))); peer.timers.persistent_timer = Some(handle); - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?; debug!("set new keepalive timer and immediately sent new keepalive packet."); } } @@ -626,18 +603,6 @@ impl Future for PeerServer { } loop { - // Handle UDP packets from the outside world - match self.decrypt_channel.rx.poll() { - Ok(Async::Ready(Some((addr, orig_packet, decrypted, session_type)))) => { - let _ = self.handle_ingress_decrypted_transport(addr, orig_packet, decrypted, session_type).map_err(|e| warn!("UDP ERR: {:?}", e)); - }, - Ok(Async::NotReady) => { break; }, - Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), - Err(e) => bail!("incoming udp stream error: {:?}", e) - } - } - - loop { // Handle packets coming from the local tunnel match self.outgoing.rx.poll() { Ok(Async::Ready(Some(packet))) => { @@ -21,12 +21,12 @@ extern crate blake2_rfc; extern crate byteorder; extern crate bytes; extern crate chacha20_poly1305_aead; -extern crate futures_cpupool; extern crate hex; extern crate libc; extern crate mio; extern crate nix; extern crate notify; +extern crate rayon; extern crate rand; extern crate rips_packets; extern crate snow; diff --git a/src/message.rs b/src/message.rs index 6a2b645..3a0d778 100644 --- a/src/message.rs +++ b/src/message.rs @@ -3,11 +3,12 @@ use failure::Error; use std::convert::{TryFrom, TryInto}; use byteorder::{ByteOrder, LittleEndian}; +use types::PacketVec; -#[derive(Deref, DerefMut)] pub struct Initiation(Vec<u8>); -#[derive(Deref, DerefMut)] pub struct Response(Vec<u8>); -#[derive(Deref, DerefMut)] pub struct CookieReply(Vec<u8>); -#[derive(Deref, DerefMut)] pub struct Transport(Vec<u8>); +#[derive(Deref, DerefMut)] pub struct Initiation(PacketVec); +#[derive(Deref, DerefMut)] pub struct Response(PacketVec); +#[derive(Deref, DerefMut)] pub struct CookieReply(PacketVec); +#[derive(Deref, DerefMut)] pub struct Transport(PacketVec); pub enum Message { Initiation(Initiation), @@ -16,10 +17,10 @@ pub enum Message { Transport(Transport), } -impl TryFrom<Vec<u8>> for Message { +impl TryFrom<PacketVec> for Message { type Error = Error; - fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> { + fn try_from(packet: PacketVec) -> Result<Self, Self::Error> { Ok(match packet[0] { 1 => Message::Initiation(packet.try_into()?), 2 => Message::Response(packet.try_into()?), @@ -48,10 +49,10 @@ impl Initiation { } } -impl TryFrom<Vec<u8>> for Initiation { +impl TryFrom<PacketVec> for Initiation { type Error = Error; - fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> { + fn try_from(packet: PacketVec) -> Result<Self, Self::Error> { ensure!(packet.len() == 148, "incorrect handshake initiation packet length."); Ok(Initiation(packet)) } @@ -83,10 +84,10 @@ impl Response { } } -impl TryFrom<Vec<u8>> for Response { +impl TryFrom<PacketVec> for Response { type Error = Error; - fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> { + fn try_from(packet: PacketVec) -> Result<Self, Self::Error> { ensure!(packet.len() == 92, "incorrect handshake response packet length."); Ok(Response(packet)) } @@ -133,10 +134,10 @@ impl CookieReply { } } -impl TryFrom<Vec<u8>> for CookieReply { +impl TryFrom<PacketVec> for CookieReply { type Error = Error; - fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> { + fn try_from(packet: PacketVec) -> Result<Self, Self::Error> { ensure!(packet.len() == 64, "incorrect cookie reply packet length."); Ok(CookieReply(packet)) } @@ -160,11 +161,17 @@ impl Transport { } } -impl TryFrom<Vec<u8>> for Transport { +impl TryFrom<PacketVec> for Transport { type Error = Error; - fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> { + fn try_from(packet: PacketVec) -> Result<Self, Self::Error> { ensure!(packet.len() >= 32, "transport message smaller than minimum length."); Ok(Transport(packet)) } } + +impl From<Transport> for PacketVec { + fn from(packet: Transport) -> PacketVec { + packet.0 + } +}
\ No newline at end of file diff --git a/src/peer.rs b/src/peer.rs index c0b21eb..a2d3a0d 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,26 +1,25 @@ use anti_replay::AntiReplay; -use byteorder::{ByteOrder, LittleEndian}; -use consts::{TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE, REKEY_AFTER_MESSAGES, REKEY_AFTER_TIME, - REKEY_AFTER_TIME_RECV, REJECT_AFTER_TIME, REJECT_AFTER_MESSAGES, PADDING_MULTIPLE, - MAX_QUEUED_PACKETS, MAX_HANDSHAKE_ATTEMPTS}; +use consts::*; use cookie; -use failure::{Error, err_msg}; -use futures::{Future, future}; -use interface::UtunPacket; use ip_packet::IpPacket; use noise; use message::{Initiation, Response, CookieReply, Transport}; -use std::{self, mem}; -use std::collections::VecDeque; -use std::fmt::{self, Debug, Display, Formatter}; -use std::time::{SystemTime, UNIX_EPOCH}; -use hex; use timer::TimerHandle; use timestamp::{Tai64n, Timestamp}; -use snow; -use types::PeerInfo; +use types::{PacketVec, PeerInfo}; use udp::Endpoint; +use byteorder::{ByteOrder, LittleEndian}; +use failure::{Error, err_msg}; +use hex; +use rayon::prelude::*; +use std::{self, fmt, mem, + collections::VecDeque, + iter::Iterator, + fmt::{Debug, Display, Formatter}, + time::{SystemTime, UNIX_EPOCH}}; +use snow; + pub struct Peer { pub info : PeerInfo, pub sessions : Sessions, @@ -28,7 +27,7 @@ pub struct Peer { pub tx_bytes : u64, pub rx_bytes : u64, pub last_handshake_tai64n : Option<Tai64n>, - pub outgoing_queue : VecDeque<UtunPacket>, + pub outgoing_queue : VecDeque<PacketVec>, pub cookie : cookie::Generator, } @@ -176,7 +175,7 @@ impl Peer { } } - pub fn queue_egress(&mut self, packet: UtunPacket) { + pub fn queue_egress(&mut self, packet: PacketVec) { if self.outgoing_queue.len() < MAX_QUEUED_PACKETS { self.outgoing_queue.push_back(packet); self.timers.handshake_attempts = 0; @@ -339,31 +338,56 @@ impl Peer { Ok(dead.map(|session| session.our_index)) } - pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: Transport) - -> Result<Box<Future<Item = (Endpoint, Transport, Vec<u8>, SessionType), Error = Error> + 'static + Send>, Error> - { + pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: &Transport) + -> Result<(Vec<u8>, SessionTransition), Error> { + let mut raw_packet = vec![0u8; packet.len()]; let nonce = packet.nonce(); - let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?; - ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets"); - ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES"); - ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); + let session_type = { + let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?; + ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets"); + ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES"); + ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); - session.anti_replay.update(nonce)?; - let mut transport = session.noise.get_async_transport_state()?.clone(); - Ok(Box::new(future::lazy(move || { - let len = transport.read_transport_message(nonce, packet.payload(), &mut raw_packet).unwrap(); + session.anti_replay.update(nonce)?; + let len = session.noise.read_async_message(nonce, packet.payload(), &mut raw_packet)?; if len > 0 { let len = IpPacket::new(&raw_packet[..len]) - .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() + .ok_or_else(||format_err!("invalid IP packet (len {})", len))? .length(); raw_packet.truncate(len as usize); } else { raw_packet.truncate(0); } - Ok((addr, packet, raw_packet, session_type)) - }))) + + session_type + }; + + if !raw_packet.is_empty() { + self.timers.data_received = Timestamp::now(); + } + self.timers.authenticated_received = Timestamp::now(); + self.timers.authenticated_traversed = Timestamp::now(); + self.timers.keepalive_sent = false; // reset passive keepalive token since received a valid ingress transport + + let transition = if session_type == SessionType::Next { + debug!("moving 'next' session to current after receiving first transport packet"); + let next = std::mem::replace(&mut self.sessions.next, None); + let current = std::mem::replace(&mut self.sessions.current, next); + let dead = std::mem::replace(&mut self.sessions.past, current); + + self.timers.handshake_completed = Timestamp::now(); + + SessionTransition::Transition(dead.map(|session| session.our_index)) + } else { + SessionTransition::NoTransition + }; + + self.rx_bytes += packet.len() as u64; + self.info.endpoint = Some(addr); // update peer endpoint after successful authentication + + Ok((raw_packet, transition)) } pub fn handle_incoming_decrypted_transport(&mut self, addr: Endpoint, raw_packet: &[u8], session_type: SessionType) @@ -395,33 +419,59 @@ impl Peer { Ok(transition) } - pub fn handle_outgoing_transport(&mut self, packet: &[u8]) -> Result<(Endpoint, Vec<u8>), Error> { - let session = self.sessions.current.as_mut().ok_or_else(|| err_msg("no current noise session"))?; - let endpoint = self.info.endpoint.ok_or_else(|| err_msg("no known peer endpoint"))?; - let padding = if packet.len() % PADDING_MULTIPLE != 0 { - PADDING_MULTIPLE - (packet.len() % PADDING_MULTIPLE) - } else { 0 }; - let padded_len = packet.len() + padding; - let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; - - ensure!(session.nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES"); - ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); - - out_packet[0] = 4; - LittleEndian::write_u32(&mut out_packet[4..], session.their_index); - LittleEndian::write_u64(&mut out_packet[8..], session.nonce); - let padded_packet = &[packet, &vec![0u8; padding]].concat(); - let len = session.noise.write_async_message(session.nonce, padded_packet, &mut out_packet[16..])?; - session.nonce += 1; - self.tx_bytes += len as u64; - - if !packet.is_empty() { - self.timers.data_sent = Timestamp::now(); - } - self.timers.authenticated_traversed = Timestamp::now(); + pub fn handle_outgoing_transport<T>(&mut self, packet: T) -> Result<(Endpoint, PacketVec), Error> + where T: Into<PacketVec> + { + let (endpoint, mut packets) = self.handle_outgoing_transports(vec![packet.into()])?; + Ok((endpoint, packets.remove(0))) + } - out_packet.truncate(TRANSPORT_HEADER_SIZE + len); - Ok((endpoint, out_packet)) + pub fn handle_outgoing_transports<T>(&mut self, packets: T) -> Result<(Endpoint, Vec<PacketVec>), Error> + where T: IntoIterator<Item = PacketVec> + { + let session = self.sessions.current.as_mut().ok_or_else(|| err_msg("no current noise session"))?; + let endpoint = self.info.endpoint.ok_or_else(|| err_msg("no known peer endpoint"))?; + + ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); + + let transport = session.noise.get_async_transport_state()?.clone(); + let encrypted_packets = packets.into_iter() + .filter_map(|mut packet| { + if session.nonce > REJECT_AFTER_MESSAGES { + warn!("exceeded REJECT-AFTER-MESSAGES"); + None + } else { + let padding = if packet.len() % PADDING_MULTIPLE != 0 { + PADDING_MULTIPLE - (packet.len() % PADDING_MULTIPLE) + } else { 0 }; + let padded_len = packet.len() + padding; + let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; + packet.resize(padded_len, 0); + + out_packet[0] = 4; + LittleEndian::write_u32(&mut out_packet[4..], session.their_index); + LittleEndian::write_u64(&mut out_packet[8..], session.nonce); + session.nonce += 1; + Some((session.nonce - 1, packet, out_packet)) + } + }) + .collect::<Vec<_>>() + .into_par_iter() + .map_with(transport, |transport, (nonce, in_packet, mut out_packet)| { + let len = transport.write_transport_message(nonce, &in_packet, &mut out_packet[16..]).unwrap(); + out_packet.truncate(TRANSPORT_HEADER_SIZE + len); + out_packet + }) + .collect::<Vec<_>>(); + + // self.tx_bytes += len as u64; + + // if !packet.is_empty() { + // self.timers.data_sent = Timestamp::now(); + // } + // self.timers.authenticated_traversed = Timestamp::now(); + + Ok((endpoint, encrypted_packets)) } pub fn to_config_string(&self) -> String { diff --git a/src/types.rs b/src/types.rs index 77a104d..2b569e7 100644 --- a/src/types.rs +++ b/src/types.rs @@ -4,6 +4,8 @@ use std::net::IpAddr; use std::time::Duration; use udp::Endpoint; +pub type PacketVec = Vec<u8>; + #[derive(Clone, Debug, Default)] pub struct PeerInfo { pub pub_key: [u8; 32], |