aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-05-03 13:04:32 -0700
committerJake McGinty <me@jake.su>2018-05-03 13:04:32 -0700
commitf5d3490f6469cfa43a64c749b50305d61fccc0f0 (patch)
tree624753f8f36ba0aaea0b2f4114e404edd7b2b732 /src
parentnoise: use my updated fork of x25519-dalek to reduce old deps (diff)
downloadwireguard-rs-f5d3490f6469cfa43a64c749b50305d61fccc0f0.tar.xz
wireguard-rs-f5d3490f6469cfa43a64c749b50305d61fccc0f0.zip
config: refactor to send more specific messages to peer_server on change
Diffstat (limited to 'src')
-rw-r--r--src/interface/config.rs62
-rw-r--r--src/interface/mod.rs4
-rw-r--r--src/interface/peer_server.rs37
3 files changed, 64 insertions, 39 deletions
diff --git a/src/interface/config.rs b/src/interface/config.rs
index 488b5bb..882b6cb 100644
--- a/src/interface/config.rs
+++ b/src/interface/config.rs
@@ -3,26 +3,28 @@
// Dev notes:
// * Configuration service should use channels to report updates it receives over its interface.
+use std::net::SocketAddr;
+use std::{cell::RefCell, iter::Iterator, rc::Rc, mem, str};
+use std::fs::{create_dir, remove_file};
+use std::path::{Path, PathBuf};
+
use base64;
use bytes::BytesMut;
-use consts::MAX_PEERS_PER_DEVICE;
use failure::{Error, err_msg};
-use futures::{Async, Future, Poll, Stream, Sink, future, stream, unsync::mpsc};
-use hex;
+use futures::{Async, Future, Poll, Stream, Sink, future, unsync::mpsc};
+use hex::{self, FromHex};
+use tokio_core::reactor::Handle;
+use tokio_io::{AsyncRead, codec::{Encoder, Decoder}};
+use tokio_uds::UnixListener;
+use x25519_dalek as x25519;
+
+use consts::MAX_PEERS_PER_DEVICE;
use interface::{SharedState, State};
use interface::grim_reaper::GrimReaper;
+use interface::peer_server::ChannelMessage;
use peer::Peer;
-use std::net::SocketAddr;
-use std::{cell::RefCell, iter::Iterator, rc::Rc, mem, str};
-use std::fs::{create_dir, remove_file};
-use std::path::{Path, PathBuf};
-use tokio_core::reactor::Handle;
use types::PeerInfo;
-use hex::FromHex;
-use x25519_dalek as x25519;
-use tokio_io::{AsyncRead, codec::{Encoder, Decoder}};
-use tokio_uds::UnixListener;
#[derive(Debug)]
pub enum Command {
@@ -140,21 +142,20 @@ pub struct ConfigurationService {
interface_name: String,
config_server: Box<Future<Item = (), Error = ()>>,
reaper: Box<Future<Item = (), Error = ()>>,
- rx: mpsc::Receiver<UpdateEvent>,
+ rx: mpsc::Receiver<ChannelMessage>
}
impl ConfigurationService {
pub fn new(interface_name: &str, state: &SharedState, handle: &Handle) -> Result<Self, Error> {
let config_path = Self::get_path(interface_name).unwrap();
let listener = UnixListener::bind(config_path.clone(), handle).unwrap();
- let (tx, rx) = mpsc::channel::<UpdateEvent>(1024);
+ let (tx, rx) = mpsc::channel::<ChannelMessage>(1024);
// TODO only listen for own socket, verify behavior from `notify` crate
let reaper = GrimReaper::spawn(handle, &config_path).unwrap();
let config_server = listener.incoming().for_each({
let handle = handle.clone();
- let tx = tx.clone();
let state = state.clone();
move |(stream, _)| {
let (sink, stream) = stream.framed(ConfigurationCodec {}).split();
@@ -169,11 +170,12 @@ impl ConfigurationService {
match command {
Command::Set(_version, items) => {
for item in &items {
- if Self::handle_update(&mut state, item).is_err() {
- return future::ok("errno=1\nerrno=1\n\n".into());
+ match Self::handle_update(&mut state, item) {
+ Ok(Some(msg)) => { tx.clone().send(msg).wait().unwrap(); },
+ Err(_) => { return future::ok("errno=1\nerrno=1\n\n".into()); },
+ _ => {}
}
}
- tx.clone().send_all(stream::iter_ok(items)).wait().unwrap();
future::ok("errno=0\nerrno=0\n\n".into())
},
Command::Get(_version) => {
@@ -220,13 +222,14 @@ impl ConfigurationService {
state.router.remove_allowed_ips(&peer.info.allowed_ips);
}
- pub fn handle_update(state: &mut State, event: &UpdateEvent) -> Result<(), Error> {
+ pub fn handle_update(state: &mut State, event: &UpdateEvent) -> Result<Option<ChannelMessage>, Error> {
match *event {
UpdateEvent::PrivateKey(private_key) => {
if private_key == [0u8; 32] {
state.interface_info.private_key = None;
state.interface_info.pub_key = None;
debug!("unset private key");
+ Ok(Some(ChannelMessage::ClearPrivateKey))
} else {
let pub_key = x25519::generate_public(&private_key);
state.interface_info.private_key = Some(private_key);
@@ -237,16 +240,19 @@ impl ConfigurationService {
Self::clear_peer_refs(state, &peer_ref.borrow());
debug!("removed self from peers");
}
+ Ok(Some(ChannelMessage::NewPrivateKey))
}
},
UpdateEvent::ListenPort(port) => {
state.interface_info.listen_port = Some(port);
debug!("set listen port: {}", port);
+ Ok(Some(ChannelMessage::NewListenPort(port))) // TODO: only notify on listen port *change*
},
UpdateEvent::Fwmark(mark) => {
state.interface_info.fwmark = Some(mark);
debug!("set fwmark: {}", mark);
- }
+ Ok(Some(ChannelMessage::NewFwmark(mark))) // TODO: only notify on fwmark *change*
+ },
UpdateEvent::UpdatePeer(ref info, replace_allowed_ips) => {
let existing_peer = state.pubkey_map.get(&info.pub_key).cloned();
if let Some(peer_ref) = existing_peer {
@@ -258,16 +264,22 @@ impl ConfigurationService {
} else {
info.allowed_ips.extend_from_slice(&peer.info.allowed_ips);
}
+ let ret = if info.keepalive.is_some() && peer.info.keepalive != info.keepalive {
+ Some(ChannelMessage::NewPersistentKeepalive(info.keepalive.unwrap()))
+ } else {
+ None
+ };
info.endpoint = info.endpoint.or(peer.info.endpoint);
info.keepalive = info.keepalive.or(peer.info.keepalive);
info.psk = info.psk.or(peer.info.psk);
state.router.add_allowed_ips(&info.allowed_ips, &peer_ref);
peer.info = info;
+ Ok(ret)
} else {
if let Some(pub_key) = state.interface_info.pub_key {
if pub_key == info.pub_key {
debug!("ignoring self-peer add");
- return Ok(())
+ return Ok(None)
}
}
@@ -280,20 +292,22 @@ impl ConfigurationService {
let peer_ref = Rc::new(RefCell::new(peer));
let _ = state.pubkey_map.insert(info.pub_key, peer_ref.clone());
state.router.add_allowed_ips(&info.allowed_ips, &peer_ref);
- };
+ Ok(None) // TODO: notify specifically on details of these new peers
+ }
},
UpdateEvent::RemoveAllPeers => {
state.pubkey_map.clear();
state.index_map.clear();
state.router.clear();
+ Ok(None)
},
UpdateEvent::RemovePeer(pub_key) => {
let peer_ref = state.pubkey_map.remove(&pub_key)
.ok_or_else(|| err_msg("trying to remove nonexistent peer"))?;
Self::clear_peer_refs(state, &peer_ref.borrow());
+ Ok(None)
},
}
- Ok(())
}
pub fn get_path(interface_name: &str) -> Result<PathBuf, Error> {
@@ -342,7 +356,7 @@ impl ConfigurationService {
}
impl Stream for ConfigurationService {
- type Item = UpdateEvent;
+ type Item = ChannelMessage;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
diff --git a/src/interface/mod.rs b/src/interface/mod.rs
index c670bbe..0d12048 100644
--- a/src/interface/mod.rs
+++ b/src/interface/mod.rs
@@ -99,12 +99,12 @@ impl Interface {
let peer_server = PeerServer::new(core.handle(), self.state.clone(), utun_tx.clone())?;
let config_server = ConfigurationService::new(&self.name, &self.state, &core.handle())?;
- let config_server = config_server.forward(peer_server.config_tx()).map_err(|_|()); // TODO: don't just forward, this is so hacky.
+ let config_server = config_server.forward(peer_server.tx()).map_err(|_|()); // TODO: don't just forward, this is so hacky.
let utun_stream = UtunStream::connect(&self.name, &core.handle())?.framed(VecUtunCodec{});
let (utun_writer, utun_reader) = utun_stream.split();
- let utun_read_fut = peer_server.tx()
+ let utun_read_fut = peer_server.tunnel_tx()
.sink_map_err(|e| -> Error { e.into() })
.send_all(utun_reader.map_err(|e| -> Error { e.into() }))
.map_err(|e| { warn!("utun read error: {:?}", e); () });
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index 9595deb..aa4f53e 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -1,7 +1,7 @@
use consts::{REKEY_TIMEOUT, REKEY_ATTEMPT_TIME, KEEPALIVE_TIMEOUT, STALE_SESSION_TIMEOUT,
MAX_CONTENT_SIZE, WIPE_AFTER_TIME};
use cookie;
-use interface::{SharedPeer, SharedState, State, UtunPacket, config};
+use interface::{SharedPeer, SharedState, State, UtunPacket};
use message::{Message, Initiation, Response, CookieReply, Transport};
use peer::{Peer, SessionType, SessionTransition};
use time::Timestamp;
@@ -14,7 +14,15 @@ use rand::{self, Rng};
use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel};
use tokio_core::reactor::Handle;
-use std::{collections::VecDeque, convert::TryInto, time::Duration};
+use std::{convert::TryInto, time::Duration};
+
+pub enum ChannelMessage {
+ ClearPrivateKey,
+ NewPrivateKey,
+ NewListenPort(u16),
+ NewFwmark(u32),
+ NewPersistentKeepalive(u16),
+}
struct Channel<T> {
tx: mpsc::Sender<T>,
@@ -36,7 +44,7 @@ pub struct PeerServer {
udp : Option<UdpChannel>,
port : Option<u16>,
outgoing : Channel<UtunPacket>,
- config : Channel<config::UpdateEvent>,
+ channel : Channel<ChannelMessage>,
timer : Timer,
tunnel_tx : mpsc::Sender<Vec<u8>>,
cookie : cookie::Validator,
@@ -51,7 +59,7 @@ impl PeerServer {
udp : None,
port : None,
outgoing : mpsc::channel(1024).into(),
- config : mpsc::channel(1024).into(),
+ channel : mpsc::channel(1024).into(),
cookie : cookie::Validator::new(&[0u8; 32])
})
}
@@ -88,12 +96,12 @@ impl PeerServer {
Ok(())
}
- pub fn tx(&self) -> mpsc::Sender<UtunPacket> {
+ pub fn tunnel_tx(&self) -> mpsc::Sender<UtunPacket> {
self.outgoing.tx.clone()
}
- pub fn config_tx(&self) -> mpsc::Sender<config::UpdateEvent> {
- self.config.tx.clone()
+ pub fn tx(&self) -> mpsc::Sender<ChannelMessage> {
+ self.channel.tx.clone()
}
fn send_to_peer(&self, payload: PeerServerMessage) -> Result<(), Error> {
@@ -376,6 +384,10 @@ impl PeerServer {
PersistentKeepAlive(peer_ref, our_index) => {
let mut peer = peer_ref.borrow_mut();
{
+ if peer.info.keepalive.is_none() {
+ bail!("no persistent keepalive set for peer.");
+ }
+
let (_, session_type) = peer.find_session(our_index).ok_or_else(|| err_msg("missing session for timer"))?;
ensure!(session_type == SessionType::Current, "expired session for persistent keepalive timer");
}
@@ -386,7 +398,6 @@ impl PeerServer {
if let Some(keepalive) = peer.info.keepalive {
self.timer.send_after(Duration::from_secs(u64::from(keepalive)),
PersistentKeepAlive(peer_ref.clone(), our_index));
-
}
},
Wipe(peer_ref) => {
@@ -413,11 +424,11 @@ impl Future for PeerServer {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// Handle config events
loop {
- use self::config::UpdateEvent::*;
- match self.config.rx.poll() {
+ use self::ChannelMessage::*;
+ match self.channel.rx.poll() {
Ok(Async::Ready(Some(event))) => {
match event {
- PrivateKey(_) => {
+ NewPrivateKey => {
let pub_key = self.shared_state.borrow().interface_info.pub_key;
if let Some(ref pub_key) = pub_key {
self.cookie = cookie::Validator::new(pub_key);
@@ -429,8 +440,8 @@ impl Future for PeerServer {
self.port = None;
}
},
- ListenPort(_) => self.rebind().unwrap(),
- Fwmark(mark) => {
+ NewListenPort(_) => self.rebind().unwrap(),
+ NewFwmark(mark) => {
if let Some(ref udp) = self.udp {
udp.set_mark(mark).unwrap();
}