From 31ef3e287114228afeb4be47f96f60402f5b9001 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sun, 18 Aug 2019 15:44:20 +0200 Subject: Implemented keypair_confirm --- src/main.rs | 2 +- src/router/device.rs | 262 +++++++++++++++++++++++++++++++++------------------ 2 files changed, 173 insertions(+), 91 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index 82a4b0c..272c5e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,5 +16,5 @@ fn main() { let mut rdev = router::Device::new(8); - let pref = rdev.add(); + let pref = rdev.new_peer(); } diff --git a/src/router/device.rs b/src/router/device.rs index 4dd6539..8f3d485 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -6,7 +6,7 @@ use crossbeam_deque::{Injector, Steal}; use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::sync::mpsc::SyncSender; +use std::sync::mpsc::{sync_channel, SyncSender}; use std::sync::{Arc, Mutex, Weak}; use std::thread; use std::time::Instant; @@ -37,11 +37,11 @@ struct PeerInner { thread_inbound: spin::Mutex>, inorder_outbound: SyncSender<()>, inorder_inbound: SyncSender<()>, - staged_packets: Mutex; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake - rx_bytes: AtomicU64, // received bytes - tx_bytes: AtomicU64, // transmitted bytes - keys: spin::Mutex, // key-wheel - ekey: spin::Mutex>, // encryption state + staged_packets: spin::Mutex; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake + rx_bytes: AtomicU64, // received bytes + tx_bytes: AtomicU64, // transmitted bytes + keys: spin::Mutex, // key-wheel + ekey: spin::Mutex>, // encryption state endpoint: spin::Mutex>>, } @@ -55,22 +55,24 @@ struct EncryptionState { struct DecryptionState { key: [u8; 32], - protector: Arc>, + // keypair: Weak, + protector: spin::Mutex, peer: Weak, death: Instant, // time when the key can no longer be used for decryption } struct KeyWheel { - next: Option, // next key state (unconfirmed) - current: Option, // current key state (used for encryption) - previous: Option, // old key state (used for decryption) + next: Option>, // next key state (unconfirmed) + current: Option>, // current key state (used for encryption) + previous: Option>, // old key state (used for decryption) + retired: Option, // retired id (previous id, after confirming key-pair) } pub struct Peer(Arc); -pub struct Device(DeviceInner); +pub struct Device(Arc); fn treebit_list( - peer: &Peer, + peer: &Arc, table: &spin::RwLock>>, callback: Box R>, ) -> Vec @@ -81,7 +83,7 @@ where for subnet in table.read().iter() { let (ip, masklen, p) = subnet; if let Some(p) = p.upgrade() { - if Arc::ptr_eq(&p, &peer.0) { + if Arc::ptr_eq(&p, &peer) { res.push(callback(ip, masklen)) } } @@ -116,10 +118,12 @@ where impl Drop for Peer { fn drop(&mut self) { // mark peer as stopped + let peer = &self.0; peer.stopped.store(true, Ordering::SeqCst); // remove from cryptkey router + treebit_remove(self, &peer.device.ipv4); treebit_remove(self, &peer.device.ipv6); @@ -127,15 +131,16 @@ impl Drop for Peer { peer.thread_inbound.lock().thread().unpark(); peer.thread_outbound.lock().thread().unpark(); - // collect ids to release + + // release ids from the receiver map + let mut keys = peer.keys.lock(); let mut release = Vec::with_capacity(3); - keys.next.map(|k| release.push(k.recv.id)); - keys.current.map(|k| release.push(k.recv.id)); - keys.previous.map(|k| release.push(k.recv.id)); + keys.next.as_ref().map(|k| release.push(k.recv.id)); + keys.current.as_ref().map(|k| release.push(k.recv.id)); + keys.previous.as_ref().map(|k| release.push(k.recv.id)); - // remove from receive id map if release.len() > 0 { let mut recv = peer.device.recv.write(); for id in &release { @@ -170,23 +175,73 @@ impl Drop for Device { } } -impl Peer { - pub fn set_endpoint(&self, endpoint: SocketAddr) { - *self.0.endpoint.lock() = Some(Arc::new(endpoint)) - } +impl PeerInner { + pub fn keypair_confirm(&self, kp: Weak) { + let mut keys = self.keys.lock(); + + // Attempt to upgrade Weak -> Arc + // (this should ensure that the key is in the key-wheel, + // which holds the only strong reference) + let kp = match kp.upgrade() { + Some(kp) => kp, + None => { + return; + } + }; + + debug_assert!( + keys.retired.is_none(), + "retired spot is not free for previous" + ); - pub fn keypair_confirm(&self, ks: Arc) { - *self.0.ekey.lock() = Some(EncryptionState { - id: ks.send.id, - key: ks.send.key, + debug_assert!( + if let Some(key) = &keys.next { + Arc::ptr_eq(&kp, &key) + } else { + false + }, + "if next has been overwritten, before confirmation, the key-pair should have been dropped!" + ); + + // enable use for encryption and set confirmed + *self.ekey.lock() = Some(EncryptionState { + id: kp.send.id, + key: kp.send.key, nonce: 0, - death: ks.birth + REJECT_AFTER_TIME, + death: kp.birth + REJECT_AFTER_TIME, }); + + // rotate the key-wheel + let release = keys.previous.as_ref().map(|k| k.recv.id); + keys.previous = keys.current.as_ref().map(|v| v.clone()); + keys.current = Some(kp.clone()); + keys.retired = release; } +} - fn keypair_add(&self, new: KeyPair) -> Option { +/// Public interface and handle to the peer +impl Peer { + pub fn set_endpoint(&self, endpoint: SocketAddr) { + *self.0.endpoint.lock() = Some(Arc::new(endpoint)) + } + + /// Add a new keypair + /// + /// # Arguments + /// + /// - new: The new confirmed/unconfirmed key pair + /// + /// # Returns + /// + /// A vector of ids which has been released. + /// These should be released in the handshake module. + pub fn add_keypair(&self, new: KeyPair) -> Vec { let mut keys = self.0.keys.lock(); - let release = keys.previous.map(|k| k.recv.id); + let mut release = Vec::with_capacity(2); + + // collect ids to be released + keys.retired.map(|v| release.push(v)); + keys.previous.as_ref().map(|k| release.push(k.recv.id)); // update key-wheel if new.confirmed { @@ -199,14 +254,37 @@ impl Peer { }); // move current into previous - keys.previous = keys.current; - keys.current = Some(new); + keys.previous = keys.current.as_ref().map(|v| v.clone());; + keys.current = Some(Arc::new(new)); } else { // store the key and await confirmation - keys.previous = keys.next; - keys.next = Some(new); + keys.previous = keys.next.as_ref().map(|v| v.clone());; + keys.next = Some(Arc::new(new)); }; + // update incoming packet id map + { + let mut recv = self.0.device.recv.write(); + + // purge recv map of released ids + for id in &release { + recv.remove(&id); + } + + // map new id to keypair + debug_assert!(!recv.contains_key(&new.recv.id)); + + recv.insert( + new.recv.id, + DecryptionState { + key: new.recv.key, + protector: spin::Mutex::new(AntiReplay::new()), + peer: Arc::downgrade(&self.0), + death: new.birth + REJECT_AFTER_TIME, + }, + ); + } + // return the released id (for handshake state machine) release } @@ -218,77 +296,52 @@ impl Peer { pub fn tx_bytes(&self) -> u64 { self.0.tx_bytes.load(Ordering::Relaxed) } -} -impl Device { - pub fn new(workers: usize) -> Device { - Device(DeviceInner { - threads: vec![], - stopped: AtomicBool::new(false), - injector: Injector::new(), - recv: spin::RwLock::new(HashMap::new()), - ipv4: spin::RwLock::new(IpLookupTable::new()), - ipv6: spin::RwLock::new(IpLookupTable::new()), - }) - } - - pub fn add_subnet(&mut self, ip: IpAddr, masklen: u32, peer: Peer) { + pub fn add_subnet(&self, ip: IpAddr, masklen: u32) { match ip { - IpAddr::V4(v4) => self - .0 - .ipv4 - .write() - .insert(v4, masklen, Arc::downgrade(&peer.0)), - IpAddr::V6(v6) => self - .0 - .ipv6 - .write() - .insert(v6, masklen, Arc::downgrade(&peer.0)), + IpAddr::V4(v4) => { + self.0 + .device + .ipv4 + .write() + .insert(v4, masklen, Arc::downgrade(&self.0)) + } + IpAddr::V6(v6) => { + self.0 + .device + .ipv6 + .write() + .insert(v6, masklen, Arc::downgrade(&self.0)) + } }; } - pub fn list_subnets(&self, peer: Peer) -> Vec<(IpAddr, u32)> { + pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> { let mut res = Vec::new(); res.append(&mut treebit_list( - &peer, - &self.0.ipv4, + &self.0, + &self.0.device.ipv4, Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)), )); res.append(&mut treebit_list( - &peer, - &self.0.ipv6, + &self.0, + &self.0.device.ipv6, Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)), )); res } +} - pub fn keypair_add(&self, peer: Peer, new: KeyPair) -> Option { - // update key-wheel of peer - let release = peer.keypair_add(new); - - // update incoming packet id map - let mut recv = self.0.recv.write(); - - // release id of previous keypair - if let Some(id) = release { - debug_assert!(recv.contains_key(&id)); - recv.remove(&id); - }; - - // map new id to keypair - debug_assert!(!recv.contains_key(&new.recv.id)); - - recv.insert( - new.recv.id, - DecryptionState { - key: new.recv.key, - protector: Arc::new(spin::Mutex::new(AntiReplay::new())), - peer: Arc::downgrade(&peer.0), - death: new.birth + REJECT_AFTER_TIME, - }, - ); - - release +impl Device { + pub fn new(workers: usize) -> Device { + Device(Arc::new(DeviceInner { + threads: vec![], + stopped: AtomicBool::new(false), + injector: Injector::new(), + recv: spin::RwLock::new(HashMap::new()), + ipv4: spin::RwLock::new(IpLookupTable::new()), + ipv6: spin::RwLock::new(IpLookupTable::new()), + })) } /// Adds a new peer to the device @@ -296,7 +349,36 @@ impl Device { /// # Returns /// /// A atomic ref. counted peer (with liftime matching the device) - pub fn add(&mut self) -> () {} + pub fn new_peer(&self) -> Peer { + // spawn inbound thread + let (send_inbound, recv_inbound) = sync_channel(1); + let handle_inbound = thread::spawn(move || {}); + + // spawn outbound thread + let (send_outbound, recv_inbound) = sync_channel(1); + let handle_outbound = thread::spawn(move || {}); + + // allocate peer object + Peer(Arc::new(PeerInner { + stopped: AtomicBool::new(false), + device: self.0.clone(), + ekey: spin::Mutex::new(None), + endpoint: spin::Mutex::new(None), + inorder_inbound: send_inbound, + inorder_outbound: send_outbound, + keys: spin::Mutex::new(KeyWheel { + next: None, + current: None, + previous: None, + retired: None, + }), + rx_bytes: AtomicU64::new(0), + tx_bytes: AtomicU64::new(0), + staged_packets: spin::Mutex::new(ArrayDeque::new()), + thread_inbound: spin::Mutex::new(handle_inbound), + thread_outbound: spin::Mutex::new(handle_outbound), + })) + } /// Cryptkey routes and sends a plaintext message (IP packet) /// -- cgit v1.2.3-59-g8ed1b