aboutsummaryrefslogtreecommitdiffstats
path: root/src/interface/config.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/interface/config.rs')
-rw-r--r--src/interface/config.rs62
1 files changed, 38 insertions, 24 deletions
diff --git a/src/interface/config.rs b/src/interface/config.rs
index 488b5bb..882b6cb 100644
--- a/src/interface/config.rs
+++ b/src/interface/config.rs
@@ -3,26 +3,28 @@
// Dev notes:
// * Configuration service should use channels to report updates it receives over its interface.
+use std::net::SocketAddr;
+use std::{cell::RefCell, iter::Iterator, rc::Rc, mem, str};
+use std::fs::{create_dir, remove_file};
+use std::path::{Path, PathBuf};
+
use base64;
use bytes::BytesMut;
-use consts::MAX_PEERS_PER_DEVICE;
use failure::{Error, err_msg};
-use futures::{Async, Future, Poll, Stream, Sink, future, stream, unsync::mpsc};
-use hex;
+use futures::{Async, Future, Poll, Stream, Sink, future, unsync::mpsc};
+use hex::{self, FromHex};
+use tokio_core::reactor::Handle;
+use tokio_io::{AsyncRead, codec::{Encoder, Decoder}};
+use tokio_uds::UnixListener;
+use x25519_dalek as x25519;
+
+use consts::MAX_PEERS_PER_DEVICE;
use interface::{SharedState, State};
use interface::grim_reaper::GrimReaper;
+use interface::peer_server::ChannelMessage;
use peer::Peer;
-use std::net::SocketAddr;
-use std::{cell::RefCell, iter::Iterator, rc::Rc, mem, str};
-use std::fs::{create_dir, remove_file};
-use std::path::{Path, PathBuf};
-use tokio_core::reactor::Handle;
use types::PeerInfo;
-use hex::FromHex;
-use x25519_dalek as x25519;
-use tokio_io::{AsyncRead, codec::{Encoder, Decoder}};
-use tokio_uds::UnixListener;
#[derive(Debug)]
pub enum Command {
@@ -140,21 +142,20 @@ pub struct ConfigurationService {
interface_name: String,
config_server: Box<Future<Item = (), Error = ()>>,
reaper: Box<Future<Item = (), Error = ()>>,
- rx: mpsc::Receiver<UpdateEvent>,
+ rx: mpsc::Receiver<ChannelMessage>
}
impl ConfigurationService {
pub fn new(interface_name: &str, state: &SharedState, handle: &Handle) -> Result<Self, Error> {
let config_path = Self::get_path(interface_name).unwrap();
let listener = UnixListener::bind(config_path.clone(), handle).unwrap();
- let (tx, rx) = mpsc::channel::<UpdateEvent>(1024);
+ let (tx, rx) = mpsc::channel::<ChannelMessage>(1024);
// TODO only listen for own socket, verify behavior from `notify` crate
let reaper = GrimReaper::spawn(handle, &config_path).unwrap();
let config_server = listener.incoming().for_each({
let handle = handle.clone();
- let tx = tx.clone();
let state = state.clone();
move |(stream, _)| {
let (sink, stream) = stream.framed(ConfigurationCodec {}).split();
@@ -169,11 +170,12 @@ impl ConfigurationService {
match command {
Command::Set(_version, items) => {
for item in &items {
- if Self::handle_update(&mut state, item).is_err() {
- return future::ok("errno=1\nerrno=1\n\n".into());
+ match Self::handle_update(&mut state, item) {
+ Ok(Some(msg)) => { tx.clone().send(msg).wait().unwrap(); },
+ Err(_) => { return future::ok("errno=1\nerrno=1\n\n".into()); },
+ _ => {}
}
}
- tx.clone().send_all(stream::iter_ok(items)).wait().unwrap();
future::ok("errno=0\nerrno=0\n\n".into())
},
Command::Get(_version) => {
@@ -220,13 +222,14 @@ impl ConfigurationService {
state.router.remove_allowed_ips(&peer.info.allowed_ips);
}
- pub fn handle_update(state: &mut State, event: &UpdateEvent) -> Result<(), Error> {
+ pub fn handle_update(state: &mut State, event: &UpdateEvent) -> Result<Option<ChannelMessage>, Error> {
match *event {
UpdateEvent::PrivateKey(private_key) => {
if private_key == [0u8; 32] {
state.interface_info.private_key = None;
state.interface_info.pub_key = None;
debug!("unset private key");
+ Ok(Some(ChannelMessage::ClearPrivateKey))
} else {
let pub_key = x25519::generate_public(&private_key);
state.interface_info.private_key = Some(private_key);
@@ -237,16 +240,19 @@ impl ConfigurationService {
Self::clear_peer_refs(state, &peer_ref.borrow());
debug!("removed self from peers");
}
+ Ok(Some(ChannelMessage::NewPrivateKey))
}
},
UpdateEvent::ListenPort(port) => {
state.interface_info.listen_port = Some(port);
debug!("set listen port: {}", port);
+ Ok(Some(ChannelMessage::NewListenPort(port))) // TODO: only notify on listen port *change*
},
UpdateEvent::Fwmark(mark) => {
state.interface_info.fwmark = Some(mark);
debug!("set fwmark: {}", mark);
- }
+ Ok(Some(ChannelMessage::NewFwmark(mark))) // TODO: only notify on fwmark *change*
+ },
UpdateEvent::UpdatePeer(ref info, replace_allowed_ips) => {
let existing_peer = state.pubkey_map.get(&info.pub_key).cloned();
if let Some(peer_ref) = existing_peer {
@@ -258,16 +264,22 @@ impl ConfigurationService {
} else {
info.allowed_ips.extend_from_slice(&peer.info.allowed_ips);
}
+ let ret = if info.keepalive.is_some() && peer.info.keepalive != info.keepalive {
+ Some(ChannelMessage::NewPersistentKeepalive(info.keepalive.unwrap()))
+ } else {
+ None
+ };
info.endpoint = info.endpoint.or(peer.info.endpoint);
info.keepalive = info.keepalive.or(peer.info.keepalive);
info.psk = info.psk.or(peer.info.psk);
state.router.add_allowed_ips(&info.allowed_ips, &peer_ref);
peer.info = info;
+ Ok(ret)
} else {
if let Some(pub_key) = state.interface_info.pub_key {
if pub_key == info.pub_key {
debug!("ignoring self-peer add");
- return Ok(())
+ return Ok(None)
}
}
@@ -280,20 +292,22 @@ impl ConfigurationService {
let peer_ref = Rc::new(RefCell::new(peer));
let _ = state.pubkey_map.insert(info.pub_key, peer_ref.clone());
state.router.add_allowed_ips(&info.allowed_ips, &peer_ref);
- };
+ Ok(None) // TODO: notify specifically on details of these new peers
+ }
},
UpdateEvent::RemoveAllPeers => {
state.pubkey_map.clear();
state.index_map.clear();
state.router.clear();
+ Ok(None)
},
UpdateEvent::RemovePeer(pub_key) => {
let peer_ref = state.pubkey_map.remove(&pub_key)
.ok_or_else(|| err_msg("trying to remove nonexistent peer"))?;
Self::clear_peer_refs(state, &peer_ref.borrow());
+ Ok(None)
},
}
- Ok(())
}
pub fn get_path(interface_name: &str) -> Result<PathBuf, Error> {
@@ -342,7 +356,7 @@ impl ConfigurationService {
}
impl Stream for ConfigurationService {
- type Item = UpdateEvent;
+ type Item = ChannelMessage;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {