From b0d70da996abafd739d0b76a41164b2bc47c9558 Mon Sep 17 00:00:00 2001 From: Jake McGinty Date: Fri, 1 Jun 2018 16:59:37 -0500 Subject: encryption wip --- src/interface/peer_server.rs | 56 ++++++++++++++++++++++++++++++++++---------- src/peer.rs | 36 ++++++++++++++++++---------- 2 files changed, 67 insertions(+), 25 deletions(-) diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index d03fa58..aa82fb7 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -62,6 +62,7 @@ pub struct PeerServer { rng : ThreadRng, cpu_pool : CpuPool, decrypt_channel : Channel<(Endpoint, Transport, Vec, SessionType)>, + encrypt_channel : Channel<(SharedPeer, (Endpoint, Vec))>, } impl PeerServer { @@ -81,6 +82,7 @@ impl PeerServer { rng : rand::thread_rng(), cpu_pool : CpuPool::new_num_cpus(), decrypt_channel : mpsc::unbounded().into(), + encrypt_channel : mpsc::unbounded().into(), }) } @@ -250,7 +252,8 @@ impl PeerServer { } debug!("got handshake response (0x02)"); - let mut state = self.shared_state.borrow_mut(); + let shared_state = self.shared_state.clone(); + let mut state = shared_state.borrow_mut(); let our_index = LittleEndian::read_u32(&packet[8..]); let peer_ref = state.index_map.get(&our_index) .ok_or_else(|| format_err!("unknown our_index ({})", our_index))? @@ -266,11 +269,11 @@ impl PeerServer { if !peer.outgoing_queue.is_empty() { debug!("sending {} queued egress packets", peer.outgoing_queue.len()); while let Some(packet) = peer.outgoing_queue.pop_front() { - self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?; + self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?; } } else { debug!("sending empty keepalive"); - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.encrypt_and_send(peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?; } } else { error!("peer not ready for transport after processing handshake response. this shouldn't happen."); @@ -307,6 +310,18 @@ impl PeerServer { Ok(()) } + fn encrypt_and_send(&mut self, peer_ref: SharedPeer, peer: &mut Peer, packet: UtunPacket) -> Result<(), Error> { + let tx = self.encrypt_channel.tx.clone(); + let f = self.cpu_pool.spawn(peer.handle_outgoing_transport(packet)?) + .and_then(move |result| { + tx.unbounded_send((peer_ref, result)).expect("broken decrypt channel"); + Ok(()) + }) + .map_err(|e| warn!("{:?}", e)); + self.handle.spawn(f); + Ok(()) + } + fn handle_ingress_decrypted_transport(&mut self, addr: Endpoint, orig_packet: Transport, raw_packet: Vec, session_type: SessionType) -> Result<(), Error> { @@ -316,7 +331,8 @@ impl PeerServer { let needs_handshake = { let mut peer = peer_ref.borrow_mut(); let transition = peer.handle_incoming_decrypted_transport(addr, &raw_packet, session_type)?; - let mut state = self.shared_state.borrow_mut(); + let shared_state = self.shared_state.clone(); + let mut state = shared_state.borrow_mut(); if let SessionTransition::Transition(possible_dead_index) = transition { if let Some(index) = possible_dead_index { let _ = state.index_map.remove(&index); @@ -325,10 +341,7 @@ impl PeerServer { let outgoing: Vec = peer.outgoing_queue.drain(..).collect(); for packet in outgoing { - match peer.handle_outgoing_transport(packet.payload()) { - Ok(message) => self.send_to_peer(message)?, - Err(e) => warn!("failed to encrypt packet: {}", e) - } + self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?; } self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(Rc::downgrade(&peer_ref))); @@ -369,7 +382,7 @@ impl PeerServer { } while let Some(packet) = peer.outgoing_queue.pop_front() { - self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?; + self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?; } } @@ -383,6 +396,13 @@ impl PeerServer { Ok(()) } + fn handle_egress_encrypted_packet(&mut self, peer_ref: SharedPeer, endpoint: Endpoint, packet: Vec) -> Result<(), Error> { + let mut peer = peer_ref.borrow_mut(); + peer.handle_outgoing_encrypted_transport(&packet); + + self.send_to_peer((endpoint, packet)) + } + fn send_cookie_reply(&mut self, addr: Endpoint, mac1: &[u8], index: u32) -> Result<(), Error> { let reply = match addr.ip() { IpAddr::V4(ip) => self.cookie.generate_reply(index, mac1, &ip.octets())?, @@ -488,7 +508,7 @@ impl PeerServer { } } - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?; debug!("sent passive keepalive packet"); self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone())); @@ -506,7 +526,7 @@ impl PeerServer { bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs()); } - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?; let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone())); peer.timers.persistent_timer = Some(handle); debug!("sent persistent keepalive packet"); @@ -564,7 +584,7 @@ impl PeerServer { if let Some(keepalive) = peer.info.persistent_keepalive() { let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(Rc::downgrade(&peer_ref))); peer.timers.persistent_timer = Some(handle); - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.encrypt_and_send(peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?; debug!("set new keepalive timer and immediately sent new keepalive packet."); } } @@ -637,6 +657,18 @@ impl Future for PeerServer { } } + loop { + // Handle UDP packets from the outside world + match self.encrypt_channel.rx.poll() { + Ok(Async::Ready(Some((peer_ref, (endpoint, packet))))) => { + let _ = self.handle_egress_encrypted_packet(peer_ref, endpoint, packet).map_err(|e| warn!("UDP ERR: {:?}", e)); + }, + Ok(Async::NotReady) => { break; }, + Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), + Err(e) => bail!("incoming udp stream error: {:?}", e) + } + } + loop { // Handle packets coming from the local tunnel match self.outgoing.rx.poll() { diff --git a/src/peer.rs b/src/peer.rs index c0b21eb..3a853ae 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -395,33 +395,43 @@ impl Peer { Ok(transition) } - pub fn handle_outgoing_transport(&mut self, packet: &[u8]) -> Result<(Endpoint, Vec), Error> { + pub fn handle_outgoing_transport(&mut self, packet: UtunPacket) + -> Result), Error = Error> + 'static + Send>, Error> + { let session = self.sessions.current.as_mut().ok_or_else(|| err_msg("no current noise session"))?; let endpoint = self.info.endpoint.ok_or_else(|| err_msg("no known peer endpoint"))?; - let padding = if packet.len() % PADDING_MULTIPLE != 0 { - PADDING_MULTIPLE - (packet.len() % PADDING_MULTIPLE) + let padding = if packet.payload().len() % PADDING_MULTIPLE != 0 { + PADDING_MULTIPLE - (packet.payload().len() % PADDING_MULTIPLE) } else { 0 }; - let padded_len = packet.len() + padding; + let padded_len = packet.payload().len() + padding; let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; ensure!(session.nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES"); ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME"); + let mut transport = session.noise.get_async_transport_state()?.clone(); + session.nonce += 1; + let nonce = session.nonce - 1; + out_packet[0] = 4; LittleEndian::write_u32(&mut out_packet[4..], session.their_index); - LittleEndian::write_u64(&mut out_packet[8..], session.nonce); - let padded_packet = &[packet, &vec![0u8; padding]].concat(); - let len = session.noise.write_async_message(session.nonce, padded_packet, &mut out_packet[16..])?; - session.nonce += 1; - self.tx_bytes += len as u64; + LittleEndian::write_u64(&mut out_packet[8..], nonce); - if !packet.is_empty() { + Ok(Box::new(future::lazy(move || { + let padded_packet = &[packet.payload(), &vec![0u8; padding]].concat(); + let len = transport.write_transport_message(nonce, padded_packet, &mut out_packet[16..])?; + out_packet.truncate(TRANSPORT_HEADER_SIZE + len); + Ok((endpoint, out_packet)) + }))) + } + + pub fn handle_outgoing_encrypted_transport(&mut self, packet: &[u8]) { + self.tx_bytes += packet.len() as u64; + + if packet.len() > 32 { // TODO make constant self.timers.data_sent = Timestamp::now(); } self.timers.authenticated_traversed = Timestamp::now(); - - out_packet.truncate(TRANSPORT_HEADER_SIZE + len); - Ok((endpoint, out_packet)) } pub fn to_config_string(&self) -> String { -- cgit v1.2.3-59-g8ed1b