use std::collections::HashMap; use std::net::{Ipv4Addr, Ipv6Addr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Weak}; use std::thread; use std::time::Instant; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use spin; use treebitmap::IpLookupTable; use super::super::types::{Bind, KeyPair, Tun}; use super::anti_replay::AntiReplay; use super::peer; use super::peer::{Peer, PeerInner}; use super::types::{Callback, KeyCallback, Opaque}; use super::workers::{worker_parallel, JobParallel}; pub struct DeviceInner, R: Callback, K: KeyCallback> { // threading and workers pub running: AtomicBool, // workers running? pub parked: AtomicBool, // any workers parked? pub injector: Injector, // parallel enc/dec task injector // unboxed callbacks (used for timers and handshake requests) pub event_send: S, // called when authenticated message send pub event_recv: R, // called when authenticated message received pub event_need_key: K, // called when new key material is required // routing pub recv: spin::RwLock>>, // receiver id -> decryption state pub ipv4: spin::RwLock>>>, // ipv4 cryptkey routing pub ipv6: spin::RwLock>>>, // ipv6 cryptkey routing } pub struct EncryptionState { pub key: [u8; 32], // encryption key pub id: u32, // sender id pub nonce: u64, // next available nonce pub death: Instant, // time when the key no longer can be used for encryption // (birth + reject-after-time - keepalive-timeout - rekey-timeout) } pub struct DecryptionState, R: Callback, K: KeyCallback> { pub key: [u8; 32], pub keypair: Weak, pub confirmed: AtomicBool, pub protector: spin::Mutex, pub peer: Weak>, pub death: Instant, // time when the key can no longer be used for decryption } pub struct Device, R: Callback, K: KeyCallback>( Arc>, Vec>, ); impl, R: Callback, K: KeyCallback> Drop for Device { fn drop(&mut self) { // mark device as stopped let device = &self.0; device.running.store(false, Ordering::SeqCst); // join all worker threads while match self.1.pop() { Some(handle) => { handle.thread().unpark(); handle.join().unwrap(); true } _ => false, } {} } } impl, R: Callback, K: KeyCallback> Device { pub fn new( num_workers: usize, event_recv: R, event_send: S, event_need_key: K, ) -> Device { // allocate shared device state let inner = Arc::new(DeviceInner { event_recv, event_send, event_need_key, parked: AtomicBool::new(false), running: AtomicBool::new(true), injector: Injector::new(), recv: spin::RwLock::new(HashMap::new()), ipv4: spin::RwLock::new(IpLookupTable::new()), ipv6: spin::RwLock::new(IpLookupTable::new()), }); // alloacate work pool resources let mut workers = Vec::with_capacity(num_workers); let mut stealers = Vec::with_capacity(num_workers); for _ in 0..num_workers { let w = Worker::new_fifo(); stealers.push(w.stealer()); workers.push(w); } // start worker threads let mut threads = Vec::with_capacity(num_workers); for _ in 0..num_workers { let device = inner.clone(); let stealers = stealers.clone(); let worker = workers.pop().unwrap(); threads.push(thread::spawn(move || { worker_parallel(device, worker, stealers) })); } // return exported device handle Device(inner, threads) } /// Adds a new peer to the device /// /// # Returns /// /// A atomic ref. counted peer (with liftime matching the device) pub fn new_peer(&self, opaque: T) -> Peer { peer::new_peer(self.0.clone(), opaque) } /// Cryptkey routes and sends a plaintext message (IP packet) /// /// # Arguments /// /// - pt_msg: IP packet to cryptkey route /// pub fn send(&self, pt_msg: &mut [u8]) { unimplemented!(); } /// Receive an encrypted transport message /// /// # Arguments /// /// - ct_msg: Encrypted transport message pub fn recv(&self, ct_msg: &mut [u8]) { unimplemented!(); } }