diff options
Diffstat (limited to 'src/peer.rs')
-rw-r--r-- | src/peer.rs | 162 |
1 files changed, 106 insertions, 56 deletions
diff --git a/src/peer.rs b/src/peer.rs index c0b21eb..a2d3a0d 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,26 +1,25 @@ use anti_replay::AntiReplay; -use byteorder::{ByteOrder, LittleEndian}; -use consts::{TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE, REKEY_AFTER_MESSAGES, REKEY_AFTER_TIME, - REKEY_AFTER_TIME_RECV, REJECT_AFTER_TIME, REJECT_AFTER_MESSAGES, PADDING_MULTIPLE, - MAX_QUEUED_PACKETS, MAX_HANDSHAKE_ATTEMPTS}; +use consts::*; use cookie; -use failure::{Error, err_msg}; -use futures::{Future, future}; -use interface::UtunPacket; use ip_packet::IpPacket; use noise; use message::{Initiation, Response, CookieReply, Transport}; -use std::{self, mem}; -use std::collections::VecDeque; -use std::fmt::{self, Debug, Display, Formatter}; -use std::time::{SystemTime, UNIX_EPOCH}; -use hex; use timer::TimerHandle; use timestamp::{Tai64n, Timestamp}; -use snow; -use types::PeerInfo; +use types::{PacketVec, PeerInfo}; use udp::Endpoint; +use byteorder::{ByteOrder, LittleEndian}; +use failure::{Error, err_msg}; +use hex; +use rayon::prelude::*; +use std::{self, fmt, mem, + collections::VecDeque, + iter::Iterator, + fmt::{Debug, Display, Formatter}, + time::{SystemTime, UNIX_EPOCH}}; +use snow; + pub struct Peer { pub info : PeerInfo, pub sessions : Sessions, @@ -28,7 +27,7 @@ pub struct Peer { pub tx_bytes : u64, pub rx_bytes : u64, pub last_handshake_tai64n : Option<Tai64n>, - pub outgoing_queue : VecDeque<UtunPacket>, + pub outgoing_queue : VecDeque<PacketVec>, pub cookie : cookie::Generator, } @@ -176,7 +175,7 @@ impl Peer { } } - pub fn queue_egress(&mut self, packet: UtunPacket) { + pub fn queue_egress(&mut self, packet: PacketVec) { if self.outgoing_queue.len() < MAX_QUEUED_PACKETS { self.outgoing_queue.push_back(packet); self.timers.handshake_attempts = 0; @@ -339,31 +338,56 @@ impl Peer { Ok(dead.map(|session| session.our_index)) } - pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: Transport) - -> Result<Box<Future<Item = (Endpoint, Transport, Vec<u8>, SessionType), Error = Error> + 'static + Send>, Error> - { + pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: &Transport) + -> Result<(Vec<u8>, SessionTransition), Error> { + let mut raw_packet = vec![0u8; packet.len()]; let nonce = packet.nonce(); - let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?; - ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets"); - ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES"); - ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); + let session_type = { + let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?; + ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets"); + ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES"); + ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); - session.anti_replay.update(nonce)?; - let mut transport = session.noise.get_async_transport_state()?.clone(); - Ok(Box::new(future::lazy(move || { - let len = transport.read_transport_message(nonce, packet.payload(), &mut raw_packet).unwrap(); + session.anti_replay.update(nonce)?; + let len = session.noise.read_async_message(nonce, packet.payload(), &mut raw_packet)?; if len > 0 { let len = IpPacket::new(&raw_packet[..len]) - .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() + .ok_or_else(||format_err!("invalid IP packet (len {})", len))? .length(); raw_packet.truncate(len as usize); } else { raw_packet.truncate(0); } - Ok((addr, packet, raw_packet, session_type)) - }))) + + session_type + }; + + if !raw_packet.is_empty() { + self.timers.data_received = Timestamp::now(); + } + self.timers.authenticated_received = Timestamp::now(); + self.timers.authenticated_traversed = Timestamp::now(); + self.timers.keepalive_sent = false; // reset passive keepalive token since received a valid ingress transport + + let transition = if session_type == SessionType::Next { + debug!("moving 'next' session to current after receiving first transport packet"); + let next = std::mem::replace(&mut self.sessions.next, None); + let current = std::mem::replace(&mut self.sessions.current, next); + let dead = std::mem::replace(&mut self.sessions.past, current); + + self.timers.handshake_completed = Timestamp::now(); + + SessionTransition::Transition(dead.map(|session| session.our_index)) + } else { + SessionTransition::NoTransition + }; + + self.rx_bytes += packet.len() as u64; + self.info.endpoint = Some(addr); // update peer endpoint after successful authentication + + Ok((raw_packet, transition)) } pub fn handle_incoming_decrypted_transport(&mut self, addr: Endpoint, raw_packet: &[u8], session_type: SessionType) @@ -395,33 +419,59 @@ impl Peer { Ok(transition) } - pub fn handle_outgoing_transport(&mut self, packet: &[u8]) -> Result<(Endpoint, Vec<u8>), Error> { - let session = self.sessions.current.as_mut().ok_or_else(|| err_msg("no current noise session"))?; - let endpoint = self.info.endpoint.ok_or_else(|| err_msg("no known peer endpoint"))?; - let padding = if packet.len() % PADDING_MULTIPLE != 0 { - PADDING_MULTIPLE - (packet.len() % PADDING_MULTIPLE) - } else { 0 }; - let padded_len = packet.len() + padding; - let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; - - ensure!(session.nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES"); - ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); - - out_packet[0] = 4; - LittleEndian::write_u32(&mut out_packet[4..], session.their_index); - LittleEndian::write_u64(&mut out_packet[8..], session.nonce); - let padded_packet = &[packet, &vec![0u8; padding]].concat(); - let len = session.noise.write_async_message(session.nonce, padded_packet, &mut out_packet[16..])?; - session.nonce += 1; - self.tx_bytes += len as u64; - - if !packet.is_empty() { - self.timers.data_sent = Timestamp::now(); - } - self.timers.authenticated_traversed = Timestamp::now(); + pub fn handle_outgoing_transport<T>(&mut self, packet: T) -> Result<(Endpoint, PacketVec), Error> + where T: Into<PacketVec> + { + let (endpoint, mut packets) = self.handle_outgoing_transports(vec![packet.into()])?; + Ok((endpoint, packets.remove(0))) + } - out_packet.truncate(TRANSPORT_HEADER_SIZE + len); - Ok((endpoint, out_packet)) + pub fn handle_outgoing_transports<T>(&mut self, packets: T) -> Result<(Endpoint, Vec<PacketVec>), Error> + where T: IntoIterator<Item = PacketVec> + { + let session = self.sessions.current.as_mut().ok_or_else(|| err_msg("no current noise session"))?; + let endpoint = self.info.endpoint.ok_or_else(|| err_msg("no known peer endpoint"))?; + + ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); + + let transport = session.noise.get_async_transport_state()?.clone(); + let encrypted_packets = packets.into_iter() + .filter_map(|mut packet| { + if session.nonce > REJECT_AFTER_MESSAGES { + warn!("exceeded REJECT-AFTER-MESSAGES"); + None + } else { + let padding = if packet.len() % PADDING_MULTIPLE != 0 { + PADDING_MULTIPLE - (packet.len() % PADDING_MULTIPLE) + } else { 0 }; + let padded_len = packet.len() + padding; + let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; + packet.resize(padded_len, 0); + + out_packet[0] = 4; + LittleEndian::write_u32(&mut out_packet[4..], session.their_index); + LittleEndian::write_u64(&mut out_packet[8..], session.nonce); + session.nonce += 1; + Some((session.nonce - 1, packet, out_packet)) + } + }) + .collect::<Vec<_>>() + .into_par_iter() + .map_with(transport, |transport, (nonce, in_packet, mut out_packet)| { + let len = transport.write_transport_message(nonce, &in_packet, &mut out_packet[16..]).unwrap(); + out_packet.truncate(TRANSPORT_HEADER_SIZE + len); + out_packet + }) + .collect::<Vec<_>>(); + + // self.tx_bytes += len as u64; + + // if !packet.is_empty() { + // self.timers.data_sent = Timestamp::now(); + // } + // self.timers.authenticated_traversed = Timestamp::now(); + + Ok((endpoint, encrypted_packets)) } pub fn to_config_string(&self) -> String { |