diff options
Diffstat (limited to '')
-rw-r--r-- | src/handshake/noise.rs | 4 | ||||
-rw-r--r-- | src/router/anti_replay.rs | 23 | ||||
-rw-r--r-- | src/router/device.rs | 235 | ||||
-rw-r--r-- | src/router/inbound.rs | 52 | ||||
-rw-r--r-- | src/router/mod.rs | 2 | ||||
-rw-r--r-- | src/router/outbound.rs | 32 | ||||
-rw-r--r-- | src/types/mod.rs | 7 |
7 files changed, 253 insertions, 102 deletions
diff --git a/src/handshake/noise.rs b/src/handshake/noise.rs index 4eea627..6a1eb6d 100644 --- a/src/handshake/noise.rs +++ b/src/handshake/noise.rs @@ -23,6 +23,8 @@ use super::types::*; use crate::types::{Key, KeyPair}; +use std::time::Instant; + // HMAC hasher (generic construction) type HMACBlake2s = Hmac<Blake2s>; @@ -388,6 +390,7 @@ pub fn create_response<T: Copy, R: RngCore + CryptoRng>( // return unconfirmed key-pair Ok(KeyPair { + birth: Instant::now(), confirmed: false, send: Key { id: sender, @@ -462,6 +465,7 @@ pub fn consume_response<T: Copy>( Some(peer.identifier), // proves overship of the public key (e.g. for updating the endpoint) None, // no response message Some(KeyPair { + birth: Instant::now(), confirmed: true, send: Key { id: sender, diff --git a/src/router/anti_replay.rs b/src/router/anti_replay.rs index 5d898ac..b0838bd 100644 --- a/src/router/anti_replay.rs +++ b/src/router/anti_replay.rs @@ -100,12 +100,12 @@ impl AntiReplay { /// /// Ok(()) if sequence number is valid (not marked and not behind the moving window). /// Err if the sequence number is invalid (already marked or "too old"). - pub fn update(&mut self, seq: u64) -> Result<(), ()> { + pub fn update(&mut self, seq: u64) -> bool { if self.check(seq) { self.update_store(seq); - Ok(()) + true } else { - Err(()) + false } } } @@ -119,35 +119,36 @@ mod tests { let mut ar = AntiReplay::new(); for i in 0..20000 { - ar.update(i).unwrap(); + assert!(ar.update(i)); } for i in (0..20000).rev() { assert!(!ar.check(i)); } - ar.update(65536).unwrap(); + assert!(ar.update(65536)); for i in (65536 - WINDOW_SIZE)..65535 { - ar.update(i).unwrap(); + assert!(ar.update(i)); } + for i in (65536 - 10 * WINDOW_SIZE)..65535 { assert!(!ar.check(i)); } - ar.update(66000).unwrap(); + assert!(ar.update(66000)); for i in 65537..66000 { - ar.update(i).unwrap(); + assert!(ar.update(i)); } for i in 65537..66000 { - assert!(ar.update(i).is_err()); + assert_eq!(ar.update(i), false); } // Test max u64. let next = u64::max_value(); - ar.update(next).unwrap(); + assert!(ar.update(next)); assert!(!ar.check(next)); for i in (next - WINDOW_SIZE)..next { - ar.update(i).unwrap(); + assert!(ar.update(i)); } for i in (next - 20 * WINDOW_SIZE)..next { assert!(!ar.check(i)); diff --git a/src/router/device.rs b/src/router/device.rs index ec63111..3b29312 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -1,14 +1,17 @@ use arraydeque::{ArrayDeque, Saturating, Wrapping}; -use lifeguard::{Pool, Recycled}; use treebitmap::IpLookupTable; use std::collections::HashMap; +use std::error::Error; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering}; +use std::ptr; +use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering}; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::{Arc, Mutex, Weak}; +use std::thread; use std::time::{Duration, Instant}; -use spin::RwLock; +use spin; use super::super::types::KeyPair; use super::anti_replay::AntiReplay; @@ -18,143 +21,197 @@ use std::u64; const REJECT_AFTER_MESSAGES: u64 = u64::MAX - (1 << 4); const MAX_STAGED_PACKETS: usize = 128; -pub struct Device<'a> { - recv: RwLock<HashMap<u32, Arc<Peer<'a>>>>, // map receiver id -> peer - ipv4: IpLookupTable<Ipv4Addr, Arc<Peer<'a>>>, // ipv4 trie - ipv6: IpLookupTable<Ipv6Addr, Arc<Peer<'a>>>, // ipv6 trie - pool: Pool<Vec<u8>>, // message buffer pool +pub struct Device { + recv: spin::RwLock<HashMap<u32, DecryptionState>>, + ipv4: IpLookupTable<Ipv4Addr, Weak<PeerInner>>, + ipv6: IpLookupTable<Ipv6Addr, Weak<PeerInner>>, } -struct KeyState(KeyPair, AntiReplay); - struct EncryptionState { - key: [u8; 32], // encryption key - id: u64, // sender id - nonce: AtomicU64, // next available nonce - death: Instant, // can must the key no longer be used: - // (birth + reject-after-time - keepalive-timeout - rekey-timeout) + key: [u8; 32], // encryption key + id: u32, // sender id + nonce: u64, // next available nonce + death: Instant, // time when the key no longer can be used for encryption + // (birth + reject-after-time - keepalive-timeout - rekey-timeout) +} + +struct DecryptionState { + key: [u8; 32], + protector: Arc<spin::Mutex<AntiReplay>>, + peer: Weak<PeerInner>, + death: Instant, // time when the key can no longer be used for decryption } struct KeyWheel { - next: AtomicPtr<Arc<Option<KeyState>>>, // next key state (unconfirmed) - current: AtomicPtr<Arc<Option<KeyState>>>, // current key state (used for encryption) - previous: AtomicPtr<Arc<Option<KeyState>>>, // old key state (used for decryption) + next: Option<KeyPair>, // next key state (unconfirmed) + current: Option<KeyPair>, // current key state (used for encryption) + previous: Option<KeyPair>, // old key state (used for decryption) } -pub struct Peer<'a> { - inorder: Mutex<ArrayDeque<[Option<Recycled<'a, Vec<u8>>>; MAX_STAGED_PACKETS], Saturating>>, // inorder queue +struct PeerInner { + inorder_outbound: SyncSender<()>, + inorder_inbound: SyncSender<()>, staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake rx_bytes: AtomicU64, // received bytes tx_bytes: AtomicU64, // transmitted bytes - keys: KeyWheel, // key-wheel - ekey: AtomicPtr<Arc<EncryptionState>>, // encryption state - endpoint: AtomicPtr<Arc<Option<SocketAddr>>>, + keys: spin::Mutex<KeyWheel>, // key-wheel + ekey: spin::Mutex<EncryptionState>, // encryption state + endpoint: spin::Mutex<Option<Arc<SocketAddr>>>, +} + +pub struct Peer(Arc<PeerInner>); + +impl Drop for Peer { + fn drop(&mut self) { + // stop threads and remove peer from device + } +} + +impl Drop for Device { + fn drop(&mut self) { + // stop threads + } } -impl<'a> Peer<'a> { - pub fn set_endpoint(&self, endpoint: SocketAddr) { - self.endpoint - .store(&mut Arc::new(Some(endpoint)), Ordering::Relaxed) - } - - pub fn add_keypair(&self, keypair: KeyPair) { - let confirmed = keypair.confirmed; - let mut st_new = Arc::new(Some(KeyState(keypair, AntiReplay::new()))); - let st_previous = self.keys.previous.load(Ordering::Relaxed); - if confirmed { - // previous <- current - self.keys.previous.compare_and_swap( - st_previous, - self.keys.current.load(Ordering::Relaxed), - Ordering::Relaxed, - ); - - // current <- new - self.keys.next.store(&mut st_new, Ordering::Relaxed) +impl Peer { + fn set_endpoint(&self, endpoint: SocketAddr) { + *self.0.endpoint.lock() = Some(Arc::new(endpoint)) + } + + pub fn keypair_confirm(&self, ks: Arc<KeyPair>) { + *self.0.ekey.lock() = EncryptionState { + id: ks.send.id, + key: ks.send.key, + nonce: 0, + death: ks.birth + Duration::from_millis(1337), // todo + }; + } + + pub fn keypair_add(&self, new: KeyPair) -> Option<u32> { + let mut keys = self.0.keys.lock(); + let release = keys.previous.map(|k| k.recv.id); + + // update key-wheel + if new.confirmed { + // start using key for encryption + *self.0.ekey.lock() = EncryptionState { + id: new.send.id, + key: new.send.key, + nonce: 0, + death: new.birth + Duration::from_millis(1337), // todo + }; + + // move current into previous + keys.previous = keys.current; + keys.current = Some(new); } else { - // previous <- next - self.keys.previous.compare_and_swap( - st_previous, - self.keys.next.load(Ordering::Relaxed), - Ordering::Relaxed, - ); - - // next <- new - self.keys.next.store(&mut st_new, Ordering::Relaxed) - } + // store the key and await confirmation + keys.previous = keys.next; + keys.next = Some(new); + }; + + // return the released id (for handshake state machine) + release } pub fn rx_bytes(&self) -> u64 { - self.rx_bytes.load(Ordering::Relaxed) + self.0.rx_bytes.load(Ordering::Relaxed) } pub fn tx_bytes(&self) -> u64 { - self.tx_bytes.load(Ordering::Relaxed) + self.0.tx_bytes.load(Ordering::Relaxed) } } -impl<'a> Device<'a> { - pub fn new() -> Device<'a> { +impl Device { + pub fn new() -> Device { Device { - recv: RwLock::new(HashMap::new()), + recv: spin::RwLock::new(HashMap::new()), ipv4: IpLookupTable::new(), ipv6: IpLookupTable::new(), - pool: Pool::with_size_and_max(0, MAX_STAGED_PACKETS * 2), } } - pub fn subnets(&self, peer: Arc<Peer<'a>>) -> Vec<(IpAddr, u32)> { + pub fn release(&self, id: u32) { + debug_assert!( + if let Some(_) = self.recv.read().get(&id) { + true + } else { + false + }, + true + ); + self.recv.write().remove(&id); + } + + pub fn add_subnet(&mut self, ip: IpAddr, masklen: u32, peer: Peer) { + match ip { + IpAddr::V4(v4) => self.ipv4.insert(v4, masklen, Arc::downgrade(&peer.0)), + IpAddr::V6(v6) => self.ipv6.insert(v6, masklen, Arc::downgrade(&peer.0)), + }; + } + + pub fn subnets(&self, peer: Peer) -> Vec<(IpAddr, u32)> { let mut subnets = Vec::new(); // extract ipv4 entries for subnet in self.ipv4.iter() { let (ip, masklen, p) = subnet; - if Arc::ptr_eq(&peer, p) { - subnets.push((IpAddr::V4(ip), masklen)) + if let Some(p) = p.upgrade() { + if Arc::ptr_eq(&p, &peer.0) { + subnets.push((IpAddr::V4(ip), masklen)) + } } } // extract ipv6 entries for subnet in self.ipv6.iter() { let (ip, masklen, p) = subnet; - if Arc::ptr_eq(&peer, p) { - subnets.push((IpAddr::V6(ip), masklen)) + if let Some(p) = p.upgrade() { + if Arc::ptr_eq(&p, &peer.0) { + subnets.push((IpAddr::V6(ip), masklen)) + } } } subnets } + pub fn keypair_add(&self, peer: Peer, new: KeyPair) -> Option<u32> { + // update key-wheel of peer + let release = peer.keypair_add(new); + + // update incoming packet id map + let mut recv = self.recv.write(); + + // release id of previous keypair + if let Some(id) = release { + debug_assert!(recv.contains_key(&id)); + recv.remove(&id); + }; + + // map new id to keypair + debug_assert!(!recv.contains_key(&new.recv.id)); + + recv.insert( + new.recv.id, + DecryptionState { + key: new.recv.key, + protector: Arc::new(spin::Mutex::new(AntiReplay::new())), + peer: Arc::downgrade(&peer.0), + death: new.birth + Duration::from_millis(2600), // todo + }, + ); + + release + } + /// Adds a new peer to the device /// /// # Returns /// /// A atomic ref. counted peer (with liftime matching the device) - pub fn add(&mut self) -> Arc<Peer<'a>> { - Arc::new(Peer { - inorder: Mutex::new(ArrayDeque::new()), - staged_packets: Mutex::new(ArrayDeque::new()), - rx_bytes: AtomicU64::new(0), - tx_bytes: AtomicU64::new(0), - keys: KeyWheel { - next: AtomicPtr::new(&mut Arc::new(None)), - current: AtomicPtr::new(&mut Arc::new(None)), - previous: AtomicPtr::new(&mut Arc::new(None)), - }, - // long expired encryption key - ekey: AtomicPtr::new(&mut Arc::new(EncryptionState { - key: [0u8; 32], - id: 0, - nonce: AtomicU64::new(REJECT_AFTER_MESSAGES), - death: Instant::now() - Duration::from_secs(31536000), - })), - endpoint: AtomicPtr::new(&mut Arc::new(None)), - }) - } - - pub fn get_buffer(&self) -> Recycled<Vec<u8>> { - self.pool.new() - } + pub fn add(&mut self) -> () {} /// Cryptkey routes and sends a plaintext message (IP packet) /// diff --git a/src/router/inbound.rs b/src/router/inbound.rs new file mode 100644 index 0000000..eba63c4 --- /dev/null +++ b/src/router/inbound.rs @@ -0,0 +1,52 @@ +use std::thread; +use spin; +use lifeguard::Recycled; +use super::anti_replay::AntiReplay; +use std::sync::mpsc::{Receiver, sync_channel}; +use std::sync::Arc; + +struct ParallelJobInner { + done : bool, + msg : Vec<u8>, + key : [u8; 32] +} + +type ParallelJob = spin::Mutex<ParallelJobInner>; + +struct InboundInorder { + job : Arc<ParallelJob>, + state : Arc<KeyState>, +} + +struct Inorder<'a> (Arc<spin::Mutex<Option<Job<'a>>>>); + +struct Job<'a> { + msg : Recycled<'a, Vec<u8>>, + arp : Arc<KeyState>, // replay protector and key-pair + key : Option<(Arc<Peer>, Arc<KeyPair>)> // provided if the key has not been confirmed +} + +fn worker_inorder<'a>(channel : Receiver<Inorder<'a>>) { + let mut current = 0; + + // reads from inorder channel + for ordered in channel.recv().iter() { + + loop { + // check if job is complete + match ordered.0.try_lock() { + None => (), + Some(guard) => if let Some(job) = *guard { + if job.arp.lock().update(6) { + // write to output + + break; + } + } + } + + // wait for job to complete + thread::park(); + } + } +}
\ No newline at end of file diff --git a/src/router/mod.rs b/src/router/mod.rs index 41e38ba..b53b10c 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -1,5 +1,7 @@ mod anti_replay; mod buffer; mod device; +// mod inbound; +// mod outbound; pub use device::Device;
\ No newline at end of file diff --git a/src/router/outbound.rs b/src/router/outbound.rs new file mode 100644 index 0000000..cc6a4bf --- /dev/null +++ b/src/router/outbound.rs @@ -0,0 +1,32 @@ +use spin; +use std::thread; +use std::sync::Arc; +use std::sync::mpsc::{Receiver, sync_channel}; + +struct JobInner { + done : bool, // is encryption complete? + msg : Vec<u8>, // transport message (id, nonce already set) + key : [u8; 32], // encryption key + handle : thread::JoinHandle +} + +type Job = Arc<spin::Mutex<JobInner>>; + +fn worker_parallel() + +fn worker_inorder(channel : Receiver<Job>) { + for ordered in channel.recv().iter() { + loop { + // check if job is complete + match ordered.try_lock() { + None => (), + Some(guard) => if guard.done { + // write to UDP interface + } + } + + // wait for job to complete + thread::park(); + } + } +}
\ No newline at end of file diff --git a/src/types/mod.rs b/src/types/mod.rs index ac6a307..ea7c570 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,8 +1,10 @@ +use std::time::Instant; + /* This file holds types passed between components. * Whenever a type cannot be held local to a single module. */ -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct Key { pub key: [u8; 32], pub id: u32, @@ -15,8 +17,9 @@ impl PartialEq for Key { } } -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct KeyPair { + pub birth: Instant, // when was the key-pair created pub confirmed: bool, // has the key-pair been confirmed? pub send: Key, // key for outbound messages pub recv: Key, // key for inbound messages |