aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-05-29 02:16:29 -0500
committerJake McGinty <me@jake.su>2018-06-03 12:01:52 -0500
commit3c51e38718f78c0be9e82804290dc0a97b735f8e (patch)
tree557cd21452bae32dc1f882b4d92dbb926c94a813
parentdeps: Update Cargo.lock for new snow and rips-packets (diff)
downloadwireguard-rs-3c51e38718f78c0be9e82804290dc0a97b735f8e.tar.xz
wireguard-rs-3c51e38718f78c0be9e82804290dc0a97b735f8e.zip
wip
-rw-r--r--src/interface/peer_server.rs50
-rw-r--r--src/lib.rs1
-rw-r--r--src/peer.rs39
3 files changed, 66 insertions, 24 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index 9616ba2..5d10406 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -12,6 +12,7 @@ use timer::{Timer, TimerMessage};
use byteorder::{ByteOrder, LittleEndian};
use failure::{Error, err_msg};
use futures::{Async, Future, Stream, Poll, unsync::mpsc, task};
+use futures_cpupool::CpuPool;
use rand::{self, Rng, ThreadRng};
use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel};
use tokio_core::reactor::Handle;
@@ -59,6 +60,8 @@ pub struct PeerServer {
rate_limiter : RateLimiter,
under_load_until : Instant,
rng : ThreadRng,
+ cpu_pool : CpuPool,
+ decrypt_channel : Channel<(Endpoint, Transport, Vec<u8>, SessionType)>,
}
impl PeerServer {
@@ -75,7 +78,9 @@ impl PeerServer {
cookie : cookie::Validator::new(&[0u8; 32]),
rate_limiter : RateLimiter::new(&handle)?,
under_load_until : Instant::now(),
- rng : rand::thread_rng()
+ rng : rand::thread_rng(),
+ cpu_pool : CpuPool::new_num_cpus(),
+ decrypt_channel : mpsc::unbounded().into(),
})
}
@@ -153,7 +158,7 @@ impl PeerServer {
let message = packet.try_into()?;
if let Message::Transport(packet) = message {
- self.handle_ingress_transport(addr, &packet)?;
+ self.handle_ingress_transport(addr, packet)?;
} else {
self.queue_ingress_handshake(addr, message);
}
@@ -282,15 +287,34 @@ impl PeerServer {
peer.consume_cookie_reply(packet)
}
- fn handle_ingress_transport(&mut self, addr: Endpoint, packet: &Transport) -> Result<(), Error> {
+ fn handle_ingress_transport(&mut self, addr: Endpoint, packet: Transport) -> Result<(), Error> {
+
let peer_ref = self.shared_state.borrow().index_map.get(&packet.our_index())
.ok_or_else(|| err_msg("unknown our_index"))?.clone();
- let (raw_packet, needs_handshake) = {
+ let mut peer = peer_ref.borrow_mut();
+ let tx = self.decrypt_channel.tx.clone();
+ let f = self.cpu_pool.spawn(peer.handle_incoming_transport(addr, packet)?)
+ .and_then(move |result| {
+ tx.unbounded_send(result).expect("broken decrypt channel");
+ Ok(())
+ })
+ .map_err(|e| warn!("{:?}", e));
+ self.handle.spawn(f);
+
+ Ok(())
+ }
+
+ fn handle_ingress_decrypted_transport(&mut self, addr: Endpoint, orig_packet: Transport, raw_packet: Vec<u8>, session_type: SessionType)
+ -> Result<(), Error>
+ {
+ let peer_ref = self.shared_state.borrow().index_map.get(&orig_packet.our_index())
+ .ok_or_else(|| err_msg("unknown our_index"))?.clone();
+
+ let needs_handshake = {
let mut peer = peer_ref.borrow_mut();
+ let transition = peer.handle_incoming_decrypted_transport(addr, &raw_packet, session_type)?;
let mut state = self.shared_state.borrow_mut();
- let (raw_packet, transition) = peer.handle_incoming_transport(addr, packet)?;
-
if let SessionTransition::Transition(possible_dead_index) = transition {
if let Some(index) = possible_dead_index {
let _ = state.index_map.remove(&index);
@@ -307,7 +331,7 @@ impl PeerServer {
self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(Rc::downgrade(&peer_ref)));
}
- (raw_packet, peer.needs_new_handshake(false))
+ peer.needs_new_handshake(false)
};
if needs_handshake {
@@ -600,6 +624,18 @@ impl Future for PeerServer {
}
loop {
+ // Handle UDP packets from the outside world
+ match self.decrypt_channel.rx.poll() {
+ Ok(Async::Ready(Some((addr, orig_packet, decrypted, session_type)))) => {
+ let _ = self.handle_ingress_decrypted_transport(addr, orig_packet, decrypted, session_type).map_err(|e| warn!("UDP ERR: {:?}", e));
+ },
+ Ok(Async::NotReady) => { break; },
+ Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
+ Err(e) => bail!("incoming udp stream error: {:?}", e)
+ }
+ }
+
+ loop {
// Handle packets coming from the local tunnel
match self.outgoing.rx.poll() {
Ok(Async::Ready(Some(packet))) => {
diff --git a/src/lib.rs b/src/lib.rs
index 5b41b8f..14e1f08 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -21,6 +21,7 @@ extern crate blake2_rfc;
extern crate byteorder;
extern crate bytes;
extern crate chacha20_poly1305_aead;
+extern crate futures_cpupool;
extern crate hex;
extern crate libc;
extern crate mio;
diff --git a/src/peer.rs b/src/peer.rs
index a6cc215..798d61a 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -5,6 +5,7 @@ use consts::{TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE, REKEY_AFTER_MESSAGES, RE
MAX_QUEUED_PACKETS, MAX_HANDSHAKE_ATTEMPTS};
use cookie;
use failure::{Error, err_msg};
+use futures::{Future, future};
use interface::UtunPacket;
use ip_packet::IpPacket;
use noise;
@@ -331,33 +332,37 @@ impl Peer {
Ok(dead.map(|session| session.our_index))
}
- pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: &Transport)
- -> Result<(Vec<u8>, SessionTransition), Error> {
-
+ pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: Transport)
+ -> Result<Box<Future<Item = (Endpoint, Transport, Vec<u8>, SessionType), Error = Error> + 'static + Send>, Error>
+ {
let mut raw_packet = vec![0u8; packet.len()];
let nonce = packet.nonce();
- let session_type = {
- let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?;
- ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets");
- ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES");
- ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME");
+ let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?;
+ ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets");
+ ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES");
+ ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME");
- session.anti_replay.update(nonce)?;
- session.noise.set_receiving_nonce(nonce)?;
- let len = session.noise.read_message(packet.payload(), &mut raw_packet)?;
+ session.anti_replay.update(nonce)?;
+ let mut transport = session.noise.get_transport_state()?.clone();
+ transport.set_receiving_nonce(nonce);
+ Ok(Box::new(future::lazy(move || {
+ let len = transport.read_transport_message(packet.payload(), &mut raw_packet).unwrap();
if len > 0 {
let len = IpPacket::new(&raw_packet[..len])
- .ok_or_else(||format_err!("invalid IP packet (len {})", len))?
+ .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap()
.length();
raw_packet.truncate(len as usize);
} else {
raw_packet.truncate(0);
}
+ Ok((addr, packet, raw_packet, session_type))
+ })))
+ }
- session_type
- };
-
+ pub fn handle_incoming_decrypted_transport(&mut self, addr: Endpoint, raw_packet: &[u8], session_type: SessionType)
+ -> Result<SessionTransition, Error>
+ {
if !raw_packet.is_empty() {
self.timers.data_received = Timestamp::now();
}
@@ -378,10 +383,10 @@ impl Peer {
SessionTransition::NoTransition
};
- self.rx_bytes += packet.len() as u64;
+ self.rx_bytes += raw_packet.len() as u64;
self.info.endpoint = Some(addr); // update peer endpoint after successful authentication
- Ok((raw_packet, transition))
+ Ok(transition)
}
pub fn handle_outgoing_transport(&mut self, packet: &[u8]) -> Result<(Endpoint, Vec<u8>), Error> {