diff options
author | Jake McGinty <me@jake.su> | 2018-03-10 18:21:09 +0000 |
---|---|---|
committer | Jake McGinty <me@jake.su> | 2018-03-21 17:47:05 -0600 |
commit | 92b9d7afb1ce683f5781dbc75a5b9f671781dc3c (patch) | |
tree | dcd1895e7a4c8e5c7797338eb873730483c7534e /src/interface | |
parent | meta: change crate name to 'wireguard' not 'wgrs' (diff) | |
download | wireguard-rs-92b9d7afb1ce683f5781dbc75a5b9f671781dc3c.tar.xz wireguard-rs-92b9d7afb1ce683f5781dbc75a5b9f671781dc3c.zip |
config: migrate configuration logic to config.rs
Diffstat (limited to 'src/interface')
-rw-r--r-- | src/interface/config.rs | 188 | ||||
-rw-r--r-- | src/interface/mod.rs | 163 |
2 files changed, 190 insertions, 161 deletions
diff --git a/src/interface/config.rs b/src/interface/config.rs index a80d0a5..7c24725 100644 --- a/src/interface/config.rs +++ b/src/interface/config.rs @@ -3,17 +3,24 @@ // Dev notes: // * Configuration service should use channels to report updates it receives over its interface. +use base64; use bytes::BytesMut; use failure::{Error, err_msg}; +use futures::{Async, Future, Poll, Stream, Sink, future, stream, unsync::mpsc}; +use hex; +use interface::SharedState; +use interface::grim_reaper::GrimReaper; +use peer::Peer; +use std::{cell::RefCell, iter::Iterator, rc::Rc, mem, str}; use std::fs::{create_dir, remove_file}; -use std::mem; -use std::iter::Iterator; use std::path::{Path, PathBuf}; -use std::str; +use tokio_core::reactor::Handle; use types::PeerInfo; use hex::FromHex; +use x25519_dalek as x25519; -use tokio_io::codec::{Encoder, Decoder}; +use tokio_io::{AsyncRead, codec::{Encoder, Decoder}}; +use tokio_uds::UnixListener; #[derive(Debug)] pub enum Command { @@ -127,18 +134,145 @@ impl Encoder for ConfigurationCodec { } } -pub struct ConfigurationServiceManager { +pub struct ConfigurationService { interface_name: String, + shared_state: SharedState, + config_server: Box<Future<Item = (), Error = ()>>, + reaper: Box<Future<Item = (), Error = ()>>, + rx: mpsc::Receiver<UpdateEvent>, } -impl ConfigurationServiceManager { - pub fn new(interface_name: &str) -> Self { - ConfigurationServiceManager { - interface_name: interface_name.into(), +impl ConfigurationService { + pub fn new(interface_name: &str, state: &SharedState, handle: &Handle) -> Result<Self, Error> { + let config_path = Self::get_path(interface_name).unwrap(); + let listener = UnixListener::bind(config_path.clone(), handle).unwrap(); + let (tx, rx) = mpsc::channel::<UpdateEvent>(1024); + + // TODO only listen for own socket, verify behavior from `notify` crate + let reaper = GrimReaper::spawn(handle, config_path.parent().unwrap()).unwrap(); + + let config_server = listener.incoming().for_each({ + let handle = handle.clone(); + let tx = tx.clone(); + let state = state.clone(); + move |(stream, _)| { + let (sink, stream) = stream.framed(ConfigurationCodec {}).split(); + trace!("UnixServer connection."); + + let handle = handle.clone(); + let responses = stream.and_then({ + let tx = tx.clone(); + let state = state.clone(); + move |command| { + let state = state.borrow(); + match command { + Command::Set(_version, items) => { + tx.clone().send_all(stream::iter_ok(items)).wait().unwrap(); + future::ok("errno=0\nerrno=0\n\n".to_string()) + }, + Command::Get(_version) => { + let info = &state.interface_info; + let peers = &state.pubkey_map; + let mut s = String::new(); + if let Some(private_key) = info.private_key { + s.push_str(&format!("private_key={}\n", hex::encode(private_key))); + } + if let Some(port) = info.listen_port { + s.push_str(&format!("listen_port={}\n", port)); + } + for (_, peer) in peers.iter() { + s.push_str(&peer.borrow().to_config_string()); + } + future::ok(format!("{}errno=0\n\n", s)) + } + } + } + }); + + let fut = sink.send_all(responses) + .map(|_| ()) + .map_err(|_| ()); + + handle.spawn(fut); + + Ok(()) + } + }).map_err(|_| ()); + + Ok(ConfigurationService { + interface_name: interface_name.to_owned(), + config_server: Box::new(config_server), + reaper: Box::new(reaper), + shared_state: state.clone(), + rx + }) + } + + pub fn handle_update(&self, event: &UpdateEvent) -> Result<(), Error> { + let mut state = self.shared_state.borrow_mut(); + match *event { + UpdateEvent::PrivateKey(private_key) => { + let pub_key = x25519::generate_public(&private_key); + info!("set pubkey: {}", base64::encode(pub_key.as_bytes())); + state.interface_info.private_key = Some(private_key); + state.interface_info.pub_key = Some(*pub_key.as_bytes()); + debug!("set new private key."); + }, + UpdateEvent::ListenPort(port) => { + state.interface_info.listen_port = Some(port); + info!("set listen port: {}", port); + }, + UpdateEvent::Fwmark(mark) => { + state.interface_info.fwmark = Some(mark); + info!("set fwmark: {}", mark); + } + UpdateEvent::UpdatePeer(ref info, replace_allowed_ips) => { + let existing_peer = state.pubkey_map.get(&info.pub_key).cloned(); + if let Some(peer_ref) = existing_peer { + info!("updating peer: {}", info); + let mut peer = peer_ref.borrow_mut(); + let mut info = info.clone(); + if replace_allowed_ips { + state.router.remove_allowed_ips(&peer.info.allowed_ips); + } else { + info.allowed_ips.extend_from_slice(&peer.info.allowed_ips); + } + info.endpoint = info.endpoint.or(peer.info.endpoint); + info.keepalive = info.keepalive.or(peer.info.keepalive); + state.router.add_allowed_ips(&info.allowed_ips, &peer_ref); + peer.info = info; + } else { + info!("adding new peer: {}", info); + let mut peer = Peer::new(info.clone()); + let peer_ref = Rc::new(RefCell::new(peer)); + let _ = state.pubkey_map.insert(info.pub_key, peer_ref.clone()); + state.router.add_allowed_ips(&info.allowed_ips, &peer_ref); + }; + + }, + UpdateEvent::RemoveAllPeers => { + state.pubkey_map.clear(); + state.index_map.clear(); + state.router.clear(); + }, + UpdateEvent::RemovePeer(pub_key) => { + if let Some(peer_ref) = state.pubkey_map.remove(&pub_key) { + let peer = peer_ref.borrow(); + let indices = peer.get_mapped_indices(); + + for index in indices { + let _ = state.index_map.remove(&index); + } + state.router.remove_allowed_ips(&peer.info.allowed_ips); + } else { + info!("RemovePeer request for nonexistent peer."); + } + }, } + Ok(()) } - pub fn get_path(&self) -> Result<PathBuf, Error> { + pub fn get_path(interface_name: &str) -> Result<PathBuf, Error> { let mut socket_path = Self::get_run_path().join("wireguard"); if !socket_path.exists() { @@ -150,7 +284,7 @@ impl ConfigurationServiceManager { Self::chmod(&socket_path, 0o700)?; // Finish the socket path - socket_path.push(&self.interface_name); + socket_path.push(interface_name); socket_path.set_extension("sock"); if socket_path.exists() { debug!("Removing existing socket: {}", socket_path.display()); @@ -183,7 +317,37 @@ impl ConfigurationServiceManager { } } -impl Drop for ConfigurationServiceManager { +impl Stream for ConfigurationService { + type Item = UpdateEvent; + type Error = Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + match self.config_server.poll() { + Ok(Async::NotReady) => {}, + _ => return Err(err_msg("config_server broken")), + + } + + match self.reaper.poll() { + Ok(Async::NotReady) => {}, + _ => { + debug!("reaper triggered, closing ConfigurationService stream."); + return Ok(Async::Ready(None)) + }, + } + + match self.rx.poll() { + Ok(Async::Ready(Some(packet))) => { + let _ = self.handle_update(&packet).map_err(|e| warn!("UDP ERR: {:?}", e)); + Ok(Async::Ready(Some(packet))) + }, + Ok(Async::Ready(None)) | Err(_) => Err(err_msg("err in config rx channel")), + Ok(Async::NotReady) => Ok(Async::NotReady) + } + } +} + +impl Drop for ConfigurationService { fn drop(&mut self) { let mut socket_path = Self::get_run_path().join("wireguard"); socket_path.push(&self.interface_name); diff --git a/src/interface/mod.rs b/src/interface/mod.rs index e8c799c..110706a 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -2,30 +2,23 @@ mod config; mod grim_reaper; pub mod peer_server; -use self::config::{ConfigurationServiceManager, UpdateEvent, Command, ConfigurationCodec}; -use self::grim_reaper::GrimReaper; +use self::config::ConfigurationService; use self::peer_server::PeerServer; use router::Router; -use base64; -use hex; use failure::{Error, err_msg}; use peer::Peer; use std::io; use std::rc::Rc; use std::cell::RefCell; use std::collections::HashMap; -use std::fs::remove_file; use types::{InterfaceInfo}; -use x25519_dalek as x25519; use pnet_packet::ipv4::Ipv4Packet; -use futures::{Future, Stream, Sink, future, unsync, stream}; +use futures::{Future, Stream, Sink, unsync}; use tokio_core::reactor::Core; use tokio_utun::{UtunStream, UtunCodec}; -use tokio_io::{AsyncRead}; -use tokio_uds::{UnixListener}; pub fn trace_packet(header: &str, packet: &[u8]) { @@ -99,162 +92,34 @@ impl Interface { } } - pub fn start(&mut self) { - let mut core = Core::new().unwrap(); + pub fn start(&mut self) -> Result<(), Error> { + let mut core = Core::new()?; let (utun_tx, utun_rx) = unsync::mpsc::channel::<Vec<u8>>(1024); - let peer_server = PeerServer::new(core.handle(), self.state.clone(), utun_tx.clone()).unwrap(); + let peer_server = PeerServer::new(core.handle(), self.state.clone(), utun_tx.clone())?; + let config_server = ConfigurationService::new(&self.name, &self.state, &core.handle())?; + let config_server = config_server.forward(peer_server.config_tx()).map_err(|_|()); + let utun_stream = UtunStream::connect(&self.name, &core.handle())?.framed(VecUtunCodec{}); - let utun_stream = UtunStream::connect(&self.name, &core.handle()).unwrap().framed(VecUtunCodec{}); let (utun_writer, utun_reader) = utun_stream.split(); + let utun_read_fut = peer_server.tx() .sink_map_err(|e| -> Error { e.into() }) .send_all(utun_reader.map_err(|e| -> Error { e.into() })) .map_err(|e| { warn!("utun read error: {:?}", e); () }); + let utun_write_fut = utun_writer .sink_map_err(|e| -> Error { e.into() }) .send_all(utun_rx.map_err(|()| -> Error { err_msg("utun rx failure") })) .map_err(|e| { warn!("utun write error: {:?}", e); () }); - let utun_fut = utun_write_fut.join(utun_read_fut); - - let config_manager = ConfigurationServiceManager::new(&self.name); - let handle = core.handle(); - let config_path = config_manager.get_path().unwrap(); - let listener = UnixListener::bind(config_path.clone(), &handle).unwrap(); - let reaper = GrimReaper::spawn(&handle, config_path.parent().unwrap()).unwrap(); - let (config_tx, config_rx) = unsync::mpsc::channel::<UpdateEvent>(1024); - let h = handle.clone(); - - let config_server = listener.incoming().for_each({ - let config_tx = config_tx.clone(); - let state = self.state.clone(); - move |(stream, _)| { - let (sink, stream) = stream.framed(ConfigurationCodec {}).split(); - trace!("UnixServer connection."); - - let handle = h.clone(); - let responses = stream.and_then({ - let config_tx = config_tx.clone(); - let state = state.clone(); - move |command| { - let state = state.borrow(); - match command { - Command::Set(_version, items) => { - config_tx.clone().send_all(stream::iter_ok(items)).wait().unwrap(); - future::ok("errno=0\nerrno=0\n\n".to_string()) - }, - Command::Get(_version) => { - let info = &state.interface_info; - let peers = &state.pubkey_map; - let mut s = String::new(); - if let Some(private_key) = info.private_key { - s.push_str(&format!("private_key={}\n", hex::encode(private_key))); - } - if let Some(port) = info.listen_port { - s.push_str(&format!("listen_port={}\n", port)); - } - - for (_, peer) in peers.iter() { - s.push_str(&peer.borrow().to_config_string()); - } - future::ok(format!("{}errno=0\n\n", s)) - } - } - } - }); - - let fut = sink.send_all(responses).map(|_| ()).map_err(|_| ()); - - handle.spawn(fut); - Ok(()) - } - }).map_err(|_| ()); + let utun_futs = utun_write_fut.join(utun_read_fut); - let config_fut = config_rx.and_then({ - let state = self.state.clone(); - move |event| { - let mut state = state.borrow_mut(); - match event { - UpdateEvent::PrivateKey(private_key) => { - let pub_key = x25519::generate_public(&private_key); - info!("set pubkey: {}", base64::encode(pub_key.as_bytes())); - state.interface_info.private_key = Some(private_key); - state.interface_info.pub_key = Some(*pub_key.as_bytes()); - debug!("set new private key."); - }, - UpdateEvent::ListenPort(port) => { - state.interface_info.listen_port = Some(port); - info!("set listen port: {}", port); - }, - UpdateEvent::Fwmark(mark) => { - state.interface_info.fwmark = Some(mark); - info!("set fwmark: {}", mark); - } - UpdateEvent::UpdatePeer(ref info, replace_allowed_ips) => { - let existing_peer = state.pubkey_map.get(&info.pub_key).cloned(); - if let Some(peer_ref) = existing_peer { - info!("updating peer: {}", info); - let mut peer = peer_ref.borrow_mut(); - let mut info = info.clone(); - if replace_allowed_ips { - state.router.remove_allowed_ips(&peer.info.allowed_ips); - } else { - info.allowed_ips.extend_from_slice(&peer.info.allowed_ips); - } - info.endpoint = info.endpoint.or(peer.info.endpoint); - info.keepalive = info.keepalive.or(peer.info.keepalive); - state.router.add_allowed_ips(&info.allowed_ips, &peer_ref); - peer.info = info; - } else { - info!("adding new peer: {}", info); - let mut peer = Peer::new(info.clone()); - let peer_ref = Rc::new(RefCell::new(peer)); - let _ = state.pubkey_map.insert(info.pub_key, peer_ref.clone()); - state.router.add_allowed_ips(&info.allowed_ips, &peer_ref); - }; - - }, - UpdateEvent::RemoveAllPeers => { - state.pubkey_map.clear(); - state.index_map.clear(); - state.router.clear(); - }, - UpdateEvent::RemovePeer(pub_key) => { - if let Some(peer_ref) = state.pubkey_map.remove(&pub_key) { - let peer = peer_ref.borrow(); - let indices = peer.get_mapped_indices(); - - for index in indices { - let _ = state.index_map.remove(&index); - } - state.router.remove_allowed_ips(&peer.info.allowed_ips); - } else { - info!("RemovePeer request for nonexistent peer."); - } - }, - } - future::ok(event) - } - }).map_err(|e| { warn!("error {:?}", e); () }); - - let config_fut = peer_server.config_tx().sink_map_err(|_|()).send_all(config_fut).map_err(|e| { warn!("error {:?}", e); () }); - - let fut = reaper.join(peer_server.join(utun_fut.join(config_fut.join(config_server)))); + let fut = peer_server.join(config_server.join(utun_futs)); let _ = core.run(fut); + info!("reactor finished."); + Ok(()) } } - -impl Drop for Interface { - fn drop(&mut self) { - let mut socket_path = ConfigurationServiceManager::get_run_path().join("wireguard"); - socket_path.push(&self.name); - socket_path.set_extension("sock"); - if socket_path.exists() { - info!("Removing socket on drop: {}", socket_path.display()); - let _ = remove_file(&socket_path); - } - } -}
\ No newline at end of file |