From 657a1ccb440cbca438da1c72257298131a017b61 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Tue, 13 Aug 2019 19:42:05 +0200 Subject: Implement add_keypair semantics --- src/main.rs | 8 ++- src/router/device.rs | 150 ++++++++++++++++++++++++++++++++++++++++++--------- src/router/mod.rs | 3 +- 3 files changed, 133 insertions(+), 28 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index e49a217..90ba496 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,12 +4,16 @@ mod handshake; mod router; mod types; -use sodiumoxide; +use std::sync::Arc; -use handshake::Device; +use sodiumoxide; use types::KeyPair; fn main() { // choose optimal crypto implementations for platform sodiumoxide::init().unwrap(); + + let mut rdev = router::Device::new(); + + let pref = rdev.add(); } diff --git a/src/router/device.rs b/src/router/device.rs index 1296be6..ec63111 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -1,24 +1,33 @@ -use arraydeque::{ArrayDeque, Wrapping}; +use arraydeque::{ArrayDeque, Saturating, Wrapping}; +use lifeguard::{Pool, Recycled}; use treebitmap::IpLookupTable; -use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::sync::atomic::{AtomicPtr, AtomicU64}; -use std::sync::{Arc, Mutex}; -use std::time::Instant; +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering}; +use std::sync::{Arc, Mutex, Weak}; +use std::time::{Duration, Instant}; + +use spin::RwLock; use super::super::types::KeyPair; use super::anti_replay::AntiReplay; +use std::u64; + +const REJECT_AFTER_MESSAGES: u64 = u64::MAX - (1 << 4); const MAX_STAGED_PACKETS: usize = 128; -pub struct Device { - ipv4: IpLookupTable>, - ipv6: IpLookupTable>, +pub struct Device<'a> { + recv: RwLock>>>, // map receiver id -> peer + ipv4: IpLookupTable>>, // ipv4 trie + ipv6: IpLookupTable>>, // ipv6 trie + pool: Pool>, // message buffer pool } struct KeyState(KeyPair, AntiReplay); -struct EncryptState { +struct EncryptionState { key: [u8; 32], // encryption key id: u64, // sender id nonce: AtomicU64, // next available nonce @@ -32,28 +41,119 @@ struct KeyWheel { previous: AtomicPtr>>, // old key state (used for decryption) } -pub struct Peer { +pub struct Peer<'a> { + inorder: Mutex>>; MAX_STAGED_PACKETS], Saturating>>, // inorder queue staged_packets: Mutex; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake rx_bytes: AtomicU64, // received bytes tx_bytes: AtomicU64, // transmitted bytes keys: KeyWheel, // key-wheel - encryption: AtomicPtr>, // current encryption key (starts expired) + ekey: AtomicPtr>, // encryption state + endpoint: AtomicPtr>>, } -pub struct PeerRef(); +impl<'a> Peer<'a> { + pub fn set_endpoint(&self, endpoint: SocketAddr) { + self.endpoint + .store(&mut Arc::new(Some(endpoint)), Ordering::Relaxed) + } -impl Device { - pub fn new() -> Device { - unimplemented!(); + pub fn add_keypair(&self, keypair: KeyPair) { + let confirmed = keypair.confirmed; + let mut st_new = Arc::new(Some(KeyState(keypair, AntiReplay::new()))); + let st_previous = self.keys.previous.load(Ordering::Relaxed); + if confirmed { + // previous <- current + self.keys.previous.compare_and_swap( + st_previous, + self.keys.current.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + + // current <- new + self.keys.next.store(&mut st_new, Ordering::Relaxed) + } else { + // previous <- next + self.keys.previous.compare_and_swap( + st_previous, + self.keys.next.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + + // next <- new + self.keys.next.store(&mut st_new, Ordering::Relaxed) + } + } + + pub fn rx_bytes(&self) -> u64 { + self.rx_bytes.load(Ordering::Relaxed) + } + + pub fn tx_bytes(&self) -> u64 { + self.tx_bytes.load(Ordering::Relaxed) + } +} + +impl<'a> Device<'a> { + pub fn new() -> Device<'a> { + Device { + recv: RwLock::new(HashMap::new()), + ipv4: IpLookupTable::new(), + ipv6: IpLookupTable::new(), + pool: Pool::with_size_and_max(0, MAX_STAGED_PACKETS * 2), + } + } + + pub fn subnets(&self, peer: Arc>) -> Vec<(IpAddr, u32)> { + let mut subnets = Vec::new(); + + // extract ipv4 entries + for subnet in self.ipv4.iter() { + let (ip, masklen, p) = subnet; + if Arc::ptr_eq(&peer, p) { + subnets.push((IpAddr::V4(ip), masklen)) + } + } + + // extract ipv6 entries + for subnet in self.ipv6.iter() { + let (ip, masklen, p) = subnet; + if Arc::ptr_eq(&peer, p) { + subnets.push((IpAddr::V6(ip), masklen)) + } + } + + subnets } /// Adds a new peer to the device /// /// # Returns /// - /// An opaque value representing the peer. - pub fn add(&self) -> PeerRef { - unimplemented!(); + /// A atomic ref. counted peer (with liftime matching the device) + pub fn add(&mut self) -> Arc> { + Arc::new(Peer { + inorder: Mutex::new(ArrayDeque::new()), + staged_packets: Mutex::new(ArrayDeque::new()), + rx_bytes: AtomicU64::new(0), + tx_bytes: AtomicU64::new(0), + keys: KeyWheel { + next: AtomicPtr::new(&mut Arc::new(None)), + current: AtomicPtr::new(&mut Arc::new(None)), + previous: AtomicPtr::new(&mut Arc::new(None)), + }, + // long expired encryption key + ekey: AtomicPtr::new(&mut Arc::new(EncryptionState { + key: [0u8; 32], + id: 0, + nonce: AtomicU64::new(REJECT_AFTER_MESSAGES), + death: Instant::now() - Duration::from_secs(31536000), + })), + endpoint: AtomicPtr::new(&mut Arc::new(None)), + }) + } + + pub fn get_buffer(&self) -> Recycled> { + self.pool.new() } /// Cryptkey routes and sends a plaintext message (IP packet) @@ -68,7 +168,7 @@ impl Device { /// This indicates that a handshake should be initated (see the handshake module). /// If this occurs the packet is copied to an internal buffer /// and retransmission can be attempted using send_run_queue - pub fn send(&self, pt_msg: &mut [u8]) -> Option { + pub fn send(&self, pt_msg: &mut [u8]) -> Arc { unimplemented!(); } @@ -80,7 +180,7 @@ impl Device { /// /// - peer: Reference to the destination peer /// - msg: Message to transmit - pub fn send_raw(&self, peer: PeerRef, msg: &mut [u8]) { + pub fn send_raw(&self, peer: Arc, msg: &mut [u8]) { unimplemented!(); } @@ -89,7 +189,7 @@ impl Device { /// # Arguments /// /// - peer: Reference for the peer to flush - pub fn flush_queue(&self, peer: PeerRef) { + pub fn flush_queue(&self, peer: Arc) { unimplemented!(); } @@ -101,7 +201,7 @@ impl Device { /// /// A boolean indicating whether packages where sent. /// Note: This is used for implicit confirmation of handshakes. - pub fn send_run_queue(&self, peer: PeerRef) -> bool { + pub fn send_run_queue(&self, peer: Arc) -> bool { unimplemented!(); } @@ -119,15 +219,15 @@ impl Device { /// # Arguments /// /// - peer: The peer to retrieve the endpoint for - pub fn get_endpoint(&self, peer: PeerRef) -> SocketAddr { + pub fn get_endpoint(&self, peer: Arc) -> SocketAddr { unimplemented!(); } - pub fn set_endpoint(&self, peer: PeerRef, endpoint: SocketAddr) { + pub fn set_endpoint(&self, peer: Arc, endpoint: SocketAddr) { unimplemented!(); } - pub fn new_keypair(&self, peer: PeerRef, keypair: KeyPair) { + pub fn new_keypair(&self, peer: Arc, keypair: KeyPair) { unimplemented!(); } } diff --git a/src/router/mod.rs b/src/router/mod.rs index 646b03b..41e38ba 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -1,4 +1,5 @@ mod anti_replay; mod buffer; +mod device; -pub mod device; +pub use device::Device; \ No newline at end of file -- cgit v1.2.3-59-g8ed1b