From b0dfc59e6959ff73b289fd48674b872c8c612d68 Mon Sep 17 00:00:00 2001 From: Jake McGinty Date: Wed, 6 Jun 2018 04:42:08 -0500 Subject: baby steps --- Cargo.lock | 69 +++++++++++++++--- Cargo.toml | 2 +- src/interface/mod.rs | 34 ++------- src/interface/peer_server.rs | 75 ++++++-------------- src/lib.rs | 2 +- src/message.rs | 35 ++++++---- src/peer.rs | 162 ++++++++++++++++++++++++++++--------------- src/types.rs | 2 + 8 files changed, 213 insertions(+), 168 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a3bfb0c..5eb4250 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -228,6 +228,15 @@ dependencies = [ "thread-scoped 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam-deque" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "crossbeam-deque" version = "0.3.1" @@ -237,6 +246,20 @@ dependencies = [ "crossbeam-utils 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam-epoch" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "crossbeam-epoch" version = "0.4.1" @@ -250,6 +273,14 @@ dependencies = [ "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam-utils" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "crossbeam-utils" version = "0.3.2" @@ -381,15 +412,6 @@ name = "futures" version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "futures-cpupool" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "gcc" version = "0.3.54" @@ -790,6 +812,27 @@ name = "rand_core" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "rayon" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon-core 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rayon-core" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "redox_syscall" version = "0.1.40" @@ -1393,7 +1436,6 @@ dependencies = [ "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "fern 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.42 (git+https://github.com/rust-lang/libc)", @@ -1402,6 +1444,7 @@ dependencies = [ "nix 0.11.0-pre (git+https://github.com/mcginty/nix?branch=ipv6-pktinfo)", "notify 4.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.5.0-pre.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "rips-packets 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "snow 0.1.12 (git+https://github.com/mcginty/snow?branch=wireguard)", "socket2 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1465,8 +1508,11 @@ dependencies = [ "checksum criterion 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4f11151e2961d0483e5eb7a2ede5ed8071a460d04d2b7c89e8257aa5502b0e0b" "checksum criterion-plot 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f7f7c88a8d341dd9fd9e31a72ca2ca24428db79afb491852873b2c784e037e6" "checksum criterion-stats 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "dd48feb0253b2968ff3085e7f3fba6738c9ff859f420a2fb81a48986eb66da36" +"checksum crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f739f8c5363aca78cfb059edf753d8f0d36908c348f3d8d1503f03d8b75d9cf3" "checksum crossbeam-deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fe8153ef04a7594ded05b427ffad46ddeaf22e63fd48d42b3e1e3bb4db07cae7" +"checksum crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "927121f5407de9956180ff5e936fe3cf4324279280001cd56b669d28ee7e9150" "checksum crossbeam-epoch 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9b4e2817eb773f770dcb294127c011e22771899c21d18fce7dd739c0b9832e81" +"checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9" "checksum crossbeam-utils 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d636a8b3bcc1b409d7ffd3facef8f21dcb4009626adbd0c5e6c4305c07253c7b" "checksum curve25519-dalek 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e5808ccadbb61565fd184702be128ac43a6a33561bcd976a0d7a388b06ad696d" "checksum daemonize 0.2.3 (git+https://github.com/mcginty/daemonize)" = "" @@ -1483,7 +1529,6 @@ dependencies = [ "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "1a70b146671de62ec8c8ed572219ca5d594d9b06c0b364d5e67b722fc559b48c" -"checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" "checksum gcc 0.3.54 (registry+https://github.com/rust-lang/crates.io-index)" = "5e33ec290da0d127825013597dbdfc28bee4964690c7ce1166cbc2a7bd08b1bb" "checksum generic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ef25c5683767570c2bbd7deba372926a55eaae9982d7726ee2a1050239d45b9d" "checksum handlebars 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e7bdb08e879b8c78ee90f5022d121897c31ea022cb0cc6d13f2158c7a9fbabb1" @@ -1531,6 +1576,8 @@ dependencies = [ "checksum rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "eba5f8cb59cc50ed56be8880a5c7b496bfd9bd26394e176bc67884094145c2c5" "checksum rand 0.5.0-pre.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3795e4701d9628a63a84d0289e66279883b40df165fca7caed7b87122447032a" "checksum rand_core 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1b7a5f27547c49e5ccf8a586db3f3782fd93cf849780b21853b9d981db203302" +"checksum rayon 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "80e811e76f1dbf68abf87a759083d34600017fc4e10b6bd5ad84a700f9dba4b1" +"checksum rayon-core 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9d24ad214285a7729b174ed6d3bcfcb80177807f959d95fafd5bfc5c4f201ac8" "checksum redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "c214e91d3ecf43e9a4e41e578973adeb14b474f2bee858742d127af75a0112b1" "checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" "checksum regex 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9329abc99e39129fcceabd24cf5d85b4671ef7c29c50e972bc5afe32438ec384" diff --git a/Cargo.toml b/Cargo.toml index 4bdba9c..0a6064b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,6 @@ chacha20-poly1305-aead = "^0.1" derive_deref = "^1.0" failure = "^0.1" futures = "^0.1" -futures-cpupool = "^0.1" lazy_static = "^1" libc = { git = "https://github.com/rust-lang/libc" } log = "^0.4" @@ -47,6 +46,7 @@ rand = "0.5.0-pre.2" nix = { git = "https://github.com/mcginty/nix", branch = "ipv6-pktinfo" } mio = "^0.6" rips-packets = "0.1" +rayon = "^1.0" snow = { git = "https://github.com/mcginty/snow", branch = "wireguard" } socket2 = "^0.3" subtle = "^0.6" diff --git a/src/interface/mod.rs b/src/interface/mod.rs index 7036a09..6d0fa85 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -12,7 +12,7 @@ use std::io; use std::rc::{Rc, Weak}; use std::cell::RefCell; use std::collections::HashMap; -use types::{InterfaceInfo}; +use types::{InterfaceInfo, PacketVec}; use rips_packets::ipv4::Ipv4Packet; @@ -44,39 +44,13 @@ pub struct Interface { } struct VecUtunCodec; -pub enum UtunPacket { - Inet4(Vec), - Inet6(Vec), -} - -impl UtunPacket { - pub fn payload(&self) -> &[u8] { - use self::UtunPacket::*; - match *self { - Inet4(ref payload) | Inet6(ref payload) => payload, - } - } - - pub fn from(raw_packet: Vec) -> Result { - match raw_packet[0] >> 4 { - 4 => Ok(UtunPacket::Inet4(raw_packet)), - 6 => Ok(UtunPacket::Inet6(raw_packet)), - _ => bail!("unrecognized IP version") - } - } -} impl UtunCodec for VecUtunCodec { - type In = UtunPacket; - type Out = Vec; + type In = PacketVec; + type Out = PacketVec; fn decode(&mut self, buf: &[u8]) -> io::Result { - trace!("utun packet type {}", buf[3]); - match buf[4] >> 4 { - 4 => Ok(UtunPacket::Inet4(buf[4..].to_vec())), - 6 => Ok(UtunPacket::Inet6(buf[4..].to_vec())), - _ => Err(io::ErrorKind::InvalidData.into()) - } + Ok(buf[4..].into()) } fn encode(&mut self, mut msg: Self::Out, buf: &mut Vec) { diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index d03fa58..ee40061 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -2,7 +2,7 @@ use consts::{REKEY_TIMEOUT, KEEPALIVE_TIMEOUT, STALE_SESSION_TIMEOUT, MAX_CONTENT_SIZE, WIPE_AFTER_TIME, MAX_HANDSHAKE_ATTEMPTS, UNDER_LOAD_QUEUE_SIZE, UNDER_LOAD_TIME}; use cookie; -use interface::{SharedPeer, SharedState, State, UtunPacket}; +use interface::{SharedPeer, SharedState, State}; use message::{Message, Initiation, Response, CookieReply, Transport}; use peer::{Peer, SessionType, SessionTransition}; use ratelimiter::RateLimiter; @@ -12,10 +12,10 @@ use timer::{Timer, TimerMessage}; use byteorder::{ByteOrder, LittleEndian}; use failure::{Error, err_msg}; use futures::{Async, Future, Stream, Poll, unsync::mpsc, task}; -use futures_cpupool::CpuPool; use rand::{self, Rng, ThreadRng}; use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel}; use tokio_core::reactor::Handle; +use types::PacketVec; use std::collections::VecDeque; use std::convert::TryInto; @@ -51,17 +51,15 @@ pub struct PeerServer { shared_state : SharedState, udp : Option, port : Option, - outgoing : Channel, + outgoing : Channel, channel : Channel, handshakes : VecDeque<(Endpoint, Message)>, timer : Timer, - tunnel_tx : mpsc::UnboundedSender>, + tunnel_tx : mpsc::UnboundedSender, cookie : cookie::Validator, rate_limiter : RateLimiter, under_load_until : Instant, rng : ThreadRng, - cpu_pool : CpuPool, - decrypt_channel : Channel<(Endpoint, Transport, Vec, SessionType)>, } impl PeerServer { @@ -79,8 +77,6 @@ impl PeerServer { rate_limiter : RateLimiter::new(&handle)?, under_load_until : Instant::now(), rng : rand::thread_rng(), - cpu_pool : CpuPool::new_num_cpus(), - decrypt_channel : mpsc::unbounded().into(), }) } @@ -116,7 +112,7 @@ impl PeerServer { Ok(()) } - pub fn tunnel_tx(&self) -> mpsc::UnboundedSender { + pub fn tunnel_tx(&self) -> mpsc::UnboundedSender { self.outgoing.tx.clone() } @@ -266,11 +262,11 @@ impl PeerServer { if !peer.outgoing_queue.is_empty() { debug!("sending {} queued egress packets", peer.outgoing_queue.len()); while let Some(packet) = peer.outgoing_queue.pop_front() { - self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?; + self.send_to_peer(peer.handle_outgoing_transport(packet)?)?; } } else { debug!("sending empty keepalive"); - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?; } } else { error!("peer not ready for transport after processing handshake response. this shouldn't happen."); @@ -290,42 +286,23 @@ impl PeerServer { } fn handle_ingress_transport(&mut self, addr: Endpoint, packet: Transport) -> Result<(), Error> { - let peer_ref = self.shared_state.borrow().index_map.get(&packet.our_index()) .ok_or_else(|| err_msg("unknown our_index"))?.clone(); - let mut peer = peer_ref.borrow_mut(); - let tx = self.decrypt_channel.tx.clone(); - let f = self.cpu_pool.spawn(peer.handle_incoming_transport(addr, packet)?) - .and_then(move |result| { - tx.unbounded_send(result).expect("broken decrypt channel"); - Ok(()) - }) - .map_err(|e| warn!("{:?}", e)); - self.handle.spawn(f); - - Ok(()) - } - - fn handle_ingress_decrypted_transport(&mut self, addr: Endpoint, orig_packet: Transport, raw_packet: Vec, session_type: SessionType) - -> Result<(), Error> - { - let peer_ref = self.shared_state.borrow().index_map.get(&orig_packet.our_index()) - .ok_or_else(|| err_msg("unknown our_index"))?.clone(); - - let needs_handshake = { + let (raw_packet, needs_handshake) = { let mut peer = peer_ref.borrow_mut(); - let transition = peer.handle_incoming_decrypted_transport(addr, &raw_packet, session_type)?; let mut state = self.shared_state.borrow_mut(); + let (raw_packet, transition) = peer.handle_incoming_transport(addr, &packet)?; + if let SessionTransition::Transition(possible_dead_index) = transition { if let Some(index) = possible_dead_index { let _ = state.index_map.remove(&index); } - let outgoing: Vec = peer.outgoing_queue.drain(..).collect(); + let outgoing: Vec = peer.outgoing_queue.drain(..).collect(); for packet in outgoing { - match peer.handle_outgoing_transport(packet.payload()) { + match peer.handle_outgoing_transport(packet) { Ok(message) => self.send_to_peer(message)?, Err(e) => warn!("failed to encrypt packet: {}", e) } @@ -333,7 +310,7 @@ impl PeerServer { self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(Rc::downgrade(&peer_ref))); } - peer.needs_new_handshake(false) + (raw_packet, peer.needs_new_handshake(false)) }; if needs_handshake { @@ -352,10 +329,10 @@ impl PeerServer { Ok(()) } - fn handle_egress_packet(&mut self, packet: UtunPacket) -> Result<(), Error> { - ensure!(!packet.payload().is_empty() && packet.payload().len() <= MAX_CONTENT_SIZE, "egress packet outside of size bounds"); + fn handle_egress_packet(&mut self, packet: PacketVec) -> Result<(), Error> { + ensure!(!packet.is_empty() && packet.len() <= MAX_CONTENT_SIZE, "egress packet outside of size bounds"); - let peer_ref = self.shared_state.borrow_mut().router.route_to_peer(packet.payload()) + let peer_ref = self.shared_state.borrow_mut().router.route_to_peer(&packet) .ok_or_else(|| err_msg("no route to peer"))?; let needs_handshake = { @@ -369,7 +346,7 @@ impl PeerServer { } while let Some(packet) = peer.outgoing_queue.pop_front() { - self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?; + self.send_to_peer(peer.handle_outgoing_transport(packet)?)?; } } @@ -488,7 +465,7 @@ impl PeerServer { } } - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?; debug!("sent passive keepalive packet"); self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone())); @@ -506,7 +483,7 @@ impl PeerServer { bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs()); } - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?; let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone())); peer.timers.persistent_timer = Some(handle); debug!("sent persistent keepalive packet"); @@ -564,7 +541,7 @@ impl PeerServer { if let Some(keepalive) = peer.info.persistent_keepalive() { let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(Rc::downgrade(&peer_ref))); peer.timers.persistent_timer = Some(handle); - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?; debug!("set new keepalive timer and immediately sent new keepalive packet."); } } @@ -625,18 +602,6 @@ impl Future for PeerServer { } } - loop { - // Handle UDP packets from the outside world - match self.decrypt_channel.rx.poll() { - Ok(Async::Ready(Some((addr, orig_packet, decrypted, session_type)))) => { - let _ = self.handle_ingress_decrypted_transport(addr, orig_packet, decrypted, session_type).map_err(|e| warn!("UDP ERR: {:?}", e)); - }, - Ok(Async::NotReady) => { break; }, - Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), - Err(e) => bail!("incoming udp stream error: {:?}", e) - } - } - loop { // Handle packets coming from the local tunnel match self.outgoing.rx.poll() { diff --git a/src/lib.rs b/src/lib.rs index 14e1f08..1187f59 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,12 +21,12 @@ extern crate blake2_rfc; extern crate byteorder; extern crate bytes; extern crate chacha20_poly1305_aead; -extern crate futures_cpupool; extern crate hex; extern crate libc; extern crate mio; extern crate nix; extern crate notify; +extern crate rayon; extern crate rand; extern crate rips_packets; extern crate snow; diff --git a/src/message.rs b/src/message.rs index 6a2b645..3a0d778 100644 --- a/src/message.rs +++ b/src/message.rs @@ -3,11 +3,12 @@ use failure::Error; use std::convert::{TryFrom, TryInto}; use byteorder::{ByteOrder, LittleEndian}; +use types::PacketVec; -#[derive(Deref, DerefMut)] pub struct Initiation(Vec); -#[derive(Deref, DerefMut)] pub struct Response(Vec); -#[derive(Deref, DerefMut)] pub struct CookieReply(Vec); -#[derive(Deref, DerefMut)] pub struct Transport(Vec); +#[derive(Deref, DerefMut)] pub struct Initiation(PacketVec); +#[derive(Deref, DerefMut)] pub struct Response(PacketVec); +#[derive(Deref, DerefMut)] pub struct CookieReply(PacketVec); +#[derive(Deref, DerefMut)] pub struct Transport(PacketVec); pub enum Message { Initiation(Initiation), @@ -16,10 +17,10 @@ pub enum Message { Transport(Transport), } -impl TryFrom> for Message { +impl TryFrom for Message { type Error = Error; - fn try_from(packet: Vec) -> Result { + fn try_from(packet: PacketVec) -> Result { Ok(match packet[0] { 1 => Message::Initiation(packet.try_into()?), 2 => Message::Response(packet.try_into()?), @@ -48,10 +49,10 @@ impl Initiation { } } -impl TryFrom> for Initiation { +impl TryFrom for Initiation { type Error = Error; - fn try_from(packet: Vec) -> Result { + fn try_from(packet: PacketVec) -> Result { ensure!(packet.len() == 148, "incorrect handshake initiation packet length."); Ok(Initiation(packet)) } @@ -83,10 +84,10 @@ impl Response { } } -impl TryFrom> for Response { +impl TryFrom for Response { type Error = Error; - fn try_from(packet: Vec) -> Result { + fn try_from(packet: PacketVec) -> Result { ensure!(packet.len() == 92, "incorrect handshake response packet length."); Ok(Response(packet)) } @@ -133,10 +134,10 @@ impl CookieReply { } } -impl TryFrom> for CookieReply { +impl TryFrom for CookieReply { type Error = Error; - fn try_from(packet: Vec) -> Result { + fn try_from(packet: PacketVec) -> Result { ensure!(packet.len() == 64, "incorrect cookie reply packet length."); Ok(CookieReply(packet)) } @@ -160,11 +161,17 @@ impl Transport { } } -impl TryFrom> for Transport { +impl TryFrom for Transport { type Error = Error; - fn try_from(packet: Vec) -> Result { + fn try_from(packet: PacketVec) -> Result { ensure!(packet.len() >= 32, "transport message smaller than minimum length."); Ok(Transport(packet)) } } + +impl From for PacketVec { + fn from(packet: Transport) -> PacketVec { + packet.0 + } +} \ No newline at end of file diff --git a/src/peer.rs b/src/peer.rs index c0b21eb..a2d3a0d 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,26 +1,25 @@ use anti_replay::AntiReplay; -use byteorder::{ByteOrder, LittleEndian}; -use consts::{TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE, REKEY_AFTER_MESSAGES, REKEY_AFTER_TIME, - REKEY_AFTER_TIME_RECV, REJECT_AFTER_TIME, REJECT_AFTER_MESSAGES, PADDING_MULTIPLE, - MAX_QUEUED_PACKETS, MAX_HANDSHAKE_ATTEMPTS}; +use consts::*; use cookie; -use failure::{Error, err_msg}; -use futures::{Future, future}; -use interface::UtunPacket; use ip_packet::IpPacket; use noise; use message::{Initiation, Response, CookieReply, Transport}; -use std::{self, mem}; -use std::collections::VecDeque; -use std::fmt::{self, Debug, Display, Formatter}; -use std::time::{SystemTime, UNIX_EPOCH}; -use hex; use timer::TimerHandle; use timestamp::{Tai64n, Timestamp}; -use snow; -use types::PeerInfo; +use types::{PacketVec, PeerInfo}; use udp::Endpoint; +use byteorder::{ByteOrder, LittleEndian}; +use failure::{Error, err_msg}; +use hex; +use rayon::prelude::*; +use std::{self, fmt, mem, + collections::VecDeque, + iter::Iterator, + fmt::{Debug, Display, Formatter}, + time::{SystemTime, UNIX_EPOCH}}; +use snow; + pub struct Peer { pub info : PeerInfo, pub sessions : Sessions, @@ -28,7 +27,7 @@ pub struct Peer { pub tx_bytes : u64, pub rx_bytes : u64, pub last_handshake_tai64n : Option, - pub outgoing_queue : VecDeque, + pub outgoing_queue : VecDeque, pub cookie : cookie::Generator, } @@ -176,7 +175,7 @@ impl Peer { } } - pub fn queue_egress(&mut self, packet: UtunPacket) { + pub fn queue_egress(&mut self, packet: PacketVec) { if self.outgoing_queue.len() < MAX_QUEUED_PACKETS { self.outgoing_queue.push_back(packet); self.timers.handshake_attempts = 0; @@ -339,31 +338,56 @@ impl Peer { Ok(dead.map(|session| session.our_index)) } - pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: Transport) - -> Result, SessionType), Error = Error> + 'static + Send>, Error> - { + pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: &Transport) + -> Result<(Vec, SessionTransition), Error> { + let mut raw_packet = vec![0u8; packet.len()]; let nonce = packet.nonce(); - let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?; - ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets"); - ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES"); - ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); + let session_type = { + let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?; + ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets"); + ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES"); + ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); - session.anti_replay.update(nonce)?; - let mut transport = session.noise.get_async_transport_state()?.clone(); - Ok(Box::new(future::lazy(move || { - let len = transport.read_transport_message(nonce, packet.payload(), &mut raw_packet).unwrap(); + session.anti_replay.update(nonce)?; + let len = session.noise.read_async_message(nonce, packet.payload(), &mut raw_packet)?; if len > 0 { let len = IpPacket::new(&raw_packet[..len]) - .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() + .ok_or_else(||format_err!("invalid IP packet (len {})", len))? .length(); raw_packet.truncate(len as usize); } else { raw_packet.truncate(0); } - Ok((addr, packet, raw_packet, session_type)) - }))) + + session_type + }; + + if !raw_packet.is_empty() { + self.timers.data_received = Timestamp::now(); + } + self.timers.authenticated_received = Timestamp::now(); + self.timers.authenticated_traversed = Timestamp::now(); + self.timers.keepalive_sent = false; // reset passive keepalive token since received a valid ingress transport + + let transition = if session_type == SessionType::Next { + debug!("moving 'next' session to current after receiving first transport packet"); + let next = std::mem::replace(&mut self.sessions.next, None); + let current = std::mem::replace(&mut self.sessions.current, next); + let dead = std::mem::replace(&mut self.sessions.past, current); + + self.timers.handshake_completed = Timestamp::now(); + + SessionTransition::Transition(dead.map(|session| session.our_index)) + } else { + SessionTransition::NoTransition + }; + + self.rx_bytes += packet.len() as u64; + self.info.endpoint = Some(addr); // update peer endpoint after successful authentication + + Ok((raw_packet, transition)) } pub fn handle_incoming_decrypted_transport(&mut self, addr: Endpoint, raw_packet: &[u8], session_type: SessionType) @@ -395,33 +419,59 @@ impl Peer { Ok(transition) } - pub fn handle_outgoing_transport(&mut self, packet: &[u8]) -> Result<(Endpoint, Vec), Error> { - let session = self.sessions.current.as_mut().ok_or_else(|| err_msg("no current noise session"))?; - let endpoint = self.info.endpoint.ok_or_else(|| err_msg("no known peer endpoint"))?; - let padding = if packet.len() % PADDING_MULTIPLE != 0 { - PADDING_MULTIPLE - (packet.len() % PADDING_MULTIPLE) - } else { 0 }; - let padded_len = packet.len() + padding; - let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; - - ensure!(session.nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES"); - ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); - - out_packet[0] = 4; - LittleEndian::write_u32(&mut out_packet[4..], session.their_index); - LittleEndian::write_u64(&mut out_packet[8..], session.nonce); - let padded_packet = &[packet, &vec![0u8; padding]].concat(); - let len = session.noise.write_async_message(session.nonce, padded_packet, &mut out_packet[16..])?; - session.nonce += 1; - self.tx_bytes += len as u64; - - if !packet.is_empty() { - self.timers.data_sent = Timestamp::now(); - } - self.timers.authenticated_traversed = Timestamp::now(); + pub fn handle_outgoing_transport(&mut self, packet: T) -> Result<(Endpoint, PacketVec), Error> + where T: Into + { + let (endpoint, mut packets) = self.handle_outgoing_transports(vec![packet.into()])?; + Ok((endpoint, packets.remove(0))) + } - out_packet.truncate(TRANSPORT_HEADER_SIZE + len); - Ok((endpoint, out_packet)) + pub fn handle_outgoing_transports(&mut self, packets: T) -> Result<(Endpoint, Vec), Error> + where T: IntoIterator + { + let session = self.sessions.current.as_mut().ok_or_else(|| err_msg("no current noise session"))?; + let endpoint = self.info.endpoint.ok_or_else(|| err_msg("no known peer endpoint"))?; + + ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); + + let transport = session.noise.get_async_transport_state()?.clone(); + let encrypted_packets = packets.into_iter() + .filter_map(|mut packet| { + if session.nonce > REJECT_AFTER_MESSAGES { + warn!("exceeded REJECT-AFTER-MESSAGES"); + None + } else { + let padding = if packet.len() % PADDING_MULTIPLE != 0 { + PADDING_MULTIPLE - (packet.len() % PADDING_MULTIPLE) + } else { 0 }; + let padded_len = packet.len() + padding; + let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; + packet.resize(padded_len, 0); + + out_packet[0] = 4; + LittleEndian::write_u32(&mut out_packet[4..], session.their_index); + LittleEndian::write_u64(&mut out_packet[8..], session.nonce); + session.nonce += 1; + Some((session.nonce - 1, packet, out_packet)) + } + }) + .collect::>() + .into_par_iter() + .map_with(transport, |transport, (nonce, in_packet, mut out_packet)| { + let len = transport.write_transport_message(nonce, &in_packet, &mut out_packet[16..]).unwrap(); + out_packet.truncate(TRANSPORT_HEADER_SIZE + len); + out_packet + }) + .collect::>(); + + // self.tx_bytes += len as u64; + + // if !packet.is_empty() { + // self.timers.data_sent = Timestamp::now(); + // } + // self.timers.authenticated_traversed = Timestamp::now(); + + Ok((endpoint, encrypted_packets)) } pub fn to_config_string(&self) -> String { diff --git a/src/types.rs b/src/types.rs index 77a104d..2b569e7 100644 --- a/src/types.rs +++ b/src/types.rs @@ -4,6 +4,8 @@ use std::net::IpAddr; use std::time::Duration; use udp::Endpoint; +pub type PacketVec = Vec; + #[derive(Clone, Debug, Default)] pub struct PeerInfo { pub pub_key: [u8; 32], -- cgit v1.2.3-59-g8ed1b