diff options
-rw-r--r-- | src/interface/mod.rs | 4 | ||||
-rw-r--r-- | src/interface/peer_server.rs | 50 | ||||
-rw-r--r-- | src/udp/frame.rs | 49 | ||||
-rw-r--r-- | src/udp/mod.rs | 13 |
4 files changed, 74 insertions, 42 deletions
diff --git a/src/interface/mod.rs b/src/interface/mod.rs index 5826907..c6b673c 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -237,7 +237,9 @@ impl Interface { let config_fut = peer_server.config_tx().sink_map_err(|_|()).send_all(config_fut).map_err(|e| { warn!("error {:?}", e); () }); - core.run(reaper.join(peer_server.join(utun_fut.join(config_fut.join(config_server))))).unwrap(); + let fut = reaper.join(peer_server.join(utun_fut.join(config_fut.join(config_server)))); + let _ = core.run(fut); + info!("reactor finished."); } } diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index ac6941c..309fd69 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -11,9 +11,9 @@ use std::time::Duration; use byteorder::{ByteOrder, LittleEndian}; use failure::{Error, err_msg}; -use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc, stream, future}; +use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc}; use rand::{self, Rng}; -use udp::{UdpSocket, UdpFramed, VecUdpCodec, PeerServerMessage}; +use udp::{UdpSocket, VecUdpCodec, PeerServerMessage, UdpChannel}; use tokio_core::reactor::Handle; struct Channel<T> { @@ -33,8 +33,7 @@ impl<T> From<(mpsc::Sender<T>, mpsc::Receiver<T>)> for Channel<T> { pub struct PeerServer { handle : Handle, shared_state : SharedState, - ingress : Option<stream::SplitStream<UdpFramed<VecUdpCodec>>>, - egress_tx : Option<mpsc::Sender<PeerServerMessage>>, + udp : Option<UdpChannel>, port : Option<u16>, outgoing : Channel<UtunPacket>, config : Channel<config::UpdateEvent>, @@ -47,14 +46,13 @@ impl PeerServer { pub fn new(handle: Handle, shared_state: SharedState, tunnel_tx: mpsc::Sender<Vec<u8>>) -> Result<Self, Error> { Ok(PeerServer { shared_state, tunnel_tx, - handle : handle.clone(), - timer : Timer::new(handle), - ingress : None, - egress_tx : None, - port : None, - outgoing : mpsc::channel(1024).into(), - config : mpsc::channel(1024).into(), - cookie : cookie::Validator::new(&[0u8; 32]) + handle : handle.clone(), + timer : Timer::new(handle), + udp : None, + port : None, + outgoing : mpsc::channel(1024).into(), + config : mpsc::channel(1024).into(), + cookie : cookie::Validator::new(&[0u8; 32]) }) } @@ -65,23 +63,13 @@ impl PeerServer { return Ok(()) } - let socket = UdpSocket::bind((Ipv6Addr::unspecified(), port).into(), &self.handle)?; + let socket = UdpSocket::bind((Ipv6Addr::unspecified(), port).into(), self.handle.clone())?; info!("listening on {:?}", socket.local_addr()?); - let (udp_sink, udp_stream) = socket.framed(VecUdpCodec{}).split(); - let (egress_tx, egress_rx) = mpsc::channel(1024); - let udp_writethrough = udp_sink.sink_map_err(|_| ()).send_all( - egress_rx.and_then(|(addr, packet)| { - trace!("sending UDP packet to {:?}", &addr); - future::ok((addr, packet)) - }).map_err(|_| { info!("udp sink error"); () })) - .then(|_| Ok(())); + let udp = socket.framed(VecUdpCodec{}).into(); - self.handle.spawn(udp_writethrough); - - self.port = Some(port); - self.ingress = Some(udp_stream); - self.egress_tx = Some(egress_tx); + self.udp = Some(udp); + self.port = Some(port); Ok(()) } @@ -94,8 +82,8 @@ impl PeerServer { } fn send_to_peer(&self, payload: PeerServerMessage) -> Result<(), Error> { - let tx = self.egress_tx.as_ref().ok_or_else(|| err_msg("no egress tx"))?.clone(); - self.handle.spawn(tx.send(payload).then(|_| Ok(()))); + self.udp.as_ref().ok_or_else(|| err_msg("no udp socket"))? + .send(payload); Ok(()) } @@ -413,7 +401,7 @@ impl Future for PeerServer { PrivateKey(_) => { let pub_key = &self.shared_state.borrow().interface_info.pub_key.unwrap(); self.cookie = cookie::Validator::new(pub_key); - if self.egress_tx.is_none() { + if self.udp.is_none() { self.rebind().unwrap(); } }, @@ -438,9 +426,9 @@ impl Future for PeerServer { } // Handle UDP packets from the outside world - if self.ingress.is_some() { + if self.udp.is_some() { loop { - match self.ingress.as_mut().unwrap().poll() { + match self.udp.as_mut().unwrap().ingress.poll() { Ok(Async::Ready(Some((addr, packet)))) => { let _ = self.handle_ingress_packet(addr, &packet).map_err(|e| warn!("UDP ERR: {:?}", e)); }, diff --git a/src/udp/frame.rs b/src/udp/frame.rs index 51d6c33..7bb1c8b 100644 --- a/src/udp/frame.rs +++ b/src/udp/frame.rs @@ -1,9 +1,9 @@ use std::io; use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4, IpAddr}; -use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink}; - +use futures::{Async, Future, Poll, Stream, Sink, StartSend, AsyncSink, future, stream, unsync::mpsc}; use udp::{ConnectedUdpSocket, UdpSocket}; +use tokio_core::reactor::Handle; /// Encoding of frames via buffers. /// @@ -70,6 +70,15 @@ pub struct UdpFramed<C> { flushed: bool, } +impl<C> UdpFramed<C> { + pub fn handle(&self) -> &Handle { + match self.socket { + Socket::Unconnected(ref socket) => &socket.handle, + Socket::Connected(ref socket) => &socket.inner.handle, + } + } +} + impl<C: UdpCodec> Stream for UdpFramed<C> { type Item = C::In; type Error = io::Error; @@ -80,7 +89,7 @@ impl<C: UdpCodec> Stream for UdpFramed<C> { Socket::Connected(ref socket) => (try_nb!(socket.inner.recv(&mut self.rd)), socket.addr), }; trace!("received {} bytes, decoding", n); - let frame = try!(self.codec.decode(&addr, &self.rd[..n])); + let frame = self.codec.decode(&addr, &self.rd[..n])?; trace!("frame decoded from buffer"); Ok(Async::Ready(Some(frame))) } @@ -94,7 +103,7 @@ impl<C: UdpCodec> Sink for UdpFramed<C> { trace!("sending frame"); if !self.flushed { - match try!(self.poll_complete()) { + match self.poll_complete()? { Async::Ready(()) => {}, Async::NotReady => return Ok(AsyncSink::NotReady(item)), } @@ -218,3 +227,35 @@ impl UdpCodec for VecUdpCodec { addr } } + +pub struct UdpChannel { + pub ingress : stream::SplitStream<UdpFramed<VecUdpCodec>>, + pub egress : mpsc::Sender<PeerServerMessage>, + handle : Handle, +} + +impl From<UdpFramed<VecUdpCodec>> for UdpChannel { + fn from(framed: UdpFramed<VecUdpCodec>) -> Self { + let handle = framed.handle().clone(); + let (udp_sink, ingress) = framed.split(); + let (egress, egress_rx) = mpsc::channel(1024); + let udp_writethrough = udp_sink + .sink_map_err(|_| ()) + .send_all(egress_rx.and_then(|(addr, packet)| { + trace!("sending UDP packet to {:?}", &addr); + future::ok((addr, packet)) + }) + .map_err(|_| { info!("udp sink error"); () })) + .then(|_| Ok(())); + + handle.spawn(udp_writethrough); + + UdpChannel { egress, ingress, handle } + } +} + +impl UdpChannel { + pub fn send(&self, message: PeerServerMessage) { + self.handle.spawn(self.egress.clone().send(message).then(|_| Ok(()))); + } +} diff --git a/src/udp/mod.rs b/src/udp/mod.rs index 3f9b028..f3fc3ff 100644 --- a/src/udp/mod.rs +++ b/src/udp/mod.rs @@ -13,10 +13,11 @@ use tokio_core::reactor::{Handle, PollEvented}; /// An I/O object representing a UDP socket. pub struct UdpSocket { io: PollEvented<mio::net::UdpSocket>, + handle: Handle, } mod frame; -pub use self::frame::{UdpFramed, UdpCodec, VecUdpCodec, PeerServerMessage}; +pub use self::frame::{UdpChannel, UdpFramed, UdpCodec, VecUdpCodec, PeerServerMessage}; pub struct ConnectedUdpSocket { inner: UdpSocket, @@ -34,7 +35,7 @@ impl UdpSocket { /// /// This function will create a new UDP socket and attempt to bind it to the /// `addr` provided. If the result is `Ok`, the socket has successfully bound. - pub fn bind(addr: SocketAddr, handle: &Handle) -> io::Result<UdpSocket> { + pub fn bind(addr: SocketAddr, handle: Handle) -> io::Result<UdpSocket> { let socket = Socket::new(Domain::ipv6(), Type::dgram(), Some(Protocol::udp()))?; socket.set_only_v6(false)?; socket.set_nonblocking(true)?; @@ -43,9 +44,9 @@ impl UdpSocket { Self::from_socket(socket.into_udp_socket(), handle) } - fn new(socket: mio::net::UdpSocket, handle: &Handle) -> io::Result<UdpSocket> { - let io = PollEvented::new(socket, handle)?; - Ok(UdpSocket { io }) + fn new(socket: mio::net::UdpSocket, handle: Handle) -> io::Result<UdpSocket> { + let io = PollEvented::new(socket, &handle)?; + Ok(UdpSocket { io, handle }) } /// Creates a new `UdpSocket` from the previously bound socket provided. @@ -58,7 +59,7 @@ impl UdpSocket { /// configure a socket before it's handed off, such as setting options like /// `reuse_address` or binding to multiple addresses. pub fn from_socket(socket: net::UdpSocket, - handle: &Handle) -> io::Result<UdpSocket> { + handle: Handle) -> io::Result<UdpSocket> { let udp = mio::net::UdpSocket::from_socket(socket)?; UdpSocket::new(udp, handle) } |