From 794933d6ddda5f45c240b66998b636bdfc484098 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sun, 22 Sep 2019 21:35:06 +0200 Subject: Work on peer timers --- src/constants.rs | 2 + src/handshake/macs.rs | 13 ++--- src/handshake/peer.rs | 7 +-- src/handshake/ratelimiter.rs | 10 ++-- src/timers.rs | 112 ++++++++++++++++++++++++++++++++----------- src/wireguard.rs | 45 ++++++++++++----- 6 files changed, 131 insertions(+), 58 deletions(-) (limited to 'src') diff --git a/src/constants.rs b/src/constants.rs index 829deac..5a895e5 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -9,3 +9,5 @@ pub const REJECT_AFTER_TIME: Duration = Duration::from_secs(180); pub const REKEY_ATTEMPT_TIME: Duration = Duration::from_secs(90); pub const REKEY_TIMEOUT: Duration = Duration::from_secs(5); pub const KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(10); + +pub const MAX_TIMER_HANDSHAKES: usize = 18; diff --git a/src/handshake/macs.rs b/src/handshake/macs.rs index 516b9dc..689826b 100644 --- a/src/handshake/macs.rs +++ b/src/handshake/macs.rs @@ -1,5 +1,4 @@ use generic_array::GenericArray; -use lazy_static::lazy_static; use rand::{CryptoRng, RngCore}; use spin::RwLock; use std::time::{Duration, Instant}; @@ -27,9 +26,7 @@ const SIZE_SECRET: usize = 32; const SIZE_MAC: usize = 16; // blake2s-mac128 const SIZE_TAG: usize = 16; // xchacha20poly1305 tag -lazy_static! { - pub static ref COOKIE_UPDATE_INTERVAL: Duration = Duration::new(120, 0); -} +const COOKIE_UPDATE_INTERVAL: Duration = Duration::from_secs(120); macro_rules! HASH { ( $($input:expr),* ) => {{ @@ -168,7 +165,7 @@ impl Generator { macs.f_mac1 = MAC!(&self.mac1_key, inner); macs.f_mac2 = match &self.cookie { Some(cookie) => { - if cookie.birth.elapsed() > *COOKIE_UPDATE_INTERVAL { + if cookie.birth.elapsed() > COOKIE_UPDATE_INTERVAL { self.cookie = None; [0u8; SIZE_MAC] } else { @@ -206,7 +203,7 @@ impl Validator { fn get_tau(&self, src: &[u8]) -> Option<[u8; SIZE_COOKIE]> { let secret = self.secret.read(); - if secret.birth.elapsed() < *COOKIE_UPDATE_INTERVAL { + if secret.birth.elapsed() < COOKIE_UPDATE_INTERVAL { Some(MAC!(&secret.value, src)) } else { None @@ -217,7 +214,7 @@ impl Validator { // check if current value is still valid { let secret = self.secret.read(); - if secret.birth.elapsed() < *COOKIE_UPDATE_INTERVAL { + if secret.birth.elapsed() < COOKIE_UPDATE_INTERVAL { return MAC!(&secret.value, src); }; } @@ -225,7 +222,7 @@ impl Validator { // take write lock, check again { let mut secret = self.secret.write(); - if secret.birth.elapsed() < *COOKIE_UPDATE_INTERVAL { + if secret.birth.elapsed() < COOKIE_UPDATE_INTERVAL { return MAC!(&secret.value, src); }; diff --git a/src/handshake/peer.rs b/src/handshake/peer.rs index 6a85cee..c9e1c40 100644 --- a/src/handshake/peer.rs +++ b/src/handshake/peer.rs @@ -1,4 +1,3 @@ -use lazy_static::lazy_static; use spin::Mutex; use std::mem; @@ -18,9 +17,7 @@ use super::macs; use super::timestamp; use super::types::*; -lazy_static! { - pub static ref TIME_BETWEEN_INITIATIONS: Duration = Duration::from_millis(20); -} +const TIME_BETWEEN_INITIATIONS: Duration = Duration::from_millis(20); /* Represents the recomputation and state of a peer. * @@ -123,7 +120,7 @@ impl Peer { // check flood attack match *last_initiation_consumption { Some(last) => { - if last.elapsed() < *TIME_BETWEEN_INITIATIONS { + if last.elapsed() < TIME_BETWEEN_INITIATIONS { return Err(HandshakeError::InitiationFlood); } } diff --git a/src/handshake/ratelimiter.rs b/src/handshake/ratelimiter.rs index 6568b32..63d728c 100644 --- a/src/handshake/ratelimiter.rs +++ b/src/handshake/ratelimiter.rs @@ -6,16 +6,12 @@ use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::{Duration, Instant}; -use lazy_static::lazy_static; - const PACKETS_PER_SECOND: u64 = 20; const PACKETS_BURSTABLE: u64 = 5; const PACKET_COST: u64 = 1_000_000_000 / PACKETS_PER_SECOND; const MAX_TOKENS: u64 = PACKET_COST * PACKETS_BURSTABLE; -lazy_static! { - pub static ref GC_INTERVAL: Duration = Duration::new(1, 0); -} +const GC_INTERVAL: Duration = Duration::from_secs(1); struct Entry { pub last_time: Instant, @@ -93,7 +89,7 @@ impl RateLimiter { { let mut tw = limiter.table.write(); tw.retain(|_, ref mut entry| { - entry.lock().last_time.elapsed() <= *GC_INTERVAL + entry.lock().last_time.elapsed() <= GC_INTERVAL }); if tw.len() == 0 { limiter.gc_running.store(false, Ordering::Relaxed); @@ -102,7 +98,7 @@ impl RateLimiter { } // wait until stopped or new GC (~1 every sec) - let res = cvar.wait_timeout(dropped, *GC_INTERVAL).unwrap(); + let res = cvar.wait_timeout(dropped, GC_INTERVAL).unwrap(); dropped = res.0; } }); diff --git a/src/timers.rs b/src/timers.rs index 0d69c3f..fc00d85 100644 --- a/src/timers.rs +++ b/src/timers.rs @@ -1,14 +1,15 @@ -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; -use std::sync::Arc; +use std::marker::PhantomData; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Duration; use hjul::{Runner, Timer}; +use crate::constants::*; use crate::router::Callbacks; +use crate::types::{Bind, Tun}; +use crate::wireguard::Peer; -const ZERO_DURATION: Duration = Duration::from_micros(0); - -pub struct TimersInner { +pub struct Timers { handshake_pending: AtomicBool, handshake_attempts: AtomicUsize, @@ -16,24 +17,80 @@ pub struct TimersInner { send_keepalive: Timer, zero_key_material: Timer, new_handshake: Timer, - - // stats - rx_bytes: AtomicU64, - tx_bytes: AtomicU64, } -impl TimersInner { - pub fn new(runner: &Runner) -> Timers { - Arc::new(TimersInner { +impl Timers { + pub fn new(runner: &Runner, peer: Peer) -> Timers + where + T: Tun, + B: Bind, + { + // create a timer instance for the provided peer + Timers { + handshake_pending: AtomicBool::new(false), + handshake_attempts: AtomicUsize::new(0), + retransmit_handshake: { + let peer = peer.clone(); + runner.timer(move || { + if peer.timers.read().handshake_retry() { + peer.new_handshake(); + } + }) + }, + new_handshake: { + let peer = peer.clone(); + runner.timer(move || { + peer.new_handshake(); + peer.timers.read().handshake_begun(); + }) + }, + send_keepalive: { + let peer = peer.clone(); + runner.timer(move || { + peer.router.keepalive(); + let keepalive = peer.keepalive.load(Ordering::Acquire); + if keepalive > 0 { + peer.timers + .read() + .send_keepalive + .reset(Duration::from_secs(keepalive as u64)) + } + }) + }, + zero_key_material: { + let peer = peer.clone(); + runner.timer(move || { + peer.router.zero_keys(); + }) + }, + } + } + + fn handshake_begun(&self) { + self.handshake_pending.store(true, Ordering::SeqCst); + self.handshake_attempts.store(0, Ordering::SeqCst); + self.retransmit_handshake.reset(REKEY_TIMEOUT); + } + + fn handshake_retry(&self) -> bool { + if self.handshake_attempts.fetch_add(1, Ordering::SeqCst) <= MAX_TIMER_HANDSHAKES { + self.retransmit_handshake.reset(REKEY_TIMEOUT); + true + } else { + self.handshake_pending.store(false, Ordering::SeqCst); + false + } + } + + pub fn dummy(runner: &Runner) -> Timers { + Timers { handshake_pending: AtomicBool::new(false), handshake_attempts: AtomicUsize::new(0), retransmit_handshake: runner.timer(|| {}), new_handshake: runner.timer(|| {}), send_keepalive: runner.timer(|| {}), zero_key_material: runner.timer(|| {}), - rx_bytes: AtomicU64::new(0), - tx_bytes: AtomicU64::new(0), - }) + } } pub fn handshake_sent(&self) { @@ -41,25 +98,26 @@ impl TimersInner { } } -pub type Timers = Arc; +/* Instance of the router callbacks */ -pub struct Events(); +pub struct Events(PhantomData<(T, B)>); -impl Callbacks for Events { - type Opaque = Timers; +impl Callbacks for Events { + type Opaque = Peer; - fn send(t: &Timers, size: usize, data: bool, sent: bool) { - t.tx_bytes.fetch_add(size as u64, Ordering::Relaxed); + fn send(peer: &Peer, size: usize, data: bool, sent: bool) { + peer.tx_bytes.fetch_add(size as u64, Ordering::Relaxed); } - fn recv(t: &Timers, size: usize, data: bool, sent: bool) { - t.rx_bytes.fetch_add(size as u64, Ordering::Relaxed); + fn recv(peer: &Peer, size: usize, data: bool, sent: bool) { + peer.rx_bytes.fetch_add(size as u64, Ordering::Relaxed); } - fn need_key(t: &Timers) { - if !t.handshake_pending.swap(true, Ordering::SeqCst) { - t.handshake_attempts.store(0, Ordering::SeqCst); - t.new_handshake.reset(ZERO_DURATION); + fn need_key(peer: &Peer) { + let timers = peer.timers.read(); + if !timers.handshake_pending.swap(true, Ordering::SeqCst) { + timers.handshake_attempts.store(0, Ordering::SeqCst); + timers.new_handshake.fire(); } } } diff --git a/src/wireguard.rs b/src/wireguard.rs index 3b4724e..cd61cf0 100644 --- a/src/wireguard.rs +++ b/src/wireguard.rs @@ -22,27 +22,36 @@ const SIZE_HANDSHAKE_QUEUE: usize = 128; const THRESHOLD_UNDER_LOAD: usize = SIZE_HANDSHAKE_QUEUE / 4; const DURATION_UNDER_LOAD: Duration = Duration::from_millis(10_000); -type Peer = Arc>; +pub type Peer = Arc>; pub struct PeerInner { - queue: Mutex>>, // handshake queue - router: router::Peer, // router peer - timers: Option, // + pub keepalive: AtomicUsize, // keepalive interval + pub rx_bytes: AtomicU64, + pub tx_bytes: AtomicU64, + pub pk: PublicKey, // DISCUSS: Change layout in handshake module (adopt pattern of router), to avoid this. + pub queue: Mutex>>, // handshake queue + pub router: router::Peer, T, B>, // router peer + pub timers: RwLock, // } impl PeerInner { - #[inline(always)] - fn timers(&self) -> &Timers { - self.timers.as_ref().unwrap() + pub fn new_handshake(&self) { + self.queue.lock().send(HandshakeJob::New(self.pk)).unwrap(); } } +macro_rules! timers { + ($peer:expr) => { + $peer.timers.read() + }; +} + struct Handshake { device: handshake::Device, active: bool, } -enum HandshakeJob { +pub enum HandshakeJob { Message(Vec, E), New(PublicKey), } @@ -51,8 +60,8 @@ struct WireguardInner { // identify and configuration map peers: RwLock>>, - // cryptkey routing - router: router::Device, + // cryptkey router + router: router::Device, T, B>, // handshake related state handshake: RwLock, @@ -84,6 +93,20 @@ impl Wireguard { } } + /* + fn new_peer(&self, pk: PublicKey) -> Peer { + let router = self.state.router.new_peer(); + + Arc::new(PeerInner { + pk, + queue: Mutex::new(self.state.queue.lock().clone()), + keepalive: AtomicUsize::new(0), + rx_bytes: AtomicU64::new(0), + tx_bytes: AtomicU64::new(0), + }) + } + */ + fn new(tun: T, bind: B) -> Wireguard { // create device state let mut rng = OsRng::new().unwrap(); @@ -166,7 +189,7 @@ impl Wireguard { let msg = state.device.begin(&mut rng, &pk).unwrap(); // TODO handle if let Some(peer) = wg.peers.read().get(pk.as_bytes()) { peer.router.send(&msg[..]); - peer.timers().handshake_sent(); + timers!(peer).handshake_sent(); } } } -- cgit v1.2.3-59-g8ed1b