aboutsummaryrefslogtreecommitdiffstats
path: root/src/interface
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-03-03 18:42:35 +0000
committerJake McGinty <me@jake.su>2018-03-04 01:23:50 +0000
commit93a24bb5a40f355bfc9ba5a26432a216cf4a787d (patch)
tree46962d497684662ce6f4f509cf7e642476a26962 /src/interface
parentudp: simplify/consolidate udp socket code (diff)
downloadwireguard-rs-93a24bb5a40f355bfc9ba5a26432a216cf4a787d.tar.xz
wireguard-rs-93a24bb5a40f355bfc9ba5a26432a216cf4a787d.zip
udp: move all the UDP-related internals into its own struct
Diffstat (limited to 'src/interface')
-rw-r--r--src/interface/mod.rs4
-rw-r--r--src/interface/peer_server.rs50
2 files changed, 22 insertions, 32 deletions
diff --git a/src/interface/mod.rs b/src/interface/mod.rs
index 5826907..c6b673c 100644
--- a/src/interface/mod.rs
+++ b/src/interface/mod.rs
@@ -237,7 +237,9 @@ impl Interface {
let config_fut = peer_server.config_tx().sink_map_err(|_|()).send_all(config_fut).map_err(|e| { warn!("error {:?}", e); () });
- core.run(reaper.join(peer_server.join(utun_fut.join(config_fut.join(config_server))))).unwrap();
+ let fut = reaper.join(peer_server.join(utun_fut.join(config_fut.join(config_server))));
+ let _ = core.run(fut);
+ info!("reactor finished.");
}
}
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index ac6941c..309fd69 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -11,9 +11,9 @@ use std::time::Duration;
use byteorder::{ByteOrder, LittleEndian};
use failure::{Error, err_msg};
-use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc, stream, future};
+use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc};
use rand::{self, Rng};
-use udp::{UdpSocket, UdpFramed, VecUdpCodec, PeerServerMessage};
+use udp::{UdpSocket, VecUdpCodec, PeerServerMessage, UdpChannel};
use tokio_core::reactor::Handle;
struct Channel<T> {
@@ -33,8 +33,7 @@ impl<T> From<(mpsc::Sender<T>, mpsc::Receiver<T>)> for Channel<T> {
pub struct PeerServer {
handle : Handle,
shared_state : SharedState,
- ingress : Option<stream::SplitStream<UdpFramed<VecUdpCodec>>>,
- egress_tx : Option<mpsc::Sender<PeerServerMessage>>,
+ udp : Option<UdpChannel>,
port : Option<u16>,
outgoing : Channel<UtunPacket>,
config : Channel<config::UpdateEvent>,
@@ -47,14 +46,13 @@ impl PeerServer {
pub fn new(handle: Handle, shared_state: SharedState, tunnel_tx: mpsc::Sender<Vec<u8>>) -> Result<Self, Error> {
Ok(PeerServer {
shared_state, tunnel_tx,
- handle : handle.clone(),
- timer : Timer::new(handle),
- ingress : None,
- egress_tx : None,
- port : None,
- outgoing : mpsc::channel(1024).into(),
- config : mpsc::channel(1024).into(),
- cookie : cookie::Validator::new(&[0u8; 32])
+ handle : handle.clone(),
+ timer : Timer::new(handle),
+ udp : None,
+ port : None,
+ outgoing : mpsc::channel(1024).into(),
+ config : mpsc::channel(1024).into(),
+ cookie : cookie::Validator::new(&[0u8; 32])
})
}
@@ -65,23 +63,13 @@ impl PeerServer {
return Ok(())
}
- let socket = UdpSocket::bind((Ipv6Addr::unspecified(), port).into(), &self.handle)?;
+ let socket = UdpSocket::bind((Ipv6Addr::unspecified(), port).into(), self.handle.clone())?;
info!("listening on {:?}", socket.local_addr()?);
- let (udp_sink, udp_stream) = socket.framed(VecUdpCodec{}).split();
- let (egress_tx, egress_rx) = mpsc::channel(1024);
- let udp_writethrough = udp_sink.sink_map_err(|_| ()).send_all(
- egress_rx.and_then(|(addr, packet)| {
- trace!("sending UDP packet to {:?}", &addr);
- future::ok((addr, packet))
- }).map_err(|_| { info!("udp sink error"); () }))
- .then(|_| Ok(()));
+ let udp = socket.framed(VecUdpCodec{}).into();
- self.handle.spawn(udp_writethrough);
-
- self.port = Some(port);
- self.ingress = Some(udp_stream);
- self.egress_tx = Some(egress_tx);
+ self.udp = Some(udp);
+ self.port = Some(port);
Ok(())
}
@@ -94,8 +82,8 @@ impl PeerServer {
}
fn send_to_peer(&self, payload: PeerServerMessage) -> Result<(), Error> {
- let tx = self.egress_tx.as_ref().ok_or_else(|| err_msg("no egress tx"))?.clone();
- self.handle.spawn(tx.send(payload).then(|_| Ok(())));
+ self.udp.as_ref().ok_or_else(|| err_msg("no udp socket"))?
+ .send(payload);
Ok(())
}
@@ -413,7 +401,7 @@ impl Future for PeerServer {
PrivateKey(_) => {
let pub_key = &self.shared_state.borrow().interface_info.pub_key.unwrap();
self.cookie = cookie::Validator::new(pub_key);
- if self.egress_tx.is_none() {
+ if self.udp.is_none() {
self.rebind().unwrap();
}
},
@@ -438,9 +426,9 @@ impl Future for PeerServer {
}
// Handle UDP packets from the outside world
- if self.ingress.is_some() {
+ if self.udp.is_some() {
loop {
- match self.ingress.as_mut().unwrap().poll() {
+ match self.udp.as_mut().unwrap().ingress.poll() {
Ok(Async::Ready(Some((addr, packet)))) => {
let _ = self.handle_ingress_packet(addr, &packet).map_err(|e| warn!("UDP ERR: {:?}", e));
},