diff options
Diffstat (limited to 'src/interface/peer_server.rs')
-rw-r--r-- | src/interface/peer_server.rs | 56 |
1 files changed, 44 insertions, 12 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index d03fa58..aa82fb7 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -62,6 +62,7 @@ pub struct PeerServer { rng : ThreadRng, cpu_pool : CpuPool, decrypt_channel : Channel<(Endpoint, Transport, Vec<u8>, SessionType)>, + encrypt_channel : Channel<(SharedPeer, (Endpoint, Vec<u8>))>, } impl PeerServer { @@ -81,6 +82,7 @@ impl PeerServer { rng : rand::thread_rng(), cpu_pool : CpuPool::new_num_cpus(), decrypt_channel : mpsc::unbounded().into(), + encrypt_channel : mpsc::unbounded().into(), }) } @@ -250,7 +252,8 @@ impl PeerServer { } debug!("got handshake response (0x02)"); - let mut state = self.shared_state.borrow_mut(); + let shared_state = self.shared_state.clone(); + let mut state = shared_state.borrow_mut(); let our_index = LittleEndian::read_u32(&packet[8..]); let peer_ref = state.index_map.get(&our_index) .ok_or_else(|| format_err!("unknown our_index ({})", our_index))? @@ -266,11 +269,11 @@ impl PeerServer { if !peer.outgoing_queue.is_empty() { debug!("sending {} queued egress packets", peer.outgoing_queue.len()); while let Some(packet) = peer.outgoing_queue.pop_front() { - self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?; + self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?; } } else { debug!("sending empty keepalive"); - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.encrypt_and_send(peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?; } } else { error!("peer not ready for transport after processing handshake response. this shouldn't happen."); @@ -307,6 +310,18 @@ impl PeerServer { Ok(()) } + fn encrypt_and_send(&mut self, peer_ref: SharedPeer, peer: &mut Peer, packet: UtunPacket) -> Result<(), Error> { + let tx = self.encrypt_channel.tx.clone(); + let f = self.cpu_pool.spawn(peer.handle_outgoing_transport(packet)?) + .and_then(move |result| { + tx.unbounded_send((peer_ref, result)).expect("broken decrypt channel"); + Ok(()) + }) + .map_err(|e| warn!("{:?}", e)); + self.handle.spawn(f); + Ok(()) + } + fn handle_ingress_decrypted_transport(&mut self, addr: Endpoint, orig_packet: Transport, raw_packet: Vec<u8>, session_type: SessionType) -> Result<(), Error> { @@ -316,7 +331,8 @@ impl PeerServer { let needs_handshake = { let mut peer = peer_ref.borrow_mut(); let transition = peer.handle_incoming_decrypted_transport(addr, &raw_packet, session_type)?; - let mut state = self.shared_state.borrow_mut(); + let shared_state = self.shared_state.clone(); + let mut state = shared_state.borrow_mut(); if let SessionTransition::Transition(possible_dead_index) = transition { if let Some(index) = possible_dead_index { let _ = state.index_map.remove(&index); @@ -325,10 +341,7 @@ impl PeerServer { let outgoing: Vec<UtunPacket> = peer.outgoing_queue.drain(..).collect(); for packet in outgoing { - match peer.handle_outgoing_transport(packet.payload()) { - Ok(message) => self.send_to_peer(message)?, - Err(e) => warn!("failed to encrypt packet: {}", e) - } + self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?; } self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(Rc::downgrade(&peer_ref))); @@ -369,7 +382,7 @@ impl PeerServer { } while let Some(packet) = peer.outgoing_queue.pop_front() { - self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?; + self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?; } } @@ -383,6 +396,13 @@ impl PeerServer { Ok(()) } + fn handle_egress_encrypted_packet(&mut self, peer_ref: SharedPeer, endpoint: Endpoint, packet: Vec<u8>) -> Result<(), Error> { + let mut peer = peer_ref.borrow_mut(); + peer.handle_outgoing_encrypted_transport(&packet); + + self.send_to_peer((endpoint, packet)) + } + fn send_cookie_reply(&mut self, addr: Endpoint, mac1: &[u8], index: u32) -> Result<(), Error> { let reply = match addr.ip() { IpAddr::V4(ip) => self.cookie.generate_reply(index, mac1, &ip.octets())?, @@ -488,7 +508,7 @@ impl PeerServer { } } - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?; debug!("sent passive keepalive packet"); self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone())); @@ -506,7 +526,7 @@ impl PeerServer { bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs()); } - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?; let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone())); peer.timers.persistent_timer = Some(handle); debug!("sent persistent keepalive packet"); @@ -564,7 +584,7 @@ impl PeerServer { if let Some(keepalive) = peer.info.persistent_keepalive() { let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(Rc::downgrade(&peer_ref))); peer.timers.persistent_timer = Some(handle); - self.send_to_peer(peer.handle_outgoing_transport(&[])?)?; + self.encrypt_and_send(peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?; debug!("set new keepalive timer and immediately sent new keepalive packet."); } } @@ -638,6 +658,18 @@ impl Future for PeerServer { } loop { + // Handle UDP packets from the outside world + match self.encrypt_channel.rx.poll() { + Ok(Async::Ready(Some((peer_ref, (endpoint, packet))))) => { + let _ = self.handle_egress_encrypted_packet(peer_ref, endpoint, packet).map_err(|e| warn!("UDP ERR: {:?}", e)); + }, + Ok(Async::NotReady) => { break; }, + Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"), + Err(e) => bail!("incoming udp stream error: {:?}", e) + } + } + + loop { // Handle packets coming from the local tunnel match self.outgoing.rx.poll() { Ok(Async::Ready(Some(packet))) => { |