aboutsummaryrefslogtreecommitdiffstats
path: root/src/interface/peer_server.rs
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-02-14 13:36:41 +0000
committerJake McGinty <me@jake.su>2018-02-14 13:36:41 +0000
commit59687b82503f30ccfc169a83dc699177b0baba85 (patch)
tree12a4886c59e7b00c4df857655610bd3b8230e108 /src/interface/peer_server.rs
parentuse constant time comparison for mac (diff)
downloadwireguard-rs-59687b82503f30ccfc169a83dc699177b0baba85.tar.xz
wireguard-rs-59687b82503f30ccfc169a83dc699177b0baba85.zip
timer module
Diffstat (limited to 'src/interface/peer_server.rs')
-rw-r--r--src/interface/peer_server.rs103
1 files changed, 43 insertions, 60 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index c53ecc4..9e3ab63 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -1,11 +1,12 @@
use super::{SharedState, SharedPeer, UtunPacket, trace_packet};
-use consts::{REKEY_AFTER_TIME, KEEPALIVE_TIMEOUT, MAX_CONTENT_SIZE, TRANSPORT_HEADER_SIZE, TRANSPORT_OVERHEAD};
-use protocol::Session;
+use consts::{REKEY_TIMEOUT, REKEY_AFTER_TIME, KEEPALIVE_TIMEOUT, MAX_CONTENT_SIZE, TRANSPORT_HEADER_SIZE, TRANSPORT_OVERHEAD};
+use protocol::{Session, SessionType};
use noise::Noise;
+use timer::{Timer, TimerMessage};
use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
use base64;
use byteorder::{ByteOrder, BigEndian, LittleEndian};
@@ -19,7 +20,6 @@ use snow;
use tokio_core::net::{UdpSocket, UdpCodec, UdpFramed};
use tokio_core::reactor::Handle;
use tokio_io::codec::Framed;
-use tokio_timer::{Interval, Timer};
use treebitmap::{IpLookupTable, IpLookupTableOps};
@@ -55,21 +55,13 @@ impl UdpCodec for VecUdpCodec {
}
}
-#[derive(Debug)]
-pub enum TimerMessage {
- KeepAlive(SharedPeer, u32),
- Rekey(SharedPeer, u32),
-}
-
pub struct PeerServer {
handle: Handle,
shared_state: SharedState,
- timer: Timer,
udp_stream: stream::SplitStream<UdpFramed<VecUdpCodec>>,
+ timer: Timer,
outgoing_tx: unsync::mpsc::Sender<UtunPacket>,
outgoing_rx: futures::stream::Peekable<unsync::mpsc::Receiver<UtunPacket>>,
- timer_tx: unsync::mpsc::Sender<TimerMessage>,
- timer_rx: unsync::mpsc::Receiver<TimerMessage>,
udp_tx: unsync::mpsc::Sender<(SocketAddr, Vec<u8>)>,
tunnel_tx: unsync::mpsc::Sender<UtunPacket>,
}
@@ -80,13 +72,12 @@ impl PeerServer {
socket.set_only_v6(false)?;
socket.set_nonblocking(true)?;
socket.bind(&SocketAddr::from((Ipv6Addr::unspecified(), 0)).into())?;
+ let timer = Timer::new();
let socket = UdpSocket::from_socket(socket.into_udp_socket(), &handle.clone())?;
let (udp_sink, udp_stream) = socket.framed(VecUdpCodec{}).split();
- let (timer_tx, timer_rx) = unsync::mpsc::channel::<TimerMessage>(1024);
let (udp_tx, udp_rx) = unsync::mpsc::channel::<(SocketAddr, Vec<u8>)>(1024);
let (outgoing_tx, outgoing_rx) = unsync::mpsc::channel::<UtunPacket>(1024);
let outgoing_rx = outgoing_rx.peekable();
- let timer = Timer::default();
let udp_write_passthrough = udp_sink.sink_map_err(|_| ()).send_all(
udp_rx.map(|(addr, packet)| {
@@ -97,7 +88,7 @@ impl PeerServer {
handle.spawn(udp_write_passthrough);
Ok(PeerServer {
- handle, shared_state, timer, udp_stream, udp_tx, tunnel_tx, timer_tx, timer_rx, outgoing_tx, outgoing_rx
+ handle, shared_state, timer, udp_stream, udp_tx, tunnel_tx, outgoing_tx, outgoing_rx
})
}
@@ -118,7 +109,7 @@ impl PeerServer {
}
fn handle_incoming_packet(&mut self, addr: SocketAddr, packet: Vec<u8>) -> Result<(), Error> {
- debug!("got a UDP packet from {:?} of length {}, packet type {}", &addr, packet.len(), packet[0]);
+ trace!("got a UDP packet from {:?} of length {}, packet type {}", &addr, packet.len(), packet[0]);
let mut state = self.shared_state.borrow_mut();
match packet[0] {
1 => {
@@ -130,7 +121,7 @@ impl PeerServer {
Noise::verify_mac1(pubkey, mac_in, &mac_out[..16])?;
}
- info!("got handshake initiation request");
+ info!("got handshake initiation request (0x01)");
let their_index = LittleEndian::read_u32(&packet[4..]);
@@ -165,6 +156,8 @@ impl PeerServer {
let (mac_in, mac_out) = packet.split_at(60);
Noise::verify_mac1(pubkey, mac_in, &mac_out[..16])?;
}
+ info!("got handshake response (0x02)");
+
let our_index = LittleEndian::read_u32(&packet[8..]);
let peer_ref = state.index_map.get(&our_index)
.ok_or_else(|| format_err!("unknown our_index ({})", our_index))?
@@ -174,35 +167,16 @@ impl PeerServer {
if let Some(index) = dead_index {
let _ = state.index_map.remove(&index);
}
- info!("got handshake response, ratcheted session.");
-
- // TODO neither of these timers are to spec, but are simple functional placeholders
- let rekey_timer = self.timer.sleep(Duration::from_secs(REKEY_AFTER_TIME));
- let rekey_future = rekey_timer.map_err(|_|()).and_then({
- let timer_tx = self.timer_tx.clone();
- let peer_ref = peer_ref.clone();
- move |_| {
- timer_tx.clone().send(TimerMessage::Rekey(peer_ref, our_index))
- .then(|_| Ok(()))
- }
- }).then(|_| Ok(()));
- self.handle.spawn(rekey_future);
-
- let keepalive_interval = self.timer.interval(Duration::from_secs(KEEPALIVE_TIMEOUT));
- let keepalive_future = keepalive_interval.map_err(|_|()).for_each({
- let timer_tx = self.timer_tx.clone();
- let peer_ref = peer_ref.clone();
- move |_| -> Box<Future<Item = _, Error = _>> {
- if peer_ref.borrow().our_current_index().unwrap() != our_index {
- debug!("cancelling old keepalive_timer");
- Box::new(future::err(()))
- } else {
- Box::new(timer_tx.clone().send(TimerMessage::KeepAlive(peer_ref.clone(), our_index))
- .then(|_| Ok(())))
- }
- }
- });
- self.handle.spawn(keepalive_future);
+ info!("handshake response processed, current session now {}", our_index);
+
+ // Start the timers for this new session
+ self.timer.spawn_delayed(&self.handle,
+ REKEY_AFTER_TIME,
+ TimerMessage::Rekey(peer_ref.clone(), our_index));
+
+ self.timer.spawn_delayed(&self.handle,
+ KEEPALIVE_TIMEOUT,
+ TimerMessage::KeepAlive(peer_ref.clone(), our_index));
},
3 => {
warn!("cookie messages not yet implemented.");
@@ -244,6 +218,13 @@ impl PeerServer {
TimerMessage::Rekey(peer_ref, _our_index) => {
let mut peer = peer_ref.borrow_mut();
+ let now = SystemTime::now();
+ if let Some(last_init) = peer.last_rekey_init {
+ if now.duration_since(last_init).unwrap() < Duration::from_secs(REKEY_TIMEOUT) {
+ debug!("too soon since last rekey attempt");
+ }
+ }
+
let private_key = &state.interface_info.private_key.expect("no private key!");
let (init_packet, our_index) = peer.initiate_new_session(private_key).unwrap();
let _ = state.index_map.insert(our_index, peer_ref.clone());
@@ -253,19 +234,21 @@ impl PeerServer {
self.send_to_peer((endpoint, init_packet));
info!("sent rekey");
},
- TimerMessage::KeepAlive(peer_ref, _our_index) => {
+ TimerMessage::KeepAlive(peer_ref, our_index) => {
let mut peer = peer_ref.borrow_mut();
- let mut packet = vec![0u8; TRANSPORT_OVERHEAD];
- packet[0] = 4;
- let their_index = peer.their_current_index().expect("no current index for them");
- let endpoint = peer.info.endpoint.unwrap();
- peer.tx_bytes += packet.len() as u64;
- let noise = peer.current_noise().expect("current noise session");
- LittleEndian::write_u32(&mut packet[4..], their_index);
- LittleEndian::write_u64(&mut packet[8..], noise.sending_nonce().unwrap());
- let _ = noise.write_message(&[], &mut packet[16..]).map_err(SyncFailure::new)?;
- self.send_to_peer((endpoint, packet));
- debug!("sent keepalive");
+ match peer.find_session(our_index) {
+ Some((_, SessionType::Current)) => {
+ self.send_to_peer(peer.handle_outgoing_transport(&[])?);
+ debug!("sent keepalive packet ({})", our_index);
+
+ self.timer.spawn_delayed(&self.handle,
+ KEEPALIVE_TIMEOUT,
+ TimerMessage::KeepAlive(peer_ref.clone(), our_index));
+ }
+ _ => {
+ debug!("keepalive timer received for non-current session, ignoring.");
+ }
+ }
}
}
Ok(())
@@ -304,7 +287,7 @@ impl Future for PeerServer {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// Handle pending state-changing timers
loop {
- match self.timer_rx.poll() {
+ match self.timer.poll() {
Ok(Async::Ready(Some(message))) => {
let _ = self.handle_timer(message).map_err(|e| warn!("TIMER ERR: {:?}", e));
},