aboutsummaryrefslogtreecommitdiffstats
path: root/src/interface/peer_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/interface/peer_server.rs')
-rw-r--r--src/interface/peer_server.rs75
1 files changed, 20 insertions, 55 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index d03fa58..ee40061 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -2,7 +2,7 @@ use consts::{REKEY_TIMEOUT, KEEPALIVE_TIMEOUT, STALE_SESSION_TIMEOUT,
MAX_CONTENT_SIZE, WIPE_AFTER_TIME, MAX_HANDSHAKE_ATTEMPTS,
UNDER_LOAD_QUEUE_SIZE, UNDER_LOAD_TIME};
use cookie;
-use interface::{SharedPeer, SharedState, State, UtunPacket};
+use interface::{SharedPeer, SharedState, State};
use message::{Message, Initiation, Response, CookieReply, Transport};
use peer::{Peer, SessionType, SessionTransition};
use ratelimiter::RateLimiter;
@@ -12,10 +12,10 @@ use timer::{Timer, TimerMessage};
use byteorder::{ByteOrder, LittleEndian};
use failure::{Error, err_msg};
use futures::{Async, Future, Stream, Poll, unsync::mpsc, task};
-use futures_cpupool::CpuPool;
use rand::{self, Rng, ThreadRng};
use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel};
use tokio_core::reactor::Handle;
+use types::PacketVec;
use std::collections::VecDeque;
use std::convert::TryInto;
@@ -51,17 +51,15 @@ pub struct PeerServer {
shared_state : SharedState,
udp : Option<UdpChannel>,
port : Option<u16>,
- outgoing : Channel<UtunPacket>,
+ outgoing : Channel<PacketVec>,
channel : Channel<ChannelMessage>,
handshakes : VecDeque<(Endpoint, Message)>,
timer : Timer,
- tunnel_tx : mpsc::UnboundedSender<Vec<u8>>,
+ tunnel_tx : mpsc::UnboundedSender<PacketVec>,
cookie : cookie::Validator,
rate_limiter : RateLimiter,
under_load_until : Instant,
rng : ThreadRng,
- cpu_pool : CpuPool,
- decrypt_channel : Channel<(Endpoint, Transport, Vec<u8>, SessionType)>,
}
impl PeerServer {
@@ -79,8 +77,6 @@ impl PeerServer {
rate_limiter : RateLimiter::new(&handle)?,
under_load_until : Instant::now(),
rng : rand::thread_rng(),
- cpu_pool : CpuPool::new_num_cpus(),
- decrypt_channel : mpsc::unbounded().into(),
})
}
@@ -116,7 +112,7 @@ impl PeerServer {
Ok(())
}
- pub fn tunnel_tx(&self) -> mpsc::UnboundedSender<UtunPacket> {
+ pub fn tunnel_tx(&self) -> mpsc::UnboundedSender<PacketVec> {
self.outgoing.tx.clone()
}
@@ -266,11 +262,11 @@ impl PeerServer {
if !peer.outgoing_queue.is_empty() {
debug!("sending {} queued egress packets", peer.outgoing_queue.len());
while let Some(packet) = peer.outgoing_queue.pop_front() {
- self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?;
+ self.send_to_peer(peer.handle_outgoing_transport(packet)?)?;
}
} else {
debug!("sending empty keepalive");
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?;
}
} else {
error!("peer not ready for transport after processing handshake response. this shouldn't happen.");
@@ -290,42 +286,23 @@ impl PeerServer {
}
fn handle_ingress_transport(&mut self, addr: Endpoint, packet: Transport) -> Result<(), Error> {
-
let peer_ref = self.shared_state.borrow().index_map.get(&packet.our_index())
.ok_or_else(|| err_msg("unknown our_index"))?.clone();
- let mut peer = peer_ref.borrow_mut();
- let tx = self.decrypt_channel.tx.clone();
- let f = self.cpu_pool.spawn(peer.handle_incoming_transport(addr, packet)?)
- .and_then(move |result| {
- tx.unbounded_send(result).expect("broken decrypt channel");
- Ok(())
- })
- .map_err(|e| warn!("{:?}", e));
- self.handle.spawn(f);
-
- Ok(())
- }
-
- fn handle_ingress_decrypted_transport(&mut self, addr: Endpoint, orig_packet: Transport, raw_packet: Vec<u8>, session_type: SessionType)
- -> Result<(), Error>
- {
- let peer_ref = self.shared_state.borrow().index_map.get(&orig_packet.our_index())
- .ok_or_else(|| err_msg("unknown our_index"))?.clone();
-
- let needs_handshake = {
+ let (raw_packet, needs_handshake) = {
let mut peer = peer_ref.borrow_mut();
- let transition = peer.handle_incoming_decrypted_transport(addr, &raw_packet, session_type)?;
let mut state = self.shared_state.borrow_mut();
+ let (raw_packet, transition) = peer.handle_incoming_transport(addr, &packet)?;
+
if let SessionTransition::Transition(possible_dead_index) = transition {
if let Some(index) = possible_dead_index {
let _ = state.index_map.remove(&index);
}
- let outgoing: Vec<UtunPacket> = peer.outgoing_queue.drain(..).collect();
+ let outgoing: Vec<PacketVec> = peer.outgoing_queue.drain(..).collect();
for packet in outgoing {
- match peer.handle_outgoing_transport(packet.payload()) {
+ match peer.handle_outgoing_transport(packet) {
Ok(message) => self.send_to_peer(message)?,
Err(e) => warn!("failed to encrypt packet: {}", e)
}
@@ -333,7 +310,7 @@ impl PeerServer {
self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(Rc::downgrade(&peer_ref)));
}
- peer.needs_new_handshake(false)
+ (raw_packet, peer.needs_new_handshake(false))
};
if needs_handshake {
@@ -352,10 +329,10 @@ impl PeerServer {
Ok(())
}
- fn handle_egress_packet(&mut self, packet: UtunPacket) -> Result<(), Error> {
- ensure!(!packet.payload().is_empty() && packet.payload().len() <= MAX_CONTENT_SIZE, "egress packet outside of size bounds");
+ fn handle_egress_packet(&mut self, packet: PacketVec) -> Result<(), Error> {
+ ensure!(!packet.is_empty() && packet.len() <= MAX_CONTENT_SIZE, "egress packet outside of size bounds");
- let peer_ref = self.shared_state.borrow_mut().router.route_to_peer(packet.payload())
+ let peer_ref = self.shared_state.borrow_mut().router.route_to_peer(&packet)
.ok_or_else(|| err_msg("no route to peer"))?;
let needs_handshake = {
@@ -369,7 +346,7 @@ impl PeerServer {
}
while let Some(packet) = peer.outgoing_queue.pop_front() {
- self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?;
+ self.send_to_peer(peer.handle_outgoing_transport(packet)?)?;
}
}
@@ -488,7 +465,7 @@ impl PeerServer {
}
}
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?;
debug!("sent passive keepalive packet");
self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone()));
@@ -506,7 +483,7 @@ impl PeerServer {
bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs());
}
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?;
let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone()));
peer.timers.persistent_timer = Some(handle);
debug!("sent persistent keepalive packet");
@@ -564,7 +541,7 @@ impl PeerServer {
if let Some(keepalive) = peer.info.persistent_keepalive() {
let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(Rc::downgrade(&peer_ref)));
peer.timers.persistent_timer = Some(handle);
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.send_to_peer(peer.handle_outgoing_transport(vec![])?)?;
debug!("set new keepalive timer and immediately sent new keepalive packet.");
}
}
@@ -626,18 +603,6 @@ impl Future for PeerServer {
}
loop {
- // Handle UDP packets from the outside world
- match self.decrypt_channel.rx.poll() {
- Ok(Async::Ready(Some((addr, orig_packet, decrypted, session_type)))) => {
- let _ = self.handle_ingress_decrypted_transport(addr, orig_packet, decrypted, session_type).map_err(|e| warn!("UDP ERR: {:?}", e));
- },
- Ok(Async::NotReady) => { break; },
- Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
- Err(e) => bail!("incoming udp stream error: {:?}", e)
- }
- }
-
- loop {
// Handle packets coming from the local tunnel
match self.outgoing.rx.poll() {
Ok(Async::Ready(Some(packet))) => {