From 723a1b8e858346ef98559788540915bc0cc93eb0 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Mon, 12 Aug 2019 21:04:19 +0200 Subject: Port replay filter and sketch router state --- Cargo.lock | 14 ++++ Cargo.toml | 2 + src/handshake/device.rs | 44 ++++++------ src/handshake/ratelimiter.rs | 31 +++++---- src/main.rs | 3 + src/mod.rs | 3 - src/router/anti_replay.rs | 156 +++++++++++++++++++++++++++++++++++++++++++ src/router/buffer.rs | 48 +++++++------ src/router/device.rs | 79 +++++++++++++++------- src/router/mod.rs | 4 +- 10 files changed, 298 insertions(+), 86 deletions(-) delete mode 100644 src/mod.rs create mode 100644 src/router/anti_replay.rs diff --git a/Cargo.lock b/Cargo.lock index c9778b4..d25d5fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5,6 +5,11 @@ name = "adler32" version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "arraydeque" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "arrayvec" version = "0.4.11" @@ -952,6 +957,11 @@ dependencies = [ "tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "treebitmap" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "typenum" version = "1.10.0" @@ -1013,6 +1023,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" name = "wireguard-rs" version = "0.1.0" dependencies = [ + "arraydeque 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "blake2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1027,6 +1038,7 @@ dependencies = [ "spin 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "subtle 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", + "treebitmap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "x25519-dalek 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "zerocopy 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1079,6 +1091,7 @@ dependencies = [ [metadata] "checksum adler32 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7e522997b529f05601e05166c07ed17789691f562762c7f3b987263d2dedee5c" +"checksum arraydeque 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f0ffd3d69bd89910509a5d31d1f1353f38ccffdd116dd0099bbd6627f7bd8ad8" "checksum arrayvec 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "b8d73f9beda665eaa98ab9e4f7442bd4e7de6652587de55b2525e52e29c1b0ba" "checksum autocfg 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "0e49efa51329a5fd37e7c79db4621af617cd4e3e5bc224939808d076077077bf" "checksum bit-set 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e84c238982c4b1e1ee668d136c510c67a13465279c0cb367ea6baf6310620a80" @@ -1188,6 +1201,7 @@ dependencies = [ "checksum tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "f2106812d500ed25a4f38235b9cae8f78a09edf43203e16e59c3b769a342a60e" "checksum tokio-udp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "66268575b80f4a4a710ef83d087fdfeeabdce9b74c797535fbac18a2cb906e92" "checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445" +"checksum treebitmap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6bf423939ac9ccf4083788879b883a7149176586f9cf8b0fb1fd88b66ad692b5" "checksum typenum 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "612d636f949607bdf9b123b4a6f6d966dedf3ff669f7f045890d3a4a73948169" "checksum ucd-util 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "fa9b3b49edd3468c0e6565d85783f51af95212b6fa3986a5500954f00b460874" "checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" diff --git a/Cargo.toml b/Cargo.toml index c3af1c4..5b81617 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,8 @@ sodiumoxide = "0.2.2" lazy_static = "^1.3" tokio = "0.1.22" futures = "0.1.28" +arraydeque = "^0.4" +treebitmap = "^0.4" [dependencies.x25519-dalek] version = "^0.5" diff --git a/src/handshake/device.rs b/src/handshake/device.rs index 86a832a..1c7a30d 100644 --- a/src/handshake/device.rs +++ b/src/handshake/device.rs @@ -356,12 +356,14 @@ mod tests { use super::super::messages::*; use super::*; use hex; - use std::thread; use rand::rngs::OsRng; - use std::time::Duration; use std::net::SocketAddr; + use std::thread; + use std::time::Duration; - fn setup_devices(rng : &mut R) -> (PublicKey, Device, PublicKey, Device) { + fn setup_devices( + rng: &mut R, + ) -> (PublicKey, Device, PublicKey, Device) { // generate new keypairs let sk1 = StaticSecret::new(rng); @@ -390,7 +392,7 @@ mod tests { } /* Test longest possible handshake interaction (7 messages): - * + * * 1. I -> R (initation) * 2. I <- R (cookie reply) * 3. I -> R (initation) @@ -402,28 +404,28 @@ mod tests { #[test] fn handshake_under_load() { let mut rng = OsRng::new().unwrap(); - let (_pk1, dev1, pk2, dev2) = setup_devices(&mut rng); + let (_pk1, dev1, pk2, dev2) = setup_devices(&mut rng); - let src1 : SocketAddr = "172.16.0.1:8080".parse().unwrap(); - let src2 : SocketAddr = "172.16.0.2:7070".parse().unwrap(); + let src1: SocketAddr = "172.16.0.1:8080".parse().unwrap(); + let src2: SocketAddr = "172.16.0.2:7070".parse().unwrap(); // 1. device-1 : create first initation let msg_init = dev1.begin(&mut rng, &pk2).unwrap(); - + // 2. device-2 : responds with CookieReply let msg_cookie = match dev2.process(&mut rng, &msg_init, Some(&src1)).unwrap() { (None, Some(msg), None) => msg, - _ => panic!("unexpected response") + _ => panic!("unexpected response"), }; // device-1 : processes CookieReply (no response) match dev1.process(&mut rng, &msg_cookie, Some(&src2)).unwrap() { (None, None, None) => (), - _ => panic!("unexpected response") + _ => panic!("unexpected response"), } // avoid initation flood - thread::sleep(Duration::from_millis(20)); + thread::sleep(Duration::from_millis(20)); // 3. device-1 : create second initation let msg_init = dev1.begin(&mut rng, &pk2).unwrap(); @@ -433,24 +435,24 @@ mod tests { (Some(_), Some(msg), Some(kp)) => { assert_eq!(kp.confirmed, false); msg - }, - _ => panic!("unexpected response") + } + _ => panic!("unexpected response"), }; // 5. device-1 : responds with CookieReply let msg_cookie = match dev1.process(&mut rng, &msg_response, Some(&src2)).unwrap() { (None, Some(msg), None) => msg, - _ => panic!("unexpected response") + _ => panic!("unexpected response"), }; // device-2 : processes CookieReply (no response) match dev2.process(&mut rng, &msg_cookie, Some(&src1)).unwrap() { (None, None, None) => (), - _ => panic!("unexpected response") + _ => panic!("unexpected response"), } // avoid initation flood - thread::sleep(Duration::from_millis(20)); + thread::sleep(Duration::from_millis(20)); // 6. device-1 : create third initation let msg_init = dev1.begin(&mut rng, &pk2).unwrap(); @@ -460,8 +462,8 @@ mod tests { (Some(_), Some(msg), Some(kp)) => { assert_eq!(kp.confirmed, false); (msg, kp) - }, - _ => panic!("unexpected response") + } + _ => panic!("unexpected response"), }; // device-1 : process noise response @@ -469,8 +471,8 @@ mod tests { (Some(_), None, Some(kp)) => { assert_eq!(kp.confirmed, true); kp - }, - _ => panic!("unexpected response") + } + _ => panic!("unexpected response"), }; assert_eq!(kp1.send, kp2.recv); @@ -480,7 +482,7 @@ mod tests { #[test] fn handshake_no_load() { let mut rng = OsRng::new().unwrap(); - let (pk1, mut dev1, pk2, mut dev2) = setup_devices(&mut rng); + let (pk1, mut dev1, pk2, mut dev2) = setup_devices(&mut rng); // do a few handshakes (every handshake should succeed) diff --git a/src/handshake/ratelimiter.rs b/src/handshake/ratelimiter.rs index 02b82e7..6568b32 100644 --- a/src/handshake/ratelimiter.rs +++ b/src/handshake/ratelimiter.rs @@ -1,10 +1,10 @@ +use spin; use std::collections::HashMap; use std::net::IpAddr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Condvar, Mutex, Arc}; +use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::{Duration, Instant}; -use spin; use lazy_static::lazy_static; @@ -24,7 +24,7 @@ struct Entry { pub struct RateLimiter(Arc); -struct RateLimiterInner{ +struct RateLimiterInner { gc_running: AtomicBool, gc_dropped: (Mutex, Condvar), table: spin::RwLock>>, @@ -42,13 +42,11 @@ impl Drop for RateLimiter { impl RateLimiter { pub fn new() -> Self { - RateLimiter ( - Arc::new(RateLimiterInner { - gc_dropped: (Mutex::new(false), Condvar::new()), - gc_running: AtomicBool::from(false), - table: spin::RwLock::new(HashMap::new()), - }) - ) + RateLimiter(Arc::new(RateLimiterInner { + gc_dropped: (Mutex::new(false), Condvar::new()), + gc_running: AtomicBool::from(false), + table: spin::RwLock::new(HashMap::new()), + })) } pub fn allow(&self, addr: &IpAddr) -> bool { @@ -60,8 +58,8 @@ impl RateLimiter { let mut entry = entry.lock(); // add tokens earned since last time - entry.tokens = - MAX_TOKENS.min(entry.tokens + u64::from(entry.last_time.elapsed().subsec_nanos())); + entry.tokens = MAX_TOKENS + .min(entry.tokens + u64::from(entry.last_time.elapsed().subsec_nanos())); entry.last_time = Instant::now(); // subtract cost of packet @@ -72,7 +70,7 @@ impl RateLimiter { return false; } } - + // add new entry (write lock) self.0.table.write().insert( *addr, @@ -94,7 +92,9 @@ impl RateLimiter { // garbage collect { let mut tw = limiter.table.write(); - tw.retain(|_, ref mut entry| entry.lock().last_time.elapsed() <= *GC_INTERVAL); + tw.retain(|_, ref mut entry| { + entry.lock().last_time.elapsed() <= *GC_INTERVAL + }); if tw.len() == 0 { limiter.gc_running.store(false, Ordering::Relaxed); return; @@ -102,7 +102,7 @@ impl RateLimiter { } // wait until stopped or new GC (~1 every sec) - let res = cvar.wait_timeout(dropped,*GC_INTERVAL).unwrap(); + let res = cvar.wait_timeout(dropped, *GC_INTERVAL).unwrap(); dropped = res.0; } }); @@ -110,7 +110,6 @@ impl RateLimiter { allowed } - } #[cfg(test)] diff --git a/src/main.rs b/src/main.rs index 26974a7..e49a217 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,7 @@ +#![feature(test)] + mod handshake; +mod router; mod types; use sodiumoxide; diff --git a/src/mod.rs b/src/mod.rs deleted file mode 100644 index 1829e6e..0000000 --- a/src/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod noise; -mod types; -mod router; \ No newline at end of file diff --git a/src/router/anti_replay.rs b/src/router/anti_replay.rs new file mode 100644 index 0000000..5d898ac --- /dev/null +++ b/src/router/anti_replay.rs @@ -0,0 +1,156 @@ +use std::mem; + +// Implementation of RFC 6479. +// https://tools.ietf.org/html/rfc6479 + +#[cfg(target_pointer_width = "64")] +type Word = u64; + +#[cfg(target_pointer_width = "64")] +const REDUNDANT_BIT_SHIFTS: usize = 6; + +#[cfg(target_pointer_width = "32")] +type Word = u32; + +#[cfg(target_pointer_width = "32")] +const REDUNDANT_BIT_SHIFTS: usize = 5; + +const SIZE_OF_WORD: usize = mem::size_of::() * 8; + +const BITMAP_BITLEN: usize = 2048; +const BITMAP_LEN: usize = (BITMAP_BITLEN / SIZE_OF_WORD); +const BITMAP_INDEX_MASK: u64 = BITMAP_LEN as u64 - 1; +const BITMAP_LOC_MASK: u64 = (SIZE_OF_WORD - 1) as u64; +const WINDOW_SIZE: u64 = (BITMAP_BITLEN - SIZE_OF_WORD) as u64; + +pub struct AntiReplay { + bitmap: [Word; BITMAP_LEN], + last: u64, +} + +impl Default for AntiReplay { + fn default() -> Self { + AntiReplay::new() + } +} + +impl AntiReplay { + pub fn new() -> Self { + debug_assert_eq!(1 << REDUNDANT_BIT_SHIFTS, SIZE_OF_WORD); + debug_assert_eq!(BITMAP_BITLEN % SIZE_OF_WORD, 0); + AntiReplay { + last: 0, + bitmap: [0; BITMAP_LEN], + } + } + + // Returns true if check is passed, i.e., not a replay or too old. + // + // Unlike RFC 6479, zero is allowed. + fn check(&self, seq: u64) -> bool { + // Larger is always good. + if seq > self.last { + return true; + } + + if self.last - seq > WINDOW_SIZE { + return false; + } + + let bit_location = seq & BITMAP_LOC_MASK; + let index = (seq >> REDUNDANT_BIT_SHIFTS) & BITMAP_INDEX_MASK; + + self.bitmap[index as usize] & (1 << bit_location) == 0 + } + + // Should only be called if check returns true. + fn update_store(&mut self, seq: u64) { + debug_assert!(self.check(seq)); + + let index = seq >> REDUNDANT_BIT_SHIFTS; + + if seq > self.last { + let index_cur = self.last >> REDUNDANT_BIT_SHIFTS; + let diff = index - index_cur; + + if diff >= BITMAP_LEN as u64 { + self.bitmap = [0; BITMAP_LEN]; + } else { + for i in 0..diff { + let real_index = (index_cur + i + 1) & BITMAP_INDEX_MASK; + self.bitmap[real_index as usize] = 0; + } + } + + self.last = seq; + } + + let index = index & BITMAP_INDEX_MASK; + let bit_location = seq & BITMAP_LOC_MASK; + self.bitmap[index as usize] |= 1 << bit_location; + } + + /// Checks and marks a sequence number in the replay filter + /// + /// # Arguments + /// + /// - seq: Sequence number check for replay and add to filter + /// + /// # Returns + /// + /// Ok(()) if sequence number is valid (not marked and not behind the moving window). + /// Err if the sequence number is invalid (already marked or "too old"). + pub fn update(&mut self, seq: u64) -> Result<(), ()> { + if self.check(seq) { + self.update_store(seq); + Ok(()) + } else { + Err(()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn anti_replay() { + let mut ar = AntiReplay::new(); + + for i in 0..20000 { + ar.update(i).unwrap(); + } + + for i in (0..20000).rev() { + assert!(!ar.check(i)); + } + + ar.update(65536).unwrap(); + for i in (65536 - WINDOW_SIZE)..65535 { + ar.update(i).unwrap(); + } + for i in (65536 - 10 * WINDOW_SIZE)..65535 { + assert!(!ar.check(i)); + } + + ar.update(66000).unwrap(); + for i in 65537..66000 { + ar.update(i).unwrap(); + } + for i in 65537..66000 { + assert!(ar.update(i).is_err()); + } + + // Test max u64. + let next = u64::max_value(); + ar.update(next).unwrap(); + assert!(!ar.check(next)); + for i in (next - WINDOW_SIZE)..next { + ar.update(i).unwrap(); + } + for i in (next - 20 * WINDOW_SIZE)..next { + assert!(!ar.check(i)); + } + } +} diff --git a/src/router/buffer.rs b/src/router/buffer.rs index 3f348ea..96b16ab 100644 --- a/src/router/buffer.rs +++ b/src/router/buffer.rs @@ -4,20 +4,23 @@ * 2. Inserting into the buffer always succeeds, but might overwrite the oldest item */ -const BUFFER_SIZE : usize = 1024; +const BUFFER_SIZE: usize = 1024; pub struct DiscardingRingBuffer { - buf : [ Option ; BUFFER_SIZE], - idx : usize, - next : usize + buf: [Option; BUFFER_SIZE], + idx: usize, + next: usize, } -impl DiscardingRingBuffer where T: Copy { +impl DiscardingRingBuffer +where + T: Copy, +{ pub fn new() -> Self { - DiscardingRingBuffer{ - buf : [None; BUFFER_SIZE], - idx : 0, - next : 0 + DiscardingRingBuffer { + buf: [None; BUFFER_SIZE], + idx: 0, + next: 0, } } @@ -29,7 +32,7 @@ impl DiscardingRingBuffer where T: Copy { } } - pub fn push(&mut self, val : T) { + pub fn push(&mut self, val: T) { // assign next slot (free / oldest) self.buf[self.idx] = Some(val); self.idx += 1; @@ -57,20 +60,25 @@ impl DiscardingRingBuffer where T: Copy { pub fn has_element(&self) -> bool { match self.buf[self.next] { None => true, - _ => false + _ => false, } } } +#[cfg(test)] +mod tests { + use super::*; + use proptest::prelude::*; -proptest! { - #[test] - fn test_order(elems: Vec) { - let mut buf = DiscardingRingBuffer::new(); + proptest! { + #[test] + fn test_order(elems: Vec) { + let mut buf = DiscardingRingBuffer::new(); - for e in &elems { - buf.push(e); - } + for e in &elems { + buf.push(e); + } - } -} \ No newline at end of file + } + } +} diff --git a/src/router/device.rs b/src/router/device.rs index 67702cd..1296be6 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -1,39 +1,69 @@ -use std::net::SocketAddr; +use arraydeque::{ArrayDeque, Wrapping}; +use treebitmap::IpLookupTable; + +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::sync::atomic::{AtomicPtr, AtomicU64}; +use std::sync::{Arc, Mutex}; +use std::time::Instant; + use super::super::types::KeyPair; +use super::anti_replay::AntiReplay; + +const MAX_STAGED_PACKETS: usize = 128; pub struct Device { + ipv4: IpLookupTable>, + ipv6: IpLookupTable>, +} + +struct KeyState(KeyPair, AntiReplay); +struct EncryptState { + key: [u8; 32], // encryption key + id: u64, // sender id + nonce: AtomicU64, // next available nonce + death: Instant, // can must the key no longer be used: + // (birth + reject-after-time - keepalive-timeout - rekey-timeout) } -pub struct Peer { +struct KeyWheel { + next: AtomicPtr>>, // next key state (unconfirmed) + current: AtomicPtr>>, // current key state (used for encryption) + previous: AtomicPtr>>, // old key state (used for decryption) +} +pub struct Peer { + staged_packets: Mutex; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake + rx_bytes: AtomicU64, // received bytes + tx_bytes: AtomicU64, // transmitted bytes + keys: KeyWheel, // key-wheel + encryption: AtomicPtr>, // current encryption key (starts expired) } -pub struct PeerRef {} +pub struct PeerRef(); impl Device { - pub fn new() -> Device { unimplemented!(); } /// Adds a new peer to the device - /// + /// /// # Returns - /// + /// /// An opaque value representing the peer. pub fn add(&self) -> PeerRef { unimplemented!(); } /// Cryptkey routes and sends a plaintext message (IP packet) - /// + /// /// # Arguments - /// + /// /// - pt_msg: IP packet to cryptkey route - /// + /// /// # Returns - /// + /// /// A peer reference for the peer if no key-pair is currently valid for the destination. /// This indicates that a handshake should be initated (see the handshake module). /// If this occurs the packet is copied to an internal buffer @@ -45,31 +75,30 @@ impl Device { /// Sends a message directly to the peer. /// The router device takes care of discovering/managing the endpoint. /// This is used for handshake initiation/response messages - /// + /// /// # Arguments - /// + /// /// - peer: Reference to the destination peer /// - msg: Message to transmit pub fn send_raw(&self, peer: PeerRef, msg: &mut [u8]) { unimplemented!(); } - /// Flush the queue of buffered messages awaiting transmission - /// + /// /// # Arguments - /// + /// /// - peer: Reference for the peer to flush pub fn flush_queue(&self, peer: PeerRef) { unimplemented!(); } - + /// Attempt to route, encrypt and send all elements buffered in the queue - /// + /// /// # Arguments - /// + /// /// # Returns - /// + /// /// A boolean indicating whether packages where sent. /// Note: This is used for implicit confirmation of handshakes. pub fn send_run_queue(&self, peer: PeerRef) -> bool { @@ -77,22 +106,22 @@ impl Device { } /// Receive an encrypted transport message - /// + /// /// # Arguments - /// + /// /// - ct_msg: Encrypted transport message pub fn recv(&self, ct_msg: &mut [u8]) { unimplemented!(); } /// Returns the current endpoint known for the peer - /// + /// /// # Arguments - /// + /// /// - peer: The peer to retrieve the endpoint for pub fn get_endpoint(&self, peer: PeerRef) -> SocketAddr { unimplemented!(); - } + } pub fn set_endpoint(&self, peer: PeerRef, endpoint: SocketAddr) { unimplemented!(); @@ -101,4 +130,4 @@ impl Device { pub fn new_keypair(&self, peer: PeerRef, keypair: KeyPair) { unimplemented!(); } -} \ No newline at end of file +} diff --git a/src/router/mod.rs b/src/router/mod.rs index 4d29220..646b03b 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -1,2 +1,4 @@ +mod anti_replay; mod buffer; -pub mod device; \ No newline at end of file + +pub mod device; -- cgit v1.2.3-59-g8ed1b