diff options
Diffstat (limited to 'src/interface/peer_server.rs')
-rw-r--r-- | src/interface/peer_server.rs | 50 |
1 files changed, 19 insertions, 31 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index ac6941c..309fd69 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -11,9 +11,9 @@ use std::time::Duration; use byteorder::{ByteOrder, LittleEndian}; use failure::{Error, err_msg}; -use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc, stream, future}; +use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc}; use rand::{self, Rng}; -use udp::{UdpSocket, UdpFramed, VecUdpCodec, PeerServerMessage}; +use udp::{UdpSocket, VecUdpCodec, PeerServerMessage, UdpChannel}; use tokio_core::reactor::Handle; struct Channel<T> { @@ -33,8 +33,7 @@ impl<T> From<(mpsc::Sender<T>, mpsc::Receiver<T>)> for Channel<T> { pub struct PeerServer { handle : Handle, shared_state : SharedState, - ingress : Option<stream::SplitStream<UdpFramed<VecUdpCodec>>>, - egress_tx : Option<mpsc::Sender<PeerServerMessage>>, + udp : Option<UdpChannel>, port : Option<u16>, outgoing : Channel<UtunPacket>, config : Channel<config::UpdateEvent>, @@ -47,14 +46,13 @@ impl PeerServer { pub fn new(handle: Handle, shared_state: SharedState, tunnel_tx: mpsc::Sender<Vec<u8>>) -> Result<Self, Error> { Ok(PeerServer { shared_state, tunnel_tx, - handle : handle.clone(), - timer : Timer::new(handle), - ingress : None, - egress_tx : None, - port : None, - outgoing : mpsc::channel(1024).into(), - config : mpsc::channel(1024).into(), - cookie : cookie::Validator::new(&[0u8; 32]) + handle : handle.clone(), + timer : Timer::new(handle), + udp : None, + port : None, + outgoing : mpsc::channel(1024).into(), + config : mpsc::channel(1024).into(), + cookie : cookie::Validator::new(&[0u8; 32]) }) } @@ -65,23 +63,13 @@ impl PeerServer { return Ok(()) } - let socket = UdpSocket::bind((Ipv6Addr::unspecified(), port).into(), &self.handle)?; + let socket = UdpSocket::bind((Ipv6Addr::unspecified(), port).into(), self.handle.clone())?; info!("listening on {:?}", socket.local_addr()?); - let (udp_sink, udp_stream) = socket.framed(VecUdpCodec{}).split(); - let (egress_tx, egress_rx) = mpsc::channel(1024); - let udp_writethrough = udp_sink.sink_map_err(|_| ()).send_all( - egress_rx.and_then(|(addr, packet)| { - trace!("sending UDP packet to {:?}", &addr); - future::ok((addr, packet)) - }).map_err(|_| { info!("udp sink error"); () })) - .then(|_| Ok(())); + let udp = socket.framed(VecUdpCodec{}).into(); - self.handle.spawn(udp_writethrough); - - self.port = Some(port); - self.ingress = Some(udp_stream); - self.egress_tx = Some(egress_tx); + self.udp = Some(udp); + self.port = Some(port); Ok(()) } @@ -94,8 +82,8 @@ impl PeerServer { } fn send_to_peer(&self, payload: PeerServerMessage) -> Result<(), Error> { - let tx = self.egress_tx.as_ref().ok_or_else(|| err_msg("no egress tx"))?.clone(); - self.handle.spawn(tx.send(payload).then(|_| Ok(()))); + self.udp.as_ref().ok_or_else(|| err_msg("no udp socket"))? + .send(payload); Ok(()) } @@ -413,7 +401,7 @@ impl Future for PeerServer { PrivateKey(_) => { let pub_key = &self.shared_state.borrow().interface_info.pub_key.unwrap(); self.cookie = cookie::Validator::new(pub_key); - if self.egress_tx.is_none() { + if self.udp.is_none() { self.rebind().unwrap(); } }, @@ -438,9 +426,9 @@ impl Future for PeerServer { } // Handle UDP packets from the outside world - if self.ingress.is_some() { + if self.udp.is_some() { loop { - match self.ingress.as_mut().unwrap().poll() { + match self.udp.as_mut().unwrap().ingress.poll() { Ok(Async::Ready(Some((addr, packet)))) => { let _ = self.handle_ingress_packet(addr, &packet).map_err(|e| warn!("UDP ERR: {:?}", e)); }, |