aboutsummaryrefslogtreecommitdiffstats
path: root/src/interface
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-03-10 18:21:09 +0000
committerJake McGinty <me@jake.su>2018-03-21 17:47:05 -0600
commit92b9d7afb1ce683f5781dbc75a5b9f671781dc3c (patch)
treedcd1895e7a4c8e5c7797338eb873730483c7534e /src/interface
parentmeta: change crate name to 'wireguard' not 'wgrs' (diff)
downloadwireguard-rs-92b9d7afb1ce683f5781dbc75a5b9f671781dc3c.tar.xz
wireguard-rs-92b9d7afb1ce683f5781dbc75a5b9f671781dc3c.zip
config: migrate configuration logic to config.rs
Diffstat (limited to 'src/interface')
-rw-r--r--src/interface/config.rs188
-rw-r--r--src/interface/mod.rs163
2 files changed, 190 insertions, 161 deletions
diff --git a/src/interface/config.rs b/src/interface/config.rs
index a80d0a5..7c24725 100644
--- a/src/interface/config.rs
+++ b/src/interface/config.rs
@@ -3,17 +3,24 @@
// Dev notes:
// * Configuration service should use channels to report updates it receives over its interface.
+use base64;
use bytes::BytesMut;
use failure::{Error, err_msg};
+use futures::{Async, Future, Poll, Stream, Sink, future, stream, unsync::mpsc};
+use hex;
+use interface::SharedState;
+use interface::grim_reaper::GrimReaper;
+use peer::Peer;
+use std::{cell::RefCell, iter::Iterator, rc::Rc, mem, str};
use std::fs::{create_dir, remove_file};
-use std::mem;
-use std::iter::Iterator;
use std::path::{Path, PathBuf};
-use std::str;
+use tokio_core::reactor::Handle;
use types::PeerInfo;
use hex::FromHex;
+use x25519_dalek as x25519;
-use tokio_io::codec::{Encoder, Decoder};
+use tokio_io::{AsyncRead, codec::{Encoder, Decoder}};
+use tokio_uds::UnixListener;
#[derive(Debug)]
pub enum Command {
@@ -127,18 +134,145 @@ impl Encoder for ConfigurationCodec {
}
}
-pub struct ConfigurationServiceManager {
+pub struct ConfigurationService {
interface_name: String,
+ shared_state: SharedState,
+ config_server: Box<Future<Item = (), Error = ()>>,
+ reaper: Box<Future<Item = (), Error = ()>>,
+ rx: mpsc::Receiver<UpdateEvent>,
}
-impl ConfigurationServiceManager {
- pub fn new(interface_name: &str) -> Self {
- ConfigurationServiceManager {
- interface_name: interface_name.into(),
+impl ConfigurationService {
+ pub fn new(interface_name: &str, state: &SharedState, handle: &Handle) -> Result<Self, Error> {
+ let config_path = Self::get_path(interface_name).unwrap();
+ let listener = UnixListener::bind(config_path.clone(), handle).unwrap();
+ let (tx, rx) = mpsc::channel::<UpdateEvent>(1024);
+
+ // TODO only listen for own socket, verify behavior from `notify` crate
+ let reaper = GrimReaper::spawn(handle, config_path.parent().unwrap()).unwrap();
+
+ let config_server = listener.incoming().for_each({
+ let handle = handle.clone();
+ let tx = tx.clone();
+ let state = state.clone();
+ move |(stream, _)| {
+ let (sink, stream) = stream.framed(ConfigurationCodec {}).split();
+ trace!("UnixServer connection.");
+
+ let handle = handle.clone();
+ let responses = stream.and_then({
+ let tx = tx.clone();
+ let state = state.clone();
+ move |command| {
+ let state = state.borrow();
+ match command {
+ Command::Set(_version, items) => {
+ tx.clone().send_all(stream::iter_ok(items)).wait().unwrap();
+ future::ok("errno=0\nerrno=0\n\n".to_string())
+ },
+ Command::Get(_version) => {
+ let info = &state.interface_info;
+ let peers = &state.pubkey_map;
+ let mut s = String::new();
+ if let Some(private_key) = info.private_key {
+ s.push_str(&format!("private_key={}\n", hex::encode(private_key)));
+ }
+ if let Some(port) = info.listen_port {
+ s.push_str(&format!("listen_port={}\n", port));
+ }
+ for (_, peer) in peers.iter() {
+ s.push_str(&peer.borrow().to_config_string());
+ }
+ future::ok(format!("{}errno=0\n\n", s))
+ }
+ }
+ }
+ });
+
+ let fut = sink.send_all(responses)
+ .map(|_| ())
+ .map_err(|_| ());
+
+ handle.spawn(fut);
+
+ Ok(())
+ }
+ }).map_err(|_| ());
+
+ Ok(ConfigurationService {
+ interface_name: interface_name.to_owned(),
+ config_server: Box::new(config_server),
+ reaper: Box::new(reaper),
+ shared_state: state.clone(),
+ rx
+ })
+ }
+
+ pub fn handle_update(&self, event: &UpdateEvent) -> Result<(), Error> {
+ let mut state = self.shared_state.borrow_mut();
+ match *event {
+ UpdateEvent::PrivateKey(private_key) => {
+ let pub_key = x25519::generate_public(&private_key);
+ info!("set pubkey: {}", base64::encode(pub_key.as_bytes()));
+ state.interface_info.private_key = Some(private_key);
+ state.interface_info.pub_key = Some(*pub_key.as_bytes());
+ debug!("set new private key.");
+ },
+ UpdateEvent::ListenPort(port) => {
+ state.interface_info.listen_port = Some(port);
+ info!("set listen port: {}", port);
+ },
+ UpdateEvent::Fwmark(mark) => {
+ state.interface_info.fwmark = Some(mark);
+ info!("set fwmark: {}", mark);
+ }
+ UpdateEvent::UpdatePeer(ref info, replace_allowed_ips) => {
+ let existing_peer = state.pubkey_map.get(&info.pub_key).cloned();
+ if let Some(peer_ref) = existing_peer {
+ info!("updating peer: {}", info);
+ let mut peer = peer_ref.borrow_mut();
+ let mut info = info.clone();
+ if replace_allowed_ips {
+ state.router.remove_allowed_ips(&peer.info.allowed_ips);
+ } else {
+ info.allowed_ips.extend_from_slice(&peer.info.allowed_ips);
+ }
+ info.endpoint = info.endpoint.or(peer.info.endpoint);
+ info.keepalive = info.keepalive.or(peer.info.keepalive);
+ state.router.add_allowed_ips(&info.allowed_ips, &peer_ref);
+ peer.info = info;
+ } else {
+ info!("adding new peer: {}", info);
+ let mut peer = Peer::new(info.clone());
+ let peer_ref = Rc::new(RefCell::new(peer));
+ let _ = state.pubkey_map.insert(info.pub_key, peer_ref.clone());
+ state.router.add_allowed_ips(&info.allowed_ips, &peer_ref);
+ };
+
+ },
+ UpdateEvent::RemoveAllPeers => {
+ state.pubkey_map.clear();
+ state.index_map.clear();
+ state.router.clear();
+ },
+ UpdateEvent::RemovePeer(pub_key) => {
+ if let Some(peer_ref) = state.pubkey_map.remove(&pub_key) {
+ let peer = peer_ref.borrow();
+ let indices = peer.get_mapped_indices();
+
+ for index in indices {
+ let _ = state.index_map.remove(&index);
+ }
+ state.router.remove_allowed_ips(&peer.info.allowed_ips);
+ } else {
+ info!("RemovePeer request for nonexistent peer.");
+ }
+ },
}
+ Ok(())
}
- pub fn get_path(&self) -> Result<PathBuf, Error> {
+ pub fn get_path(interface_name: &str) -> Result<PathBuf, Error> {
let mut socket_path = Self::get_run_path().join("wireguard");
if !socket_path.exists() {
@@ -150,7 +284,7 @@ impl ConfigurationServiceManager {
Self::chmod(&socket_path, 0o700)?;
// Finish the socket path
- socket_path.push(&self.interface_name);
+ socket_path.push(interface_name);
socket_path.set_extension("sock");
if socket_path.exists() {
debug!("Removing existing socket: {}", socket_path.display());
@@ -183,7 +317,37 @@ impl ConfigurationServiceManager {
}
}
-impl Drop for ConfigurationServiceManager {
+impl Stream for ConfigurationService {
+ type Item = UpdateEvent;
+ type Error = Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ match self.config_server.poll() {
+ Ok(Async::NotReady) => {},
+ _ => return Err(err_msg("config_server broken")),
+
+ }
+
+ match self.reaper.poll() {
+ Ok(Async::NotReady) => {},
+ _ => {
+ debug!("reaper triggered, closing ConfigurationService stream.");
+ return Ok(Async::Ready(None))
+ },
+ }
+
+ match self.rx.poll() {
+ Ok(Async::Ready(Some(packet))) => {
+ let _ = self.handle_update(&packet).map_err(|e| warn!("UDP ERR: {:?}", e));
+ Ok(Async::Ready(Some(packet)))
+ },
+ Ok(Async::Ready(None)) | Err(_) => Err(err_msg("err in config rx channel")),
+ Ok(Async::NotReady) => Ok(Async::NotReady)
+ }
+ }
+}
+
+impl Drop for ConfigurationService {
fn drop(&mut self) {
let mut socket_path = Self::get_run_path().join("wireguard");
socket_path.push(&self.interface_name);
diff --git a/src/interface/mod.rs b/src/interface/mod.rs
index e8c799c..110706a 100644
--- a/src/interface/mod.rs
+++ b/src/interface/mod.rs
@@ -2,30 +2,23 @@ mod config;
mod grim_reaper;
pub mod peer_server;
-use self::config::{ConfigurationServiceManager, UpdateEvent, Command, ConfigurationCodec};
-use self::grim_reaper::GrimReaper;
+use self::config::ConfigurationService;
use self::peer_server::PeerServer;
use router::Router;
-use base64;
-use hex;
use failure::{Error, err_msg};
use peer::Peer;
use std::io;
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::HashMap;
-use std::fs::remove_file;
use types::{InterfaceInfo};
-use x25519_dalek as x25519;
use pnet_packet::ipv4::Ipv4Packet;
-use futures::{Future, Stream, Sink, future, unsync, stream};
+use futures::{Future, Stream, Sink, unsync};
use tokio_core::reactor::Core;
use tokio_utun::{UtunStream, UtunCodec};
-use tokio_io::{AsyncRead};
-use tokio_uds::{UnixListener};
pub fn trace_packet(header: &str, packet: &[u8]) {
@@ -99,162 +92,34 @@ impl Interface {
}
}
- pub fn start(&mut self) {
- let mut core = Core::new().unwrap();
+ pub fn start(&mut self) -> Result<(), Error> {
+ let mut core = Core::new()?;
let (utun_tx, utun_rx) = unsync::mpsc::channel::<Vec<u8>>(1024);
- let peer_server = PeerServer::new(core.handle(), self.state.clone(), utun_tx.clone()).unwrap();
+ let peer_server = PeerServer::new(core.handle(), self.state.clone(), utun_tx.clone())?;
+ let config_server = ConfigurationService::new(&self.name, &self.state, &core.handle())?;
+ let config_server = config_server.forward(peer_server.config_tx()).map_err(|_|());
+ let utun_stream = UtunStream::connect(&self.name, &core.handle())?.framed(VecUtunCodec{});
- let utun_stream = UtunStream::connect(&self.name, &core.handle()).unwrap().framed(VecUtunCodec{});
let (utun_writer, utun_reader) = utun_stream.split();
+
let utun_read_fut = peer_server.tx()
.sink_map_err(|e| -> Error { e.into() })
.send_all(utun_reader.map_err(|e| -> Error { e.into() }))
.map_err(|e| { warn!("utun read error: {:?}", e); () });
+
let utun_write_fut = utun_writer
.sink_map_err(|e| -> Error { e.into() })
.send_all(utun_rx.map_err(|()| -> Error { err_msg("utun rx failure") }))
.map_err(|e| { warn!("utun write error: {:?}", e); () });
- let utun_fut = utun_write_fut.join(utun_read_fut);
-
- let config_manager = ConfigurationServiceManager::new(&self.name);
- let handle = core.handle();
- let config_path = config_manager.get_path().unwrap();
- let listener = UnixListener::bind(config_path.clone(), &handle).unwrap();
- let reaper = GrimReaper::spawn(&handle, config_path.parent().unwrap()).unwrap();
- let (config_tx, config_rx) = unsync::mpsc::channel::<UpdateEvent>(1024);
- let h = handle.clone();
-
- let config_server = listener.incoming().for_each({
- let config_tx = config_tx.clone();
- let state = self.state.clone();
- move |(stream, _)| {
- let (sink, stream) = stream.framed(ConfigurationCodec {}).split();
- trace!("UnixServer connection.");
-
- let handle = h.clone();
- let responses = stream.and_then({
- let config_tx = config_tx.clone();
- let state = state.clone();
- move |command| {
- let state = state.borrow();
- match command {
- Command::Set(_version, items) => {
- config_tx.clone().send_all(stream::iter_ok(items)).wait().unwrap();
- future::ok("errno=0\nerrno=0\n\n".to_string())
- },
- Command::Get(_version) => {
- let info = &state.interface_info;
- let peers = &state.pubkey_map;
- let mut s = String::new();
- if let Some(private_key) = info.private_key {
- s.push_str(&format!("private_key={}\n", hex::encode(private_key)));
- }
- if let Some(port) = info.listen_port {
- s.push_str(&format!("listen_port={}\n", port));
- }
-
- for (_, peer) in peers.iter() {
- s.push_str(&peer.borrow().to_config_string());
- }
- future::ok(format!("{}errno=0\n\n", s))
- }
- }
- }
- });
-
- let fut = sink.send_all(responses).map(|_| ()).map_err(|_| ());
-
- handle.spawn(fut);
- Ok(())
- }
- }).map_err(|_| ());
+ let utun_futs = utun_write_fut.join(utun_read_fut);
- let config_fut = config_rx.and_then({
- let state = self.state.clone();
- move |event| {
- let mut state = state.borrow_mut();
- match event {
- UpdateEvent::PrivateKey(private_key) => {
- let pub_key = x25519::generate_public(&private_key);
- info!("set pubkey: {}", base64::encode(pub_key.as_bytes()));
- state.interface_info.private_key = Some(private_key);
- state.interface_info.pub_key = Some(*pub_key.as_bytes());
- debug!("set new private key.");
- },
- UpdateEvent::ListenPort(port) => {
- state.interface_info.listen_port = Some(port);
- info!("set listen port: {}", port);
- },
- UpdateEvent::Fwmark(mark) => {
- state.interface_info.fwmark = Some(mark);
- info!("set fwmark: {}", mark);
- }
- UpdateEvent::UpdatePeer(ref info, replace_allowed_ips) => {
- let existing_peer = state.pubkey_map.get(&info.pub_key).cloned();
- if let Some(peer_ref) = existing_peer {
- info!("updating peer: {}", info);
- let mut peer = peer_ref.borrow_mut();
- let mut info = info.clone();
- if replace_allowed_ips {
- state.router.remove_allowed_ips(&peer.info.allowed_ips);
- } else {
- info.allowed_ips.extend_from_slice(&peer.info.allowed_ips);
- }
- info.endpoint = info.endpoint.or(peer.info.endpoint);
- info.keepalive = info.keepalive.or(peer.info.keepalive);
- state.router.add_allowed_ips(&info.allowed_ips, &peer_ref);
- peer.info = info;
- } else {
- info!("adding new peer: {}", info);
- let mut peer = Peer::new(info.clone());
- let peer_ref = Rc::new(RefCell::new(peer));
- let _ = state.pubkey_map.insert(info.pub_key, peer_ref.clone());
- state.router.add_allowed_ips(&info.allowed_ips, &peer_ref);
- };
-
- },
- UpdateEvent::RemoveAllPeers => {
- state.pubkey_map.clear();
- state.index_map.clear();
- state.router.clear();
- },
- UpdateEvent::RemovePeer(pub_key) => {
- if let Some(peer_ref) = state.pubkey_map.remove(&pub_key) {
- let peer = peer_ref.borrow();
- let indices = peer.get_mapped_indices();
-
- for index in indices {
- let _ = state.index_map.remove(&index);
- }
- state.router.remove_allowed_ips(&peer.info.allowed_ips);
- } else {
- info!("RemovePeer request for nonexistent peer.");
- }
- },
- }
- future::ok(event)
- }
- }).map_err(|e| { warn!("error {:?}", e); () });
-
- let config_fut = peer_server.config_tx().sink_map_err(|_|()).send_all(config_fut).map_err(|e| { warn!("error {:?}", e); () });
-
- let fut = reaper.join(peer_server.join(utun_fut.join(config_fut.join(config_server))));
+ let fut = peer_server.join(config_server.join(utun_futs));
let _ = core.run(fut);
+
info!("reactor finished.");
+ Ok(())
}
}
-
-impl Drop for Interface {
- fn drop(&mut self) {
- let mut socket_path = ConfigurationServiceManager::get_run_path().join("wireguard");
- socket_path.push(&self.name);
- socket_path.set_extension("sock");
- if socket_path.exists() {
- info!("Removing socket on drop: {}", socket_path.display());
- let _ = remove_file(&socket_path);
- }
- }
-} \ No newline at end of file