diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/interface/peer_server.rs | 50 | ||||
-rw-r--r-- | src/lib.rs | 1 | ||||
-rw-r--r-- | src/peer.rs | 39 |
3 files changed, 66 insertions, 24 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 9616ba2..5d10406 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -12,6 +12,7 @@ use timer::{Timer, TimerMessage}; use byteorder::{ByteOrder, LittleEndian}; use failure::{Error, err_msg}; use futures::{Async, Future, Stream, Poll, unsync::mpsc, task}; +use futures_cpupool::CpuPool; use rand::{self, Rng, ThreadRng}; use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel}; use tokio_core::reactor::Handle; @@ -59,6 +60,8 @@ pub struct PeerServer { rate_limiter : RateLimiter, under_load_until : Instant, rng : ThreadRng, + cpu_pool : CpuPool, + decrypt_channel : Channel<(Endpoint, Transport, Vec<u8>, SessionType)>, } impl PeerServer { @@ -75,7 +78,9 @@ impl PeerServer { cookie : cookie::Validator::new(&[0u8; 32]), rate_limiter : RateLimiter::new(&handle)?, under_load_until : Instant::now(), - rng : rand::thread_rng() + rng : rand::thread_rng(), + cpu_pool : CpuPool::new_num_cpus(), + decrypt_channel : mpsc::unbounded().into(), }) } @@ -153,7 +158,7 @@ impl PeerServer { let message = packet.try_into()?; if let Message::Transport(packet) = message { - self.handle_ingress_transport(addr, &packet)?; + self.handle_ingress_transport(addr, packet)?; } else { self.queue_ingress_handshake(addr, message); } @@ -282,15 +287,34 @@ impl PeerServer { peer.consume_cookie_reply(packet) } - fn handle_ingress_transport(&mut self, addr: Endpoint, packet: &Transport) -> Result<(), Error> { + fn handle_ingress_transport(&mut self, addr: Endpoint, packet: Transport) -> Result<(), Error> { + let peer_ref = self.shared_state.borrow().index_map.get(&packet.our_index()) .ok_or_else(|| err_msg("unknown our_index"))?.clone(); - let (raw_packet, needs_handshake) = { + let mut peer = peer_ref.borrow_mut(); + let tx = self.decrypt_channel.tx.clone(); + let f = self.cpu_pool.spawn(peer.handle_incoming_transport(addr, packet)?) + .and_then(move |result| { + tx.unbounded_send(result).expect("broken decrypt channel"); + Ok(()) + }) + .map_err(|e| warn!("{:?}", e)); + self.handle.spawn(f); + + Ok(()) + } + + fn handle_ingress_decrypted_transport(&mut self, addr: Endpoint, orig_packet: Transport, raw_packet: Vec<u8>, session_type: SessionType) + -> Result<(), Error> + { + let peer_ref = self.shared_state.borrow().index_map.get(&orig_packet.our_index()) + .ok_or_else(|| err_msg("unknown our_index"))?.clone(); + + let needs_handshake = { let mut peer = peer_ref.borrow_mut(); + let transition = peer.handle_incoming_decrypted_transport(addr, &raw_packet, session_type)?; let mut state = self.shared_state.borrow_mut(); - let (raw_packet, transition) = peer.handle_incoming_transport(addr, packet)?; - if let SessionTransition::Transition(possible_dead_index) = transition { if let Some(index) = possible_dead_index { let _ = state.index_map.remove(&index); @@ -307,7 +331,7 @@ impl PeerServer { self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(Rc::downgrade(&peer_ref))); } - (raw_packet, peer.needs_new_handshake(false)) + peer.needs_new_handshake(false) }; if needs_handshake { @@ -600,6 +624,18 @@ impl Future for PeerServer { } loop { + // Handle UDP packets from the outside world + match self.decrypt_channel.rx.poll() { + Ok(Async::Ready(Some((addr, orig_packet, decrypted, session_type)))) => { + let _ = self.handle_ingress_decrypted_transport(addr, orig_packet, decrypted, session_type).map_err(|e| warn!("UDP ERR: {:?}", e)); + }, + Ok(Async::NotReady) => { break; }, + Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), + Err(e) => bail!("incoming udp stream error: {:?}", e) + } + } + + loop { // Handle packets coming from the local tunnel match self.outgoing.rx.poll() { Ok(Async::Ready(Some(packet))) => { @@ -21,6 +21,7 @@ extern crate blake2_rfc; extern crate byteorder; extern crate bytes; extern crate chacha20_poly1305_aead; +extern crate futures_cpupool; extern crate hex; extern crate libc; extern crate mio; diff --git a/src/peer.rs b/src/peer.rs index a6cc215..798d61a 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -5,6 +5,7 @@ use consts::{TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE, REKEY_AFTER_MESSAGES, RE MAX_QUEUED_PACKETS, MAX_HANDSHAKE_ATTEMPTS}; use cookie; use failure::{Error, err_msg}; +use futures::{Future, future}; use interface::UtunPacket; use ip_packet::IpPacket; use noise; @@ -331,33 +332,37 @@ impl Peer { Ok(dead.map(|session| session.our_index)) } - pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: &Transport) - -> Result<(Vec<u8>, SessionTransition), Error> { - + pub fn handle_incoming_transport(&mut self, addr: Endpoint, packet: Transport) + -> Result<Box<Future<Item = (Endpoint, Transport, Vec<u8>, SessionType), Error = Error> + 'static + Send>, Error> + { let mut raw_packet = vec![0u8; packet.len()]; let nonce = packet.nonce(); - let session_type = { - let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?; - ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets"); - ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES"); - ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); + let (session, session_type) = self.find_session(packet.our_index()).ok_or_else(|| err_msg("no session with index"))?; + ensure!(session.noise.is_handshake_finished(), "session is not ready for transport packets"); + ensure!(nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES"); + ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); - session.anti_replay.update(nonce)?; - session.noise.set_receiving_nonce(nonce)?; - let len = session.noise.read_message(packet.payload(), &mut raw_packet)?; + session.anti_replay.update(nonce)?; + let mut transport = session.noise.get_transport_state()?.clone(); + transport.set_receiving_nonce(nonce); + Ok(Box::new(future::lazy(move || { + let len = transport.read_transport_message(packet.payload(), &mut raw_packet).unwrap(); if len > 0 { let len = IpPacket::new(&raw_packet[..len]) - .ok_or_else(||format_err!("invalid IP packet (len {})", len))? + .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() .length(); raw_packet.truncate(len as usize); } else { raw_packet.truncate(0); } + Ok((addr, packet, raw_packet, session_type)) + }))) + } - session_type - }; - + pub fn handle_incoming_decrypted_transport(&mut self, addr: Endpoint, raw_packet: &[u8], session_type: SessionType) + -> Result<SessionTransition, Error> + { if !raw_packet.is_empty() { self.timers.data_received = Timestamp::now(); } @@ -378,10 +383,10 @@ impl Peer { SessionTransition::NoTransition }; - self.rx_bytes += packet.len() as u64; + self.rx_bytes += raw_packet.len() as u64; self.info.endpoint = Some(addr); // update peer endpoint after successful authentication - Ok((raw_packet, transition)) + Ok(transition) } pub fn handle_outgoing_transport(&mut self, packet: &[u8]) -> Result<(Endpoint, Vec<u8>), Error> { |