aboutsummaryrefslogtreecommitdiffstats
path: root/src/interface/peer_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/interface/peer_server.rs')
-rw-r--r--src/interface/peer_server.rs56
1 files changed, 44 insertions, 12 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index d03fa58..aa82fb7 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -62,6 +62,7 @@ pub struct PeerServer {
rng : ThreadRng,
cpu_pool : CpuPool,
decrypt_channel : Channel<(Endpoint, Transport, Vec<u8>, SessionType)>,
+ encrypt_channel : Channel<(SharedPeer, (Endpoint, Vec<u8>))>,
}
impl PeerServer {
@@ -81,6 +82,7 @@ impl PeerServer {
rng : rand::thread_rng(),
cpu_pool : CpuPool::new_num_cpus(),
decrypt_channel : mpsc::unbounded().into(),
+ encrypt_channel : mpsc::unbounded().into(),
})
}
@@ -250,7 +252,8 @@ impl PeerServer {
}
debug!("got handshake response (0x02)");
- let mut state = self.shared_state.borrow_mut();
+ let shared_state = self.shared_state.clone();
+ let mut state = shared_state.borrow_mut();
let our_index = LittleEndian::read_u32(&packet[8..]);
let peer_ref = state.index_map.get(&our_index)
.ok_or_else(|| format_err!("unknown our_index ({})", our_index))?
@@ -266,11 +269,11 @@ impl PeerServer {
if !peer.outgoing_queue.is_empty() {
debug!("sending {} queued egress packets", peer.outgoing_queue.len());
while let Some(packet) = peer.outgoing_queue.pop_front() {
- self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?;
+ self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?;
}
} else {
debug!("sending empty keepalive");
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.encrypt_and_send(peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?;
}
} else {
error!("peer not ready for transport after processing handshake response. this shouldn't happen.");
@@ -307,6 +310,18 @@ impl PeerServer {
Ok(())
}
+ fn encrypt_and_send(&mut self, peer_ref: SharedPeer, peer: &mut Peer, packet: UtunPacket) -> Result<(), Error> {
+ let tx = self.encrypt_channel.tx.clone();
+ let f = self.cpu_pool.spawn(peer.handle_outgoing_transport(packet)?)
+ .and_then(move |result| {
+ tx.unbounded_send((peer_ref, result)).expect("broken decrypt channel");
+ Ok(())
+ })
+ .map_err(|e| warn!("{:?}", e));
+ self.handle.spawn(f);
+ Ok(())
+ }
+
fn handle_ingress_decrypted_transport(&mut self, addr: Endpoint, orig_packet: Transport, raw_packet: Vec<u8>, session_type: SessionType)
-> Result<(), Error>
{
@@ -316,7 +331,8 @@ impl PeerServer {
let needs_handshake = {
let mut peer = peer_ref.borrow_mut();
let transition = peer.handle_incoming_decrypted_transport(addr, &raw_packet, session_type)?;
- let mut state = self.shared_state.borrow_mut();
+ let shared_state = self.shared_state.clone();
+ let mut state = shared_state.borrow_mut();
if let SessionTransition::Transition(possible_dead_index) = transition {
if let Some(index) = possible_dead_index {
let _ = state.index_map.remove(&index);
@@ -325,10 +341,7 @@ impl PeerServer {
let outgoing: Vec<UtunPacket> = peer.outgoing_queue.drain(..).collect();
for packet in outgoing {
- match peer.handle_outgoing_transport(packet.payload()) {
- Ok(message) => self.send_to_peer(message)?,
- Err(e) => warn!("failed to encrypt packet: {}", e)
- }
+ self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?;
}
self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(Rc::downgrade(&peer_ref)));
@@ -369,7 +382,7 @@ impl PeerServer {
}
while let Some(packet) = peer.outgoing_queue.pop_front() {
- self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?;
+ self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?;
}
}
@@ -383,6 +396,13 @@ impl PeerServer {
Ok(())
}
+ fn handle_egress_encrypted_packet(&mut self, peer_ref: SharedPeer, endpoint: Endpoint, packet: Vec<u8>) -> Result<(), Error> {
+ let mut peer = peer_ref.borrow_mut();
+ peer.handle_outgoing_encrypted_transport(&packet);
+
+ self.send_to_peer((endpoint, packet))
+ }
+
fn send_cookie_reply(&mut self, addr: Endpoint, mac1: &[u8], index: u32) -> Result<(), Error> {
let reply = match addr.ip() {
IpAddr::V4(ip) => self.cookie.generate_reply(index, mac1, &ip.octets())?,
@@ -488,7 +508,7 @@ impl PeerServer {
}
}
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?;
debug!("sent passive keepalive packet");
self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone()));
@@ -506,7 +526,7 @@ impl PeerServer {
bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs());
}
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?;
let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone()));
peer.timers.persistent_timer = Some(handle);
debug!("sent persistent keepalive packet");
@@ -564,7 +584,7 @@ impl PeerServer {
if let Some(keepalive) = peer.info.persistent_keepalive() {
let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(Rc::downgrade(&peer_ref)));
peer.timers.persistent_timer = Some(handle);
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.encrypt_and_send(peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?;
debug!("set new keepalive timer and immediately sent new keepalive packet.");
}
}
@@ -638,6 +658,18 @@ impl Future for PeerServer {
}
loop {
+ // Handle UDP packets from the outside world
+ match self.encrypt_channel.rx.poll() {
+ Ok(Async::Ready(Some((peer_ref, (endpoint, packet))))) => {
+ let _ = self.handle_egress_encrypted_packet(peer_ref, endpoint, packet).map_err(|e| warn!("UDP ERR: {:?}", e));
+ },
+ Ok(Async::NotReady) => { break; },
+ Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
+ Err(e) => bail!("incoming udp stream error: {:?}", e)
+ }
+ }
+
+ loop {
// Handle packets coming from the local tunnel
match self.outgoing.rx.poll() {
Ok(Async::Ready(Some(packet))) => {