aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/interface/mod.rs4
-rw-r--r--src/interface/peer_server.rs50
-rw-r--r--src/udp/frame.rs49
-rw-r--r--src/udp/mod.rs13
4 files changed, 74 insertions, 42 deletions
diff --git a/src/interface/mod.rs b/src/interface/mod.rs
index 5826907..c6b673c 100644
--- a/src/interface/mod.rs
+++ b/src/interface/mod.rs
@@ -237,7 +237,9 @@ impl Interface {
let config_fut = peer_server.config_tx().sink_map_err(|_|()).send_all(config_fut).map_err(|e| { warn!("error {:?}", e); () });
- core.run(reaper.join(peer_server.join(utun_fut.join(config_fut.join(config_server))))).unwrap();
+ let fut = reaper.join(peer_server.join(utun_fut.join(config_fut.join(config_server))));
+ let _ = core.run(fut);
+ info!("reactor finished.");
}
}
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index ac6941c..309fd69 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -11,9 +11,9 @@ use std::time::Duration;
use byteorder::{ByteOrder, LittleEndian};
use failure::{Error, err_msg};
-use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc, stream, future};
+use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc};
use rand::{self, Rng};
-use udp::{UdpSocket, UdpFramed, VecUdpCodec, PeerServerMessage};
+use udp::{UdpSocket, VecUdpCodec, PeerServerMessage, UdpChannel};
use tokio_core::reactor::Handle;
struct Channel<T> {
@@ -33,8 +33,7 @@ impl<T> From<(mpsc::Sender<T>, mpsc::Receiver<T>)> for Channel<T> {
pub struct PeerServer {
handle : Handle,
shared_state : SharedState,
- ingress : Option<stream::SplitStream<UdpFramed<VecUdpCodec>>>,
- egress_tx : Option<mpsc::Sender<PeerServerMessage>>,
+ udp : Option<UdpChannel>,
port : Option<u16>,
outgoing : Channel<UtunPacket>,
config : Channel<config::UpdateEvent>,
@@ -47,14 +46,13 @@ impl PeerServer {
pub fn new(handle: Handle, shared_state: SharedState, tunnel_tx: mpsc::Sender<Vec<u8>>) -> Result<Self, Error> {
Ok(PeerServer {
shared_state, tunnel_tx,
- handle : handle.clone(),
- timer : Timer::new(handle),
- ingress : None,
- egress_tx : None,
- port : None,
- outgoing : mpsc::channel(1024).into(),
- config : mpsc::channel(1024).into(),
- cookie : cookie::Validator::new(&[0u8; 32])
+ handle : handle.clone(),
+ timer : Timer::new(handle),
+ udp : None,
+ port : None,
+ outgoing : mpsc::channel(1024).into(),
+ config : mpsc::channel(1024).into(),
+ cookie : cookie::Validator::new(&[0u8; 32])
})
}
@@ -65,23 +63,13 @@ impl PeerServer {
return Ok(())
}
- let socket = UdpSocket::bind((Ipv6Addr::unspecified(), port).into(), &self.handle)?;
+ let socket = UdpSocket::bind((Ipv6Addr::unspecified(), port).into(), self.handle.clone())?;
info!("listening on {:?}", socket.local_addr()?);
- let (udp_sink, udp_stream) = socket.framed(VecUdpCodec{}).split();
- let (egress_tx, egress_rx) = mpsc::channel(1024);
- let udp_writethrough = udp_sink.sink_map_err(|_| ()).send_all(
- egress_rx.and_then(|(addr, packet)| {
- trace!("sending UDP packet to {:?}", &addr);
- future::ok((addr, packet))
- }).map_err(|_| { info!("udp sink error"); () }))
- .then(|_| Ok(()));
+ let udp = socket.framed(VecUdpCodec{}).into();
- self.handle.spawn(udp_writethrough);
-
- self.port = Some(port);
- self.ingress = Some(udp_stream);
- self.egress_tx = Some(egress_tx);
+ self.udp = Some(udp);
+ self.port = Some(port);
Ok(())
}
@@ -94,8 +82,8 @@ impl PeerServer {
}
fn send_to_peer(&self, payload: PeerServerMessage) -> Result<(), Error> {
- let tx = self.egress_tx.as_ref().ok_or_else(|| err_msg("no egress tx"))?.clone();
- self.handle.spawn(tx.send(payload).then(|_| Ok(())));
+ self.udp.as_ref().ok_or_else(|| err_msg("no udp socket"))?
+ .send(payload);
Ok(())
}
@@ -413,7 +401,7 @@ impl Future for PeerServer {
PrivateKey(_) => {
let pub_key = &self.shared_state.borrow().interface_info.pub_key.unwrap();
self.cookie = cookie::Validator::new(pub_key);
- if self.egress_tx.is_none() {
+ if self.udp.is_none() {
self.rebind().unwrap();
}
},
@@ -438,9 +426,9 @@ impl Future for PeerServer {
}
// Handle UDP packets from the outside world
- if self.ingress.is_some() {
+ if self.udp.is_some() {
loop {
- match self.ingress.as_mut().unwrap().poll() {
+ match self.udp.as_mut().unwrap().ingress.poll() {
Ok(Async::Ready(Some((addr, packet)))) => {
let _ = self.handle_ingress_packet(addr, &packet).map_err(|e| warn!("UDP ERR: {:?}", e));
},
diff --git a/src/udp/frame.rs b/src/udp/frame.rs
index 51d6c33..7bb1c8b 100644
--- a/src/udp/frame.rs
+++ b/src/udp/frame.rs
@@ -1,9 +1,9 @@
use std::io;
use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4, IpAddr};
-use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
-
+use futures::{Async, Future, Poll, Stream, Sink, StartSend, AsyncSink, future, stream, unsync::mpsc};
use udp::{ConnectedUdpSocket, UdpSocket};
+use tokio_core::reactor::Handle;
/// Encoding of frames via buffers.
///
@@ -70,6 +70,15 @@ pub struct UdpFramed<C> {
flushed: bool,
}
+impl<C> UdpFramed<C> {
+ pub fn handle(&self) -> &Handle {
+ match self.socket {
+ Socket::Unconnected(ref socket) => &socket.handle,
+ Socket::Connected(ref socket) => &socket.inner.handle,
+ }
+ }
+}
+
impl<C: UdpCodec> Stream for UdpFramed<C> {
type Item = C::In;
type Error = io::Error;
@@ -80,7 +89,7 @@ impl<C: UdpCodec> Stream for UdpFramed<C> {
Socket::Connected(ref socket) => (try_nb!(socket.inner.recv(&mut self.rd)), socket.addr),
};
trace!("received {} bytes, decoding", n);
- let frame = try!(self.codec.decode(&addr, &self.rd[..n]));
+ let frame = self.codec.decode(&addr, &self.rd[..n])?;
trace!("frame decoded from buffer");
Ok(Async::Ready(Some(frame)))
}
@@ -94,7 +103,7 @@ impl<C: UdpCodec> Sink for UdpFramed<C> {
trace!("sending frame");
if !self.flushed {
- match try!(self.poll_complete()) {
+ match self.poll_complete()? {
Async::Ready(()) => {},
Async::NotReady => return Ok(AsyncSink::NotReady(item)),
}
@@ -218,3 +227,35 @@ impl UdpCodec for VecUdpCodec {
addr
}
}
+
+pub struct UdpChannel {
+ pub ingress : stream::SplitStream<UdpFramed<VecUdpCodec>>,
+ pub egress : mpsc::Sender<PeerServerMessage>,
+ handle : Handle,
+}
+
+impl From<UdpFramed<VecUdpCodec>> for UdpChannel {
+ fn from(framed: UdpFramed<VecUdpCodec>) -> Self {
+ let handle = framed.handle().clone();
+ let (udp_sink, ingress) = framed.split();
+ let (egress, egress_rx) = mpsc::channel(1024);
+ let udp_writethrough = udp_sink
+ .sink_map_err(|_| ())
+ .send_all(egress_rx.and_then(|(addr, packet)| {
+ trace!("sending UDP packet to {:?}", &addr);
+ future::ok((addr, packet))
+ })
+ .map_err(|_| { info!("udp sink error"); () }))
+ .then(|_| Ok(()));
+
+ handle.spawn(udp_writethrough);
+
+ UdpChannel { egress, ingress, handle }
+ }
+}
+
+impl UdpChannel {
+ pub fn send(&self, message: PeerServerMessage) {
+ self.handle.spawn(self.egress.clone().send(message).then(|_| Ok(())));
+ }
+}
diff --git a/src/udp/mod.rs b/src/udp/mod.rs
index 3f9b028..f3fc3ff 100644
--- a/src/udp/mod.rs
+++ b/src/udp/mod.rs
@@ -13,10 +13,11 @@ use tokio_core::reactor::{Handle, PollEvented};
/// An I/O object representing a UDP socket.
pub struct UdpSocket {
io: PollEvented<mio::net::UdpSocket>,
+ handle: Handle,
}
mod frame;
-pub use self::frame::{UdpFramed, UdpCodec, VecUdpCodec, PeerServerMessage};
+pub use self::frame::{UdpChannel, UdpFramed, UdpCodec, VecUdpCodec, PeerServerMessage};
pub struct ConnectedUdpSocket {
inner: UdpSocket,
@@ -34,7 +35,7 @@ impl UdpSocket {
///
/// This function will create a new UDP socket and attempt to bind it to the
/// `addr` provided. If the result is `Ok`, the socket has successfully bound.
- pub fn bind(addr: SocketAddr, handle: &Handle) -> io::Result<UdpSocket> {
+ pub fn bind(addr: SocketAddr, handle: Handle) -> io::Result<UdpSocket> {
let socket = Socket::new(Domain::ipv6(), Type::dgram(), Some(Protocol::udp()))?;
socket.set_only_v6(false)?;
socket.set_nonblocking(true)?;
@@ -43,9 +44,9 @@ impl UdpSocket {
Self::from_socket(socket.into_udp_socket(), handle)
}
- fn new(socket: mio::net::UdpSocket, handle: &Handle) -> io::Result<UdpSocket> {
- let io = PollEvented::new(socket, handle)?;
- Ok(UdpSocket { io })
+ fn new(socket: mio::net::UdpSocket, handle: Handle) -> io::Result<UdpSocket> {
+ let io = PollEvented::new(socket, &handle)?;
+ Ok(UdpSocket { io, handle })
}
/// Creates a new `UdpSocket` from the previously bound socket provided.
@@ -58,7 +59,7 @@ impl UdpSocket {
/// configure a socket before it's handed off, such as setting options like
/// `reuse_address` or binding to multiple addresses.
pub fn from_socket(socket: net::UdpSocket,
- handle: &Handle) -> io::Result<UdpSocket> {
+ handle: Handle) -> io::Result<UdpSocket> {
let udp = mio::net::UdpSocket::from_socket(socket)?;
UdpSocket::new(udp, handle)
}