From f5d3490f6469cfa43a64c749b50305d61fccc0f0 Mon Sep 17 00:00:00 2001 From: Jake McGinty Date: Thu, 3 May 2018 13:04:32 -0700 Subject: config: refactor to send more specific messages to peer_server on change --- src/interface/config.rs | 62 +++++++++++++++++++++++++++----------------- src/interface/mod.rs | 4 +-- src/interface/peer_server.rs | 37 ++++++++++++++++---------- 3 files changed, 64 insertions(+), 39 deletions(-) (limited to 'src') diff --git a/src/interface/config.rs b/src/interface/config.rs index 488b5bb..882b6cb 100644 --- a/src/interface/config.rs +++ b/src/interface/config.rs @@ -3,26 +3,28 @@ // Dev notes: // * Configuration service should use channels to report updates it receives over its interface. +use std::net::SocketAddr; +use std::{cell::RefCell, iter::Iterator, rc::Rc, mem, str}; +use std::fs::{create_dir, remove_file}; +use std::path::{Path, PathBuf}; + use base64; use bytes::BytesMut; -use consts::MAX_PEERS_PER_DEVICE; use failure::{Error, err_msg}; -use futures::{Async, Future, Poll, Stream, Sink, future, stream, unsync::mpsc}; -use hex; +use futures::{Async, Future, Poll, Stream, Sink, future, unsync::mpsc}; +use hex::{self, FromHex}; +use tokio_core::reactor::Handle; +use tokio_io::{AsyncRead, codec::{Encoder, Decoder}}; +use tokio_uds::UnixListener; +use x25519_dalek as x25519; + +use consts::MAX_PEERS_PER_DEVICE; use interface::{SharedState, State}; use interface::grim_reaper::GrimReaper; +use interface::peer_server::ChannelMessage; use peer::Peer; -use std::net::SocketAddr; -use std::{cell::RefCell, iter::Iterator, rc::Rc, mem, str}; -use std::fs::{create_dir, remove_file}; -use std::path::{Path, PathBuf}; -use tokio_core::reactor::Handle; use types::PeerInfo; -use hex::FromHex; -use x25519_dalek as x25519; -use tokio_io::{AsyncRead, codec::{Encoder, Decoder}}; -use tokio_uds::UnixListener; #[derive(Debug)] pub enum Command { @@ -140,21 +142,20 @@ pub struct ConfigurationService { interface_name: String, config_server: Box>, reaper: Box>, - rx: mpsc::Receiver, + rx: mpsc::Receiver } impl ConfigurationService { pub fn new(interface_name: &str, state: &SharedState, handle: &Handle) -> Result { let config_path = Self::get_path(interface_name).unwrap(); let listener = UnixListener::bind(config_path.clone(), handle).unwrap(); - let (tx, rx) = mpsc::channel::(1024); + let (tx, rx) = mpsc::channel::(1024); // TODO only listen for own socket, verify behavior from `notify` crate let reaper = GrimReaper::spawn(handle, &config_path).unwrap(); let config_server = listener.incoming().for_each({ let handle = handle.clone(); - let tx = tx.clone(); let state = state.clone(); move |(stream, _)| { let (sink, stream) = stream.framed(ConfigurationCodec {}).split(); @@ -169,11 +170,12 @@ impl ConfigurationService { match command { Command::Set(_version, items) => { for item in &items { - if Self::handle_update(&mut state, item).is_err() { - return future::ok("errno=1\nerrno=1\n\n".into()); + match Self::handle_update(&mut state, item) { + Ok(Some(msg)) => { tx.clone().send(msg).wait().unwrap(); }, + Err(_) => { return future::ok("errno=1\nerrno=1\n\n".into()); }, + _ => {} } } - tx.clone().send_all(stream::iter_ok(items)).wait().unwrap(); future::ok("errno=0\nerrno=0\n\n".into()) }, Command::Get(_version) => { @@ -220,13 +222,14 @@ impl ConfigurationService { state.router.remove_allowed_ips(&peer.info.allowed_ips); } - pub fn handle_update(state: &mut State, event: &UpdateEvent) -> Result<(), Error> { + pub fn handle_update(state: &mut State, event: &UpdateEvent) -> Result, Error> { match *event { UpdateEvent::PrivateKey(private_key) => { if private_key == [0u8; 32] { state.interface_info.private_key = None; state.interface_info.pub_key = None; debug!("unset private key"); + Ok(Some(ChannelMessage::ClearPrivateKey)) } else { let pub_key = x25519::generate_public(&private_key); state.interface_info.private_key = Some(private_key); @@ -237,16 +240,19 @@ impl ConfigurationService { Self::clear_peer_refs(state, &peer_ref.borrow()); debug!("removed self from peers"); } + Ok(Some(ChannelMessage::NewPrivateKey)) } }, UpdateEvent::ListenPort(port) => { state.interface_info.listen_port = Some(port); debug!("set listen port: {}", port); + Ok(Some(ChannelMessage::NewListenPort(port))) // TODO: only notify on listen port *change* }, UpdateEvent::Fwmark(mark) => { state.interface_info.fwmark = Some(mark); debug!("set fwmark: {}", mark); - } + Ok(Some(ChannelMessage::NewFwmark(mark))) // TODO: only notify on fwmark *change* + }, UpdateEvent::UpdatePeer(ref info, replace_allowed_ips) => { let existing_peer = state.pubkey_map.get(&info.pub_key).cloned(); if let Some(peer_ref) = existing_peer { @@ -258,16 +264,22 @@ impl ConfigurationService { } else { info.allowed_ips.extend_from_slice(&peer.info.allowed_ips); } + let ret = if info.keepalive.is_some() && peer.info.keepalive != info.keepalive { + Some(ChannelMessage::NewPersistentKeepalive(info.keepalive.unwrap())) + } else { + None + }; info.endpoint = info.endpoint.or(peer.info.endpoint); info.keepalive = info.keepalive.or(peer.info.keepalive); info.psk = info.psk.or(peer.info.psk); state.router.add_allowed_ips(&info.allowed_ips, &peer_ref); peer.info = info; + Ok(ret) } else { if let Some(pub_key) = state.interface_info.pub_key { if pub_key == info.pub_key { debug!("ignoring self-peer add"); - return Ok(()) + return Ok(None) } } @@ -280,20 +292,22 @@ impl ConfigurationService { let peer_ref = Rc::new(RefCell::new(peer)); let _ = state.pubkey_map.insert(info.pub_key, peer_ref.clone()); state.router.add_allowed_ips(&info.allowed_ips, &peer_ref); - }; + Ok(None) // TODO: notify specifically on details of these new peers + } }, UpdateEvent::RemoveAllPeers => { state.pubkey_map.clear(); state.index_map.clear(); state.router.clear(); + Ok(None) }, UpdateEvent::RemovePeer(pub_key) => { let peer_ref = state.pubkey_map.remove(&pub_key) .ok_or_else(|| err_msg("trying to remove nonexistent peer"))?; Self::clear_peer_refs(state, &peer_ref.borrow()); + Ok(None) }, } - Ok(()) } pub fn get_path(interface_name: &str) -> Result { @@ -342,7 +356,7 @@ impl ConfigurationService { } impl Stream for ConfigurationService { - type Item = UpdateEvent; + type Item = ChannelMessage; type Error = Error; fn poll(&mut self) -> Poll, Self::Error> { diff --git a/src/interface/mod.rs b/src/interface/mod.rs index c670bbe..0d12048 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -99,12 +99,12 @@ impl Interface { let peer_server = PeerServer::new(core.handle(), self.state.clone(), utun_tx.clone())?; let config_server = ConfigurationService::new(&self.name, &self.state, &core.handle())?; - let config_server = config_server.forward(peer_server.config_tx()).map_err(|_|()); // TODO: don't just forward, this is so hacky. + let config_server = config_server.forward(peer_server.tx()).map_err(|_|()); // TODO: don't just forward, this is so hacky. let utun_stream = UtunStream::connect(&self.name, &core.handle())?.framed(VecUtunCodec{}); let (utun_writer, utun_reader) = utun_stream.split(); - let utun_read_fut = peer_server.tx() + let utun_read_fut = peer_server.tunnel_tx() .sink_map_err(|e| -> Error { e.into() }) .send_all(utun_reader.map_err(|e| -> Error { e.into() })) .map_err(|e| { warn!("utun read error: {:?}", e); () }); diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 9595deb..aa4f53e 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -1,7 +1,7 @@ use consts::{REKEY_TIMEOUT, REKEY_ATTEMPT_TIME, KEEPALIVE_TIMEOUT, STALE_SESSION_TIMEOUT, MAX_CONTENT_SIZE, WIPE_AFTER_TIME}; use cookie; -use interface::{SharedPeer, SharedState, State, UtunPacket, config}; +use interface::{SharedPeer, SharedState, State, UtunPacket}; use message::{Message, Initiation, Response, CookieReply, Transport}; use peer::{Peer, SessionType, SessionTransition}; use time::Timestamp; @@ -14,7 +14,15 @@ use rand::{self, Rng}; use udp::{Endpoint, UdpSocket, PeerServerMessage, UdpChannel}; use tokio_core::reactor::Handle; -use std::{collections::VecDeque, convert::TryInto, time::Duration}; +use std::{convert::TryInto, time::Duration}; + +pub enum ChannelMessage { + ClearPrivateKey, + NewPrivateKey, + NewListenPort(u16), + NewFwmark(u32), + NewPersistentKeepalive(u16), +} struct Channel { tx: mpsc::Sender, @@ -36,7 +44,7 @@ pub struct PeerServer { udp : Option, port : Option, outgoing : Channel, - config : Channel, + channel : Channel, timer : Timer, tunnel_tx : mpsc::Sender>, cookie : cookie::Validator, @@ -51,7 +59,7 @@ impl PeerServer { udp : None, port : None, outgoing : mpsc::channel(1024).into(), - config : mpsc::channel(1024).into(), + channel : mpsc::channel(1024).into(), cookie : cookie::Validator::new(&[0u8; 32]) }) } @@ -88,12 +96,12 @@ impl PeerServer { Ok(()) } - pub fn tx(&self) -> mpsc::Sender { + pub fn tunnel_tx(&self) -> mpsc::Sender { self.outgoing.tx.clone() } - pub fn config_tx(&self) -> mpsc::Sender { - self.config.tx.clone() + pub fn tx(&self) -> mpsc::Sender { + self.channel.tx.clone() } fn send_to_peer(&self, payload: PeerServerMessage) -> Result<(), Error> { @@ -376,6 +384,10 @@ impl PeerServer { PersistentKeepAlive(peer_ref, our_index) => { let mut peer = peer_ref.borrow_mut(); { + if peer.info.keepalive.is_none() { + bail!("no persistent keepalive set for peer."); + } + let (_, session_type) = peer.find_session(our_index).ok_or_else(|| err_msg("missing session for timer"))?; ensure!(session_type == SessionType::Current, "expired session for persistent keepalive timer"); } @@ -386,7 +398,6 @@ impl PeerServer { if let Some(keepalive) = peer.info.keepalive { self.timer.send_after(Duration::from_secs(u64::from(keepalive)), PersistentKeepAlive(peer_ref.clone(), our_index)); - } }, Wipe(peer_ref) => { @@ -413,11 +424,11 @@ impl Future for PeerServer { fn poll(&mut self) -> Poll { // Handle config events loop { - use self::config::UpdateEvent::*; - match self.config.rx.poll() { + use self::ChannelMessage::*; + match self.channel.rx.poll() { Ok(Async::Ready(Some(event))) => { match event { - PrivateKey(_) => { + NewPrivateKey => { let pub_key = self.shared_state.borrow().interface_info.pub_key; if let Some(ref pub_key) = pub_key { self.cookie = cookie::Validator::new(pub_key); @@ -429,8 +440,8 @@ impl Future for PeerServer { self.port = None; } }, - ListenPort(_) => self.rebind().unwrap(), - Fwmark(mark) => { + NewListenPort(_) => self.rebind().unwrap(), + NewFwmark(mark) => { if let Some(ref udp) = self.udp { udp.set_mark(mark).unwrap(); } -- cgit v1.2.3-59-g8ed1b