aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2017-12-30 14:43:23 -0800
committerJake McGinty <me@jake.su>2017-12-30 14:43:23 -0800
commit4f30a52e5fcb52679afa2c70dcf9cccd5a12afe3 (patch)
tree5b333eab87ecf2842a2c56b885b7ed3245d801a3 /src
parentfallback to previous noise hack (diff)
downloadwireguard-rs-4f30a52e5fcb52679afa2c70dcf9cccd5a12afe3.tar.xz
wireguard-rs-4f30a52e5fcb52679afa2c70dcf9cccd5a12afe3.zip
rudimentary timers!
Diffstat (limited to 'src')
-rw-r--r--src/consts.rs2
-rw-r--r--src/interface/peer_server.rs120
-rw-r--r--src/main.rs1
3 files changed, 90 insertions, 33 deletions
diff --git a/src/consts.rs b/src/consts.rs
new file mode 100644
index 0000000..ba9d0d0
--- /dev/null
+++ b/src/consts.rs
@@ -0,0 +1,2 @@
+pub const REKEY_AFTER_TIME: u64 = 120;
+pub const KEEPALIVE_TIMEOUT: u64 = 10;
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index e1af433..b698f3c 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -1,4 +1,5 @@
use super::{SharedState, SharedPeer, debug_packet};
+use consts::{REKEY_AFTER_TIME, KEEPALIVE_TIMEOUT};
use std::io;
use std::net::SocketAddr;
@@ -30,23 +31,33 @@ impl UdpCodec for VecUdpCodec {
}
}
+#[derive(Debug)]
+pub enum TimerMessage {
+ KeepAlive(SharedPeer),
+ Rekey(SharedPeer),
+}
pub struct PeerServer {
handle: Handle,
shared_state: SharedState,
+ timer: Timer,
udp_stream: stream::SplitStream<UdpFramed<VecUdpCodec>>,
- rx: unsync::mpsc::Receiver<Vec<u8>>,
+ outgoing_tx: unsync::mpsc::Sender<Vec<u8>>,
+ outgoing_rx: unsync::mpsc::Receiver<Vec<u8>>,
+ timer_tx: unsync::mpsc::Sender<TimerMessage>,
+ timer_rx: unsync::mpsc::Receiver<TimerMessage>,
udp_tx: unsync::mpsc::Sender<(SocketAddr, Vec<u8>)>,
tunnel_tx: unsync::mpsc::Sender<Vec<u8>>,
- pub tx: unsync::mpsc::Sender<Vec<u8>>,
}
impl PeerServer {
pub fn bind(handle: Handle, shared_state: SharedState, tunnel_tx: unsync::mpsc::Sender<Vec<u8>>) -> Self {
let socket = UdpSocket::bind(&([0,0,0,0], 0).into(), &handle.clone()).unwrap();
let (udp_sink, udp_stream) = socket.framed(VecUdpCodec{}).split();
+ let (timer_tx, timer_rx) = unsync::mpsc::channel::<TimerMessage>(1024);
let (udp_tx, udp_rx) = unsync::mpsc::channel::<(SocketAddr, Vec<u8>)>(1024);
- let (tx, rx) = unsync::mpsc::channel::<Vec<u8>>(1024);
+ let (outgoing_tx, outgoing_rx) = unsync::mpsc::channel::<Vec<u8>>(1024);
+ let timer = Timer::default();
let udp_write_passthrough = udp_sink.sink_map_err(|_| ()).send_all(
udp_rx.map(|(addr, packet)| {
@@ -57,12 +68,12 @@ impl PeerServer {
handle.spawn(udp_write_passthrough);
PeerServer {
- handle, shared_state, udp_stream, udp_tx, tunnel_tx, tx, rx
+ handle, shared_state, timer, udp_stream, udp_tx, tunnel_tx, timer_tx, timer_rx, outgoing_tx, outgoing_rx
}
}
pub fn tx(&self) -> unsync::mpsc::Sender<Vec<u8>> {
- self.tx.clone()
+ self.outgoing_tx.clone()
}
pub fn udp_tx(&self) -> unsync::mpsc::Sender<(SocketAddr, Vec<u8>)> {
@@ -71,7 +82,7 @@ impl PeerServer {
fn handle_incoming_packet(&mut self, addr: SocketAddr, packet: Vec<u8>) {
debug!("got a UDP packet of length {}, packet type {}", packet.len(), packet[0]);
- let mut state = self.shared_state.borrow_mut();
+ let state = self.shared_state.borrow_mut();
match packet[0] {
1 => {
info!("got handshake initialization.");
@@ -88,34 +99,28 @@ impl PeerServer {
peer.ratchet_session().unwrap();
info!("got handshake response, ratcheted session.");
- let noise = NoiseBuilder::new("Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s".parse().unwrap())
- .local_private_key(&state.interface_info.private_key.expect("no private key!"))
- .remote_public_key(&peer.info.pub_key)
- .prologue("WireGuard v1 zx2c4 Jason@zx2c4.com".as_bytes())
- .psk(2, &peer.info.psk.expect("no psk!"))
- .build_initiator().unwrap();
- peer.set_next_session(noise.into());
-
- let _ = state.index_map.insert(peer.our_next_index().unwrap(), peer_ref.clone());
-
- let init_packet = peer.get_handshake_packet();
- let endpoint = peer.info.endpoint.unwrap().clone();
+ // TODO neither of these timers are to spec, but are simple functional placeholders
+ let rekey_timer = self.timer.sleep(Duration::from_secs(REKEY_AFTER_TIME));
+ let rekey_future = rekey_timer.map_err(|_|()).and_then({
+ let timer_tx = self.timer_tx.clone();
+ let peer_ref = peer_ref.clone();
+ move |_| {
+ timer_tx.clone().send(TimerMessage::Rekey(peer_ref))
+ .then(|_| Ok(()))
+ }
+ }).then(|_| Ok(()));
+ self.handle.spawn(rekey_future);
- let timer = Timer::default();
- let sleep = timer.sleep(Duration::from_secs(120));
- let boop = sleep.and_then({
- let handle = self.handle.clone();
- let tx = self.udp_tx.clone();
+ let keepalive_interval = self.timer.interval(Duration::from_secs(KEEPALIVE_TIMEOUT));
+ let keepalive_future = keepalive_interval.map_err(|_|()).for_each({
+ let timer_tx = self.timer_tx.clone();
let peer_ref = peer_ref.clone();
move |_| {
- info!("sending rekey!");
- handle.spawn(tx.clone().send((endpoint, init_packet))
- .map(|_| ())
- .map_err(|_| ()));
- Ok(())
+ timer_tx.clone().send(TimerMessage::KeepAlive(peer_ref.clone()))
+ .then(|_| Ok(()))
}
- }).map_err(|_|());
- self.handle.spawn(boop);
+ });
+ self.handle.spawn(keepalive_future);
},
4 => {
let our_index_received = LittleEndian::read_u32(&packet[4..]);
@@ -127,6 +132,8 @@ impl PeerServer {
let mut peer = peer.borrow_mut();
peer.rx_bytes += packet.len();
+
+ // TODO: map index not just to peer, but to specific session instead of guessing
let res = {
let noise = peer.current_noise().expect("current noise session");
noise.set_receiving_nonce(nonce).unwrap();
@@ -143,14 +150,52 @@ impl PeerServer {
debug_packet("received TRANSPORT: ", &raw_packet[..payload_len]);
self.handle.spawn(self.tunnel_tx.clone().send(raw_packet[..payload_len].to_owned())
- .map(|_| ())
- .map_err(|_| ()));
+ .then(|_| Ok(())));
}
},
_ => unimplemented!()
}
}
+ fn handle_timer(&mut self, message: TimerMessage) {
+ let mut state = self.shared_state.borrow_mut();
+ match message {
+ TimerMessage::Rekey(peer_ref) => {
+ let mut peer = peer_ref.borrow_mut();
+ let noise = NoiseBuilder::new("Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s".parse().unwrap())
+ .local_private_key(&state.interface_info.private_key.expect("no private key!"))
+ .remote_public_key(&peer.info.pub_key)
+ .prologue("WireGuard v1 zx2c4 Jason@zx2c4.com".as_bytes())
+ .psk(2, &peer.info.psk.expect("no psk!"))
+ .build_initiator().unwrap();
+ peer.set_next_session(noise.into());
+
+ let _ = state.index_map.insert(peer.our_next_index().unwrap(), peer_ref.clone());
+
+ let init_packet = peer.get_handshake_packet();
+ let endpoint = peer.info.endpoint.unwrap().clone();
+
+ self.handle.spawn(self.udp_tx.clone().send((endpoint, init_packet)).then(|_| Ok(())));
+ info!("sent rekey");
+ },
+ TimerMessage::KeepAlive(peer_ref) => {
+ let mut peer = peer_ref.borrow_mut();
+ let mut packet = vec![0u8; 1500];
+ packet[0] = 4;
+ let their_index = peer.their_current_index().expect("no current index for them");
+ let endpoint = peer.info.endpoint.unwrap();
+ peer.tx_bytes += packet.len();
+ let noise = peer.current_noise().expect("current noise session");
+ LittleEndian::write_u32(&mut packet[4..], their_index);
+ LittleEndian::write_u64(&mut packet[8..], noise.sending_nonce().unwrap());
+ let len = noise.write_message(&[], &mut packet[16..]).expect("failed to encrypt outgoing keepalive");
+ packet.truncate(len + 16);
+ self.handle.spawn(self.udp_tx.clone().send((endpoint, packet)).then(|_| Ok(())));
+ info!("sent keepalive");
+ }
+ }
+ }
+
fn handle_outgoing_packet(&mut self, packet: Vec<u8>) {
debug!("handle_outgoing_packet()");
@@ -162,6 +207,15 @@ impl Future for PeerServer {
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ // Handle pending state-changing timers
+ loop {
+ match self.timer_rx.poll() {
+ Ok(Async::Ready(Some(message))) => self.handle_timer(message),
+ Ok(Async::NotReady) => break,
+ Ok(Async::Ready(None)) | Err(_) => return Err(()),
+ }
+ }
+
// Handle UDP packets from the outside world
loop {
match self.udp_stream.poll() {
@@ -173,7 +227,7 @@ impl Future for PeerServer {
// Handle packets coming from the local tunnel
loop {
- match self.rx.poll() {
+ match self.outgoing_rx.poll() {
Ok(Async::Ready(Some(packet))) => self.handle_outgoing_packet(packet),
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) | Err(_) => return Err(()),
diff --git a/src/main.rs b/src/main.rs
index 240e6e6..7f159cc 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -26,6 +26,7 @@ extern crate byteorder;
extern crate crypto;
extern crate pnet;
+mod consts;
mod error;
mod interface;
mod protocol;