aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock69
-rw-r--r--Cargo.toml2
-rw-r--r--src/interface/mod.rs34
-rw-r--r--src/interface/peer_server.rs75
-rw-r--r--src/lib.rs2
-rw-r--r--src/message.rs35
-rw-r--r--src/peer.rs162
-rw-r--r--src/types.rs2
8 files changed, 213 insertions, 168 deletions
diff --git a/Cargo.lock b/Cargo.lock
index a3bfb0c..5eb4250 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -230,6 +230,15 @@ dependencies = [
[[package]]
name = "crossbeam-deque"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "crossbeam-deque"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
@@ -239,6 +248,20 @@ dependencies = [
[[package]]
name = "crossbeam-epoch"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
+ "cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
+ "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "crossbeam-epoch"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
@@ -252,6 +275,14 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "crossbeam-utils"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
@@ -382,15 +413,6 @@ version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
-name = "futures-cpupool"
-version = "0.1.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
- "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
name = "gcc"
version = "0.3.54"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -791,6 +813,27 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
+name = "rayon"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rayon-core 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "redox_syscall"
version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1393,7 +1436,6 @@ dependencies = [
"failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"fern 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.42 (git+https://github.com/rust-lang/libc)",
@@ -1402,6 +1444,7 @@ dependencies = [
"nix 0.11.0-pre (git+https://github.com/mcginty/nix?branch=ipv6-pktinfo)",
"notify 4.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.5.0-pre.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rayon 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rips-packets 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"snow 0.1.12 (git+https://github.com/mcginty/snow?branch=wireguard)",
"socket2 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1465,8 +1508,11 @@ dependencies = [
"checksum criterion 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4f11151e2961d0483e5eb7a2ede5ed8071a460d04d2b7c89e8257aa5502b0e0b"
"checksum criterion-plot 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f7f7c88a8d341dd9fd9e31a72ca2ca24428db79afb491852873b2c784e037e6"
"checksum criterion-stats 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "dd48feb0253b2968ff3085e7f3fba6738c9ff859f420a2fb81a48986eb66da36"
+"checksum crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f739f8c5363aca78cfb059edf753d8f0d36908c348f3d8d1503f03d8b75d9cf3"
"checksum crossbeam-deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fe8153ef04a7594ded05b427ffad46ddeaf22e63fd48d42b3e1e3bb4db07cae7"
+"checksum crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "927121f5407de9956180ff5e936fe3cf4324279280001cd56b669d28ee7e9150"
"checksum crossbeam-epoch 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9b4e2817eb773f770dcb294127c011e22771899c21d18fce7dd739c0b9832e81"
+"checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9"
"checksum crossbeam-utils 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d636a8b3bcc1b409d7ffd3facef8f21dcb4009626adbd0c5e6c4305c07253c7b"
"checksum curve25519-dalek 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e5808ccadbb61565fd184702be128ac43a6a33561bcd976a0d7a388b06ad696d"
"checksum daemonize 0.2.3 (git+https://github.com/mcginty/daemonize)" = "<none>"
@@ -1483,7 +1529,6 @@ dependencies = [
"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
"checksum futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "1a70b146671de62ec8c8ed572219ca5d594d9b06c0b364d5e67b722fc559b48c"
-"checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4"
"checksum gcc 0.3.54 (registry+https://github.com/rust-lang/crates.io-index)" = "5e33ec290da0d127825013597dbdfc28bee4964690c7ce1166cbc2a7bd08b1bb"
"checksum generic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ef25c5683767570c2bbd7deba372926a55eaae9982d7726ee2a1050239d45b9d"
"checksum handlebars 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e7bdb08e879b8c78ee90f5022d121897c31ea022cb0cc6d13f2158c7a9fbabb1"
@@ -1531,6 +1576,8 @@ dependencies = [
"checksum rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "eba5f8cb59cc50ed56be8880a5c7b496bfd9bd26394e176bc67884094145c2c5"
"checksum rand 0.5.0-pre.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3795e4701d9628a63a84d0289e66279883b40df165fca7caed7b87122447032a"
"checksum rand_core 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1b7a5f27547c49e5ccf8a586db3f3782fd93cf849780b21853b9d981db203302"
+"checksum rayon 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "80e811e76f1dbf68abf87a759083d34600017fc4e10b6bd5ad84a700f9dba4b1"
+"checksum rayon-core 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9d24ad214285a7729b174ed6d3bcfcb80177807f959d95fafd5bfc5c4f201ac8"
"checksum redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "c214e91d3ecf43e9a4e41e578973adeb14b474f2bee858742d127af75a0112b1"
"checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76"
"checksum regex 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9329abc99e39129fcceabd24cf5d85b4671ef7c29c50e972bc5afe32438ec384"
diff --git a/Cargo.toml b/Cargo.toml
index 4bdba9c..0a6064b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,7 +37,6 @@ chacha20-poly1305-aead = "^0.1"
derive_deref = "^1.0"
failure = "^0.1"
futures = "^0.1"
-futures-cpupool = "^0.1"
lazy_static = "^1"
libc = { git = "https://github.com/rust-lang/libc" }
log = "^0.4"
@@ -47,6 +46,7 @@ rand = "0.5.0-pre.2"
nix = { git = "https://github.com/mcginty/nix", branch = "ipv6-pktinfo" }
mio = "^0.6"
rips-packets = "0.1"
+rayon = "^1.0"
snow = { git = "https://github.com/mcginty/snow", branch = "wireguard" }
socket2 = "^0.3"
subtle = "^0.6"
diff --git a/src/interface/mod.rs b/src/interface/mod.rs
index 7036a09..6d0fa85 100644
--- a/src/interface/mod.rs
+++ b/src/interface/mod.rs
@@ -12,7 +12,7 @@ use std::io;
use std::rc::{Rc, Weak};
use std::cell::RefCell;
use std::collections::HashMap;
-use types::{InterfaceInfo};
+use types::{InterfaceInfo, PacketVec};
use rips_packets::ipv4::Ipv4Packet;
@@ -44,39 +44,13 @@ pub struct Interface {
}
struct VecUtunCodec;
-pub enum UtunPacket {
- Inet4(Vec<u8>),
- Inet6(Vec<u8>),
-}
-
-impl UtunPacket {
- pub fn payload(&self) -> &[u8] {
- use self::UtunPacket::*;
- match *self {
- Inet4(ref payload) | Inet6(ref payload) => payload,
- }
- }
-
- pub fn from(raw_packet: Vec<u8>) -> Result<UtunPacket, Error> {
- match raw_packet[0] >> 4 {
- 4 => Ok(UtunPacket::Inet4(raw_packet)),
- 6 => Ok(UtunPacket::Inet6(raw_packet)),
- _ => bail!("unrecognized IP version")
- }
- }
-}
impl UtunCodec for VecUtunCodec {
- type In = UtunPacket;
- type Out = Vec<u8>;
+ type In = PacketVec;
+ type Out = PacketVec;
fn decode(&mut self, buf: &[u8]) -> io::Result<Self::In> {
- trace!("utun packet type {}", buf[3]);
- match buf[4] >> 4 {
- 4 => Ok(UtunPacket::Inet4(buf[4..].to_vec())),
- 6 => Ok(UtunPacket::Inet6(buf[4..].to_vec())),
- _ => Err(io::ErrorKind::InvalidData.into())
- }
+ Ok(buf[4..].into())
}
fn encode(&mut self, mut msg: Self::Out, buf: &mut Vec<u8>) {
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index d03fa58..ee40061 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -2,7 +2,7 @@ use consts::{REKEY_TIMEOUT, KEEPALIVE_TIMEOUT, STALE_SESSION_TIMEOUT,
MAX_CONTENT_SIZE, WIPE_AFTER_TIME, MAX_HANDSHAKE_ATTEMPTS,
UNDER_LOAD_QUEUE_SIZE, UNDER_LOAD_TIME};
use cookie;
-use interface::{SharedPeer, SharedState, State, UtunPacket};
+use interface::{SharedPeer, SharedState, State};
use message::{Message, Initiation, Response, CookieReply, Transport};
use peer::{Peer, SessionType, SessionTransition};
use ratelimiter::RateLimiter;
@@ -12,10 +12,10 @@ use timer::{Timer, TimerMessage};
use byteorder::{ByteOrder, LittleEndian};
use failure::{Error, err_msg};
use futures::{Async, Future, Stream, Poll, unsync::mpsc, task};
-use futures_cpupool::CpuPool;
use rand::{self, Rng, ThreadRng};
use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel};
use tokio_core::reactor::Handle;
+use types::PacketVec;
use std::collections::VecDeque;
use std::convert::TryInto;
@@ -51,17 +51,15 @@ pub struct PeerServer {
shared_state : SharedState,
udp : Option<UdpChannel>,
port : Option<u16>,
- outgoing : Channel<UtunPacket>,
+ outgoing : Channel<PacketVec>,
channel : Channel<ChannelMessage>,
handshakes : VecDeque<(Endpoint, Message)>,
timer : Timer,
- tunnel_tx : mpsc::UnboundedSender<Vec<u8>>,
+ tunnel_tx : mpsc::UnboundedSender<PacketVec>,
cookie : cookie::Validator,
rate_limiter : RateLimiter,
under_load_until : Instant,
rng : ThreadRng,
- cpu_pool : CpuPool,
- decrypt_channel : Channel<(Endpoint, Transport, Vec<u8>, SessionType)>,
}
impl PeerServer {
@@ -79,8 +77,6 @@ impl PeerServer {
rate_limiter : RateLimiter::new(&handle)?,
under_load_until : Instant::now(),
rng : rand::thread_rng(),
- cpu_pool : CpuPool::new_num_cpus(),
- decrypt_channel : mpsc::unbounded().into(),
})
}
@@ -116,7 +112,7 @@ impl PeerServer {
Ok(())
}
- pub fn tunnel_tx(&self) -> mpsc::UnboundedSender<UtunPacket> {
+ pub fn tunnel_tx(&self) -> mpsc::UnboundedSender<PacketVec> {
self.outgoing.tx.clone()
}
@@ -266,11 +262,11 @@ impl PeerServer {
if !peer.outgoing_queue.is_empty() {
debug!("sending {} queued egress packets", peer.outgoing_queue.len());
while let Some(packet) = peer.outgoing_queue.pop_front() {
- self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?;
+ self.send_to_peer(peer.handle_outgoing_transport(packet)?)?;
}
} else {
debug!("sending empty keepalive");
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?;
}
} else {
error!("peer not ready for transport after processing handshake response. this shouldn't happen.");
@@ -290,42 +286,23 @@ impl PeerServer {
}
fn handle_ingress_transport(&mut self, addr: Endpoint, packet: Transport) -> Result<(), Error> {
-
let peer_ref = self.shared_state.borrow().index_map.get(&packet.our_index())
.ok_or_else(|| err_msg("unknown our_index"))?.clone();
- let mut peer = peer_ref.borrow_mut();
- let tx = self.decrypt_channel.tx.clone();
- let f = self.cpu_pool.spawn(peer.handle_incoming_transport(addr, packet)?)
- .and_then(move |result| {
- tx.unbounded_send(result).expect("broken decrypt channel");
- Ok(())
- })
- .map_err(|e| warn!("{:?}", e));
- self.handle.spawn(f);
-
- Ok(())
- }
-
- fn handle_ingress_decrypted_transport(&mut self, addr: Endpoint, orig_packet: Transport, raw_packet: Vec<u8>, session_type: SessionType)
- -> Result<(), Error>
- {
- let peer_ref = self.shared_state.borrow().index_map.get(&orig_packet.our_index())
- .ok_or_else(|| err_msg("unknown our_index"))?.clone();
-
- let needs_handshake = {
+ let (raw_packet, needs_handshake) = {
let mut peer = peer_ref.borrow_mut();
- let transition = peer.handle_incoming_decrypted_transport(addr, &raw_packet, session_type)?;
let mut state = self.shared_state.borrow_mut();
+ let (raw_packet, transition) = peer.handle_incoming_transport(addr, &packet)?;
+
if let SessionTransition::Transition(possible_dead_index) = transition {
if let Some(index) = possible_dead_index {
let _ = state.index_map.remove(&index);
}
- let outgoing: Vec<UtunPacket> = peer.outgoing_queue.drain(..).collect();
+ let outgoing: Vec<PacketVec> = peer.outgoing_queue.drain(..).collect();
for packet in outgoing {
- match peer.handle_outgoing_transport(packet.payload()) {
+ match peer.handle_outgoing_transport(packet) {
Ok(message) => self.send_to_peer(message)?,
Err(e) => warn!("failed to encrypt packet: {}", e)
}
@@ -333,7 +310,7 @@ impl PeerServer {
self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(Rc::downgrade(&peer_ref)));
}
- peer.needs_new_handshake(false)
+ (raw_packet, peer.needs_new_handshake(false))
};
if needs_handshake {
@@ -352,10 +329,10 @@ impl PeerServer {
Ok(())
}
- fn handle_egress_packet(&mut self, packet: UtunPacket) -> Result<(), Error> {
- ensure!(!packet.payload().is_empty() && packet.payload().len() <= MAX_CONTENT_SIZE, "egress packet outside of size bounds");
+ fn handle_egress_packet(&mut self, packet: PacketVec) -> Result<(), Error> {
+ ensure!(!packet.is_empty() && packet.len() <= MAX_CONTENT_SIZE, "egress packet outside of size bounds");
- let peer_ref = self.shared_state.borrow_mut().router.route_to_peer(packet.payload())
+ let peer_ref = self.shared_state.borrow_mut().router.route_to_peer(&packet)
.ok_or_else(|| err_msg("no route to peer"))?;
let needs_handshake = {
@@ -369,7 +346,7 @@ impl PeerServer {
}
while let Some(packet) = peer.outgoing_queue.pop_front() {
- self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?;
+ self.send_to_peer(peer.handle_outgoing_transport(packet)?)?;
}
}
@@ -488,7 +465,7 @@ impl PeerServer {
}
}
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?;
debug!("sent passive keepalive packet");
self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone()));
@@ -506,7 +483,7 @@ impl PeerServer {
bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs());
}
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?;
let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone()));
peer.timers.persistent_timer = Some(handle);
debug!("sent persistent keepalive packet");
@@ -564,7 +541,7 @@ impl PeerServer {
if let Some(keepalive) = peer.info.persistent_keepalive() {
let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(Rc::downgrade(&peer_ref)));
peer.timers.persistent_timer = Some(handle);
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?;
debug!("set new keepalive timer and immediately sent new keepalive packet.");
}
}
@@ -626,18 +603,6 @@ impl Future for PeerServer {
}
loop {
- // Handle UDP packets from the outside world
- match self.decrypt_channel.rx.poll() {
- Ok(Async::Ready(Some((addr, orig_packet, decrypted, session_type)))) => {
- let _ = self.handle_ingress_decrypted_transport(addr, orig_packet, decrypted, session_type).map_err(|e| warn!("UDP ERR: {:?}", e));
- },
- Ok(Async::NotReady) => { break; },
- Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
- Err(e) => bail!("incoming udp stream error: {:?}", e)
- }
- }
-
- loop {
// Handle packets coming from the local tunnel
match self.outgoing.rx.poll() {
Ok(Async::Ready(Some(packet))) => {
diff --git a/src/lib.rs b/src/lib.rs
index 14e1f08..1187f59 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -21,12 +21,12 @@ extern crate blake2_rfc;
extern crate byteorder;
extern crate bytes;
extern crate chacha20_poly1305_aead;
-extern crate futures_cpupool;
extern crate hex;
extern crate libc;
extern crate mio;
extern crate nix;
extern crate notify;
+extern crate rayon;
extern crate rand;
extern crate rips_packets;
extern crate snow;
diff --git a/src/message.rs b/src/message.rs
index 6a2b645..3a0d778 100644
--- a/src/message.rs
+++ b/src/message.rs
@@ -3,11 +3,12 @@
use failure::Error;
use std::convert::{TryFrom, TryInto};
use byteorder::{ByteOrder, LittleEndian};
+use types::PacketVec;
-#[derive(Deref, DerefMut)] pub struct Initiation(Vec<u8>);
-#[derive(Deref, DerefMut)] pub struct Response(Vec<u8>);
-#[derive(Deref, DerefMut)] pub struct CookieReply(Vec<u8>);
-#[derive(Deref, DerefMut)] pub struct Transport(Vec<u8>);
+#[derive(Deref, DerefMut)] pub struct Initiation(PacketVec);
+#[derive(Deref, DerefMut)] pub struct Response(PacketVec);
+#[derive(Deref, DerefMut)] pub struct CookieReply(PacketVec);
+#[derive(Deref, DerefMut)] pub struct Transport(PacketVec);
pub enum Message {
Initiation(Initiation),
@@ -16,10 +17,10 @@ pub enum Message {
Transport(Transport),
}
-impl TryFrom<Vec<u8>> for Message {
+impl TryFrom<PacketVec> for Message {
type Error = Error;
- fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> {
+ fn try_from(packet: PacketVec) -> Result<Self, Self::Error> {
Ok(match packet[0] {
1 => Message::Initiation(packet.try_into()?),
2 => Message::Response(packet.try_into()?),
@@ -48,10 +49,10 @@ impl Initiation {
}
}
-impl TryFrom<Vec<u8>> for Initiation {
+impl TryFrom<PacketVec> for Initiation {
type Error = Error;
- fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> {
+ fn try_from(packet: PacketVec) -> Result<Self, Self::Error> {
ensure!(packet.len() == 148, "incorrect handshake initiation packet length.");
Ok(Initiation(packet))
}
@@ -83,10 +84,10 @@ impl Response {
}
}
-impl TryFrom<Vec<u8>> for Response {
+impl TryFrom<PacketVec> for Response {
type Error = Error;
- fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> {
+ fn try_from(packet: PacketVec) -> Result<Self, Self::Error> {
ensure!(packet.len() == 92, "incorrect handshake response packet length.");
Ok(Response(packet))
}
@@ -133,10 +134,10 @@ impl CookieReply {
}
}
-impl TryFrom<Vec<u8>> for CookieReply {
+impl TryFrom<PacketVec> for CookieReply {
type Error = Error;
- fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> {
+ fn try_from(packet: PacketVec) -> Result<Self, Self::Error> {
ensure!(packet.len() == 64, "incorrect cookie reply packet length.");
Ok(CookieReply(packet))
}
@@ -160,11 +161,17 @@ impl Transport {
}
}
-impl TryFrom<Vec<u8>> for Transport {
+impl TryFrom<PacketVec> for Transport {
type Error = Error;
- fn try_from(packet: Vec<u8>) -> Result<Self, Self::Error> {
+ fn try_from(packet: PacketVec) -> Result<Self, Self::Error> {
ensure!(packet.len() >= 32, "transport message smaller than minimum length.");
Ok(Transport(packet))
}
}
+
+impl From<Transport> for PacketVec {
+ fn from(packet: Transport) -> PacketVec {
+ packet.0
+ }
+} \ No newline at end of file
diff --git a/src/peer.rs b/src/peer.rs
index c0b21eb..a2d3a0d 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -1,26 +1,25 @@
use anti_replay::AntiReplay;
-use byteorder::{ByteOrder, LittleEndian};
-use consts::{TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE, REKEY_AFTER_MESSAGES, REKEY_AFTER_TIME,
- REKEY_AFTER_TIME_RECV, REJECT_AFTER_TIME, REJECT_AFTER_MESSAGES, PADDING_MULTIPLE,
- MAX_QUEUED_PACKETS, MAX_HANDSHAKE_ATTEMPTS};
+use consts::*;
use cookie;
-use failure::{Error, err_msg};
-use futures::{Future, future};
-use interface::UtunPacket;
use ip_packet::IpPacket;
use noise;
use message::{Initiation, Response, CookieReply, Transport};
-use std::{self, mem};
-use std::collections::VecDeque;
-use std::fmt::{self, Debug, Display, Formatter};
-use std::time::{SystemTime, UNIX_EPOCH};
-use hex;
use timer::TimerHandle;
use timestamp::{Tai64n, Timestamp};
-use snow;
-use types::PeerInfo;
+use types::{PacketVec, PeerInfo};
use udp::Endpoint;
+use byteorder::{ByteOrder, LittleEndian};
+use failure::{Error, err_msg};
+use hex;
+use rayon::prelude::*;
+use std::{self, fmt, mem,
+ collections::VecDeque,
+ iter::Iterator,
+ fmt::{Debug, Display, Formatter},
+ time::{SystemTime, UNIX_EPOCH}};
+use snow;
+
pub struct Peer {
pub info : PeerInfo,
pub sessions : Sessions,
@@ -28,7 +27,7 @@ pub struct Peer {
pub tx_bytes : u64,
pub rx_bytes : u64,
pub last_handshake_tai64n : Option<Tai64n>,
- pub outgoing_queue : VecDeque<UtunPacket>,
+ pub outgoing_queue : VecDeque<PacketVec>,
pub cookie : cookie::Generator,
}
@@ -176,7 +175,7 @@ impl Peer {
}
}
- pub fn queue_egress(&mut self, packet: UtunPacket) {
+ pub fn queue_egress(&mut self, packet: PacketVec) {
if self.outgoing_queue.len() < MAX_QUEUED_PACKETS {
self.outgoing_queue.push_back(packet);
self.timers.handshake_attempts = 0;
@@ -339,31 +338,56 @@ impl Peer {
Ok(dead.map(|session| session.our_index))
}
- pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: Transport)
- -> Result<Box<Future<Item = (Endpoint, Transport, Vec<u8>, SessionType), Error = Error> + 'static + Send>, Error>
- {
+ pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: &Transport)
+ -> Result<(Vec<u8>, SessionTransition), Error> {
+
let mut raw_packet = vec![0u8; packet.len()];
let nonce = packet.nonce();
- let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?;
- ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets");
- ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES");
- ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME");
+ let session_type = {
+ let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?;
+ ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets");
+ ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES");
+ ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME");
- session.anti_replay.update(nonce)?;
- let mut transport = session.noise.get_async_transport_state()?.clone();
- Ok(Box::new(future::lazy(move || {
- let len = transport.read_transport_message(nonce, packet.payload(), &mut raw_packet).unwrap();
+ session.anti_replay.update(nonce)?;
+ let len = session.noise.read_async_message(nonce, packet.payload(), &mut raw_packet)?;
if len > 0 {
let len = IpPacket::new(&raw_packet[..len])
- .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap()
+ .ok_or_else(||format_err!("invalid IP packet (len {})", len))?
.length();
raw_packet.truncate(len as usize);
} else {
raw_packet.truncate(0);
}
- Ok((addr, packet, raw_packet, session_type))
- })))
+
+ session_type
+ };
+
+ if !raw_packet.is_empty() {
+ self.timers.data_received = Timestamp::now();
+ }
+ self.timers.authenticated_received = Timestamp::now();
+ self.timers.authenticated_traversed = Timestamp::now();
+ self.timers.keepalive_sent = false; // reset passive keepalive token since received a valid ingress transport
+
+ let transition = if session_type == SessionType::Next {
+ debug!("moving 'next' session to current after receiving first transport packet");
+ let next = std::mem::replace(&mut self.sessions.next, None);
+ let current = std::mem::replace(&mut self.sessions.current, next);
+ let dead = std::mem::replace(&mut self.sessions.past, current);
+
+ self.timers.handshake_completed = Timestamp::now();
+
+ SessionTransition::Transition(dead.map(|session| session.our_index))
+ } else {
+ SessionTransition::NoTransition
+ };
+
+ self.rx_bytes += packet.len() as u64;
+ self.info.endpoint = Some(addr); // update peer endpoint after successful authentication
+
+ Ok((raw_packet, transition))
}
pub fn handle_incoming_decrypted_transport(&mut self, addr: Endpoint, raw_packet: &[u8], session_type: SessionType)
@@ -395,33 +419,59 @@ impl Peer {
Ok(transition)
}
- pub fn handle_outgoing_transport(&mut self, packet: &[u8]) -> Result<(Endpoint, Vec<u8>), Error> {
- let session = self.sessions.current.as_mut().ok_or_else(|| err_msg("no current noise session"))?;
- let endpoint = self.info.endpoint.ok_or_else(|| err_msg("no known peer endpoint"))?;
- let padding = if packet.len() % PADDING_MULTIPLE != 0 {
- PADDING_MULTIPLE - (packet.len() % PADDING_MULTIPLE)
- } else { 0 };
- let padded_len = packet.len() + padding;
- let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
-
- ensure!(session.nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES");
- ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME");
-
- out_packet[0] = 4;
- LittleEndian::write_u32(&mut out_packet[4..], session.their_index);
- LittleEndian::write_u64(&mut out_packet[8..], session.nonce);
- let padded_packet = &[packet, &vec![0u8; padding]].concat();
- let len = session.noise.write_async_message(session.nonce, padded_packet, &mut out_packet[16..])?;
- session.nonce += 1;
- self.tx_bytes += len as u64;
-
- if !packet.is_empty() {
- self.timers.data_sent = Timestamp::now();
- }
- self.timers.authenticated_traversed = Timestamp::now();
+ pub fn handle_outgoing_transport<T>(&mut self, packet: T) -> Result<(Endpoint, PacketVec), Error>
+ where T: Into<PacketVec>
+ {
+ let (endpoint, mut packets) = self.handle_outgoing_transports(vec![packet.into()])?;
+ Ok((endpoint, packets.remove(0)))
+ }
- out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
- Ok((endpoint, out_packet))
+ pub fn handle_outgoing_transports<T>(&mut self, packets: T) -> Result<(Endpoint, Vec<PacketVec>), Error>
+ where T: IntoIterator<Item = PacketVec>
+ {
+ let session = self.sessions.current.as_mut().ok_or_else(|| err_msg("no current noise session"))?;
+ let endpoint = self.info.endpoint.ok_or_else(|| err_msg("no known peer endpoint"))?;
+
+ ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME");
+
+ let transport = session.noise.get_async_transport_state()?.clone();
+ let encrypted_packets = packets.into_iter()
+ .filter_map(|mut packet| {
+ if session.nonce > REJECT_AFTER_MESSAGES {
+ warn!("exceeded REJECT-AFTER-MESSAGES");
+ None
+ } else {
+ let padding = if packet.len() % PADDING_MULTIPLE != 0 {
+ PADDING_MULTIPLE - (packet.len() % PADDING_MULTIPLE)
+ } else { 0 };
+ let padded_len = packet.len() + padding;
+ let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
+ packet.resize(padded_len, 0);
+
+ out_packet[0] = 4;
+ LittleEndian::write_u32(&mut out_packet[4..], session.their_index);
+ LittleEndian::write_u64(&mut out_packet[8..], session.nonce);
+ session.nonce += 1;
+ Some((session.nonce - 1, packet, out_packet))
+ }
+ })
+ .collect::<Vec<_>>()
+ .into_par_iter()
+ .map_with(transport, |transport, (nonce, in_packet, mut out_packet)| {
+ let len = transport.write_transport_message(nonce, &in_packet, &mut out_packet[16..]).unwrap();
+ out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
+ out_packet
+ })
+ .collect::<Vec<_>>();
+
+ // self.tx_bytes += len as u64;
+
+ // if !packet.is_empty() {
+ // self.timers.data_sent = Timestamp::now();
+ // }
+ // self.timers.authenticated_traversed = Timestamp::now();
+
+ Ok((endpoint, encrypted_packets))
}
pub fn to_config_string(&self) -> String {
diff --git a/src/types.rs b/src/types.rs
index 77a104d..2b569e7 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -4,6 +4,8 @@ use std::net::IpAddr;
use std::time::Duration;
use udp::Endpoint;
+pub type PacketVec = Vec<u8>;
+
#[derive(Clone, Debug, Default)]
pub struct PeerInfo {
pub pub_key: [u8; 32],