aboutsummaryrefslogtreecommitdiffstats
path: root/src/peer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/peer.rs')
-rw-r--r--src/peer.rs162
1 files changed, 106 insertions, 56 deletions
diff --git a/src/peer.rs b/src/peer.rs
index c0b21eb..a2d3a0d 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -1,26 +1,25 @@
use anti_replay::AntiReplay;
-use byteorder::{ByteOrder, LittleEndian};
-use consts::{TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE, REKEY_AFTER_MESSAGES, REKEY_AFTER_TIME,
- REKEY_AFTER_TIME_RECV, REJECT_AFTER_TIME, REJECT_AFTER_MESSAGES, PADDING_MULTIPLE,
- MAX_QUEUED_PACKETS, MAX_HANDSHAKE_ATTEMPTS};
+use consts::*;
use cookie;
-use failure::{Error, err_msg};
-use futures::{Future, future};
-use interface::UtunPacket;
use ip_packet::IpPacket;
use noise;
use message::{Initiation, Response, CookieReply, Transport};
-use std::{self, mem};
-use std::collections::VecDeque;
-use std::fmt::{self, Debug, Display, Formatter};
-use std::time::{SystemTime, UNIX_EPOCH};
-use hex;
use timer::TimerHandle;
use timestamp::{Tai64n, Timestamp};
-use snow;
-use types::PeerInfo;
+use types::{PacketVec, PeerInfo};
use udp::Endpoint;
+use byteorder::{ByteOrder, LittleEndian};
+use failure::{Error, err_msg};
+use hex;
+use rayon::prelude::*;
+use std::{self, fmt, mem,
+ collections::VecDeque,
+ iter::Iterator,
+ fmt::{Debug, Display, Formatter},
+ time::{SystemTime, UNIX_EPOCH}};
+use snow;
+
pub struct Peer {
pub info : PeerInfo,
pub sessions : Sessions,
@@ -28,7 +27,7 @@ pub struct Peer {
pub tx_bytes : u64,
pub rx_bytes : u64,
pub last_handshake_tai64n : Option<Tai64n>,
- pub outgoing_queue : VecDeque<UtunPacket>,
+ pub outgoing_queue : VecDeque<PacketVec>,
pub cookie : cookie::Generator,
}
@@ -176,7 +175,7 @@ impl Peer {
}
}
- pub fn queue_egress(&mut self, packet: UtunPacket) {
+ pub fn queue_egress(&mut self, packet: PacketVec) {
if self.outgoing_queue.len() < MAX_QUEUED_PACKETS {
self.outgoing_queue.push_back(packet);
self.timers.handshake_attempts = 0;
@@ -339,31 +338,56 @@ impl Peer {
Ok(dead.map(|session| session.our_index))
}
- pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: Transport)
- -> Result<Box<Future<Item = (Endpoint, Transport, Vec<u8>, SessionType), Error = Error> + 'static + Send>, Error>
- {
+ pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: &Transport)
+ -> Result<(Vec<u8>, SessionTransition), Error> {
+
let mut raw_packet = vec![0u8; packet.len()];
let nonce = packet.nonce();
- let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?;
- ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets");
- ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES");
- ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME");
+ let session_type = {
+ let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?;
+ ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets");
+ ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES");
+ ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME");
- session.anti_replay.update(nonce)?;
- let mut transport = session.noise.get_async_transport_state()?.clone();
- Ok(Box::new(future::lazy(move || {
- let len = transport.read_transport_message(nonce, packet.payload(), &mut raw_packet).unwrap();
+ session.anti_replay.update(nonce)?;
+ let len = session.noise.read_async_message(nonce, packet.payload(), &mut raw_packet)?;
if len > 0 {
let len = IpPacket::new(&raw_packet[..len])
- .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap()
+ .ok_or_else(||format_err!("invalid IP packet (len {})", len))?
.length();
raw_packet.truncate(len as usize);
} else {
raw_packet.truncate(0);
}
- Ok((addr, packet, raw_packet, session_type))
- })))
+
+ session_type
+ };
+
+ if !raw_packet.is_empty() {
+ self.timers.data_received = Timestamp::now();
+ }
+ self.timers.authenticated_received = Timestamp::now();
+ self.timers.authenticated_traversed = Timestamp::now();
+ self.timers.keepalive_sent = false; // reset passive keepalive token since received a valid ingress transport
+
+ let transition = if session_type == SessionType::Next {
+ debug!("moving 'next' session to current after receiving first transport packet");
+ let next = std::mem::replace(&mut self.sessions.next, None);
+ let current = std::mem::replace(&mut self.sessions.current, next);
+ let dead = std::mem::replace(&mut self.sessions.past, current);
+
+ self.timers.handshake_completed = Timestamp::now();
+
+ SessionTransition::Transition(dead.map(|session| session.our_index))
+ } else {
+ SessionTransition::NoTransition
+ };
+
+ self.rx_bytes += packet.len() as u64;
+ self.info.endpoint = Some(addr); // update peer endpoint after successful authentication
+
+ Ok((raw_packet, transition))
}
pub fn handle_incoming_decrypted_transport(&mut self, addr: Endpoint, raw_packet: &[u8], session_type: SessionType)
@@ -395,33 +419,59 @@ impl Peer {
Ok(transition)
}
- pub fn handle_outgoing_transport(&mut self, packet: &[u8]) -> Result<(Endpoint, Vec<u8>), Error> {
- let session = self.sessions.current.as_mut().ok_or_else(|| err_msg("no current noise session"))?;
- let endpoint = self.info.endpoint.ok_or_else(|| err_msg("no known peer endpoint"))?;
- let padding = if packet.len() % PADDING_MULTIPLE != 0 {
- PADDING_MULTIPLE - (packet.len() % PADDING_MULTIPLE)
- } else { 0 };
- let padded_len = packet.len() + padding;
- let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
-
- ensure!(session.nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES");
- ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME");
-
- out_packet[0] = 4;
- LittleEndian::write_u32(&mut out_packet[4..], session.their_index);
- LittleEndian::write_u64(&mut out_packet[8..], session.nonce);
- let padded_packet = &[packet, &vec![0u8; padding]].concat();
- let len = session.noise.write_async_message(session.nonce, padded_packet, &mut out_packet[16..])?;
- session.nonce += 1;
- self.tx_bytes += len as u64;
-
- if !packet.is_empty() {
- self.timers.data_sent = Timestamp::now();
- }
- self.timers.authenticated_traversed = Timestamp::now();
+ pub fn handle_outgoing_transport<T>(&mut self, packet: T) -> Result<(Endpoint, PacketVec), Error>
+ where T: Into<PacketVec>
+ {
+ let (endpoint, mut packets) = self.handle_outgoing_transports(vec![packet.into()])?;
+ Ok((endpoint, packets.remove(0)))
+ }
- out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
- Ok((endpoint, out_packet))
+ pub fn handle_outgoing_transports<T>(&mut self, packets: T) -> Result<(Endpoint, Vec<PacketVec>), Error>
+ where T: IntoIterator<Item = PacketVec>
+ {
+ let session = self.sessions.current.as_mut().ok_or_else(|| err_msg("no current noise session"))?;
+ let endpoint = self.info.endpoint.ok_or_else(|| err_msg("no known peer endpoint"))?;
+
+ ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME");
+
+ let transport = session.noise.get_async_transport_state()?.clone();
+ let encrypted_packets = packets.into_iter()
+ .filter_map(|mut packet| {
+ if session.nonce > REJECT_AFTER_MESSAGES {
+ warn!("exceeded REJECT-AFTER-MESSAGES");
+ None
+ } else {
+ let padding = if packet.len() % PADDING_MULTIPLE != 0 {
+ PADDING_MULTIPLE - (packet.len() % PADDING_MULTIPLE)
+ } else { 0 };
+ let padded_len = packet.len() + padding;
+ let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
+ packet.resize(padded_len, 0);
+
+ out_packet[0] = 4;
+ LittleEndian::write_u32(&mut out_packet[4..], session.their_index);
+ LittleEndian::write_u64(&mut out_packet[8..], session.nonce);
+ session.nonce += 1;
+ Some((session.nonce - 1, packet, out_packet))
+ }
+ })
+ .collect::<Vec<_>>()
+ .into_par_iter()
+ .map_with(transport, |transport, (nonce, in_packet, mut out_packet)| {
+ let len = transport.write_transport_message(nonce, &in_packet, &mut out_packet[16..]).unwrap();
+ out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
+ out_packet
+ })
+ .collect::<Vec<_>>();
+
+ // self.tx_bytes += len as u64;
+
+ // if !packet.is_empty() {
+ // self.timers.data_sent = Timestamp::now();
+ // }
+ // self.timers.authenticated_traversed = Timestamp::now();
+
+ Ok((endpoint, encrypted_packets))
}
pub fn to_config_string(&self) -> String {