use std::cmp; use std::collections::HashMap; use std::net::{Ipv4Addr, Ipv6Addr}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Weak}; use std::thread; use std::time::Instant; use parking_lot::{Condvar, Mutex}; use crossbeam_deque::{Injector, Worker}; use spin; use treebitmap::IpLookupTable; use super::super::types::{Bind, KeyPair, Tun}; use super::anti_replay::AntiReplay; use super::peer; use super::peer::{Peer, PeerInner}; use super::SIZE_MESSAGE_PREFIX; use super::messages::TYPE_TRANSPORT; use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks, RouterError}; use super::workers::{worker_parallel, JobParallel}; // minimum sizes for IP headers const SIZE_IP4_HEADER: usize = 16; const SIZE_IP6_HEADER: usize = 36; const VERSION_IP4: u8 = 4; const VERSION_IP6: u8 = 6; const OFFSET_IP4_DST: usize = 16; const OFFSET_IP6_DST: usize = 24; pub struct DeviceInner { // IO & timer generics pub tun: T, pub bind: B, pub call_recv: C::CallbackRecv, pub call_send: C::CallbackSend, pub call_need_key: C::CallbackKey, // threading and workers pub waker: (Mutex<()>, Condvar), pub running: AtomicBool, // workers running? pub injector: Injector>, // parallel enc/dec task injector // routing pub recv: spin::RwLock>>, // receiver id -> decryption state pub ipv4: spin::RwLock>>>, // ipv4 cryptkey routing pub ipv6: spin::RwLock>>>, // ipv6 cryptkey routing } pub struct EncryptionState { pub key: [u8; 32], // encryption key pub id: u32, // receiver id pub nonce: u64, // next available nonce pub death: Instant, // (birth + reject-after-time - keepalive-timeout - rekey-timeout) } pub struct DecryptionState { pub key: [u8; 32], pub keypair: Weak, pub confirmed: AtomicBool, pub protector: spin::Mutex, pub peer: Weak>, pub death: Instant, // time when the key can no longer be used for decryption } pub struct Device( Arc>, // reference to device state Vec>, // join handles for workers ); impl Drop for Device { fn drop(&mut self) { // mark device as stopped let device = &self.0; device.running.store(false, Ordering::SeqCst); // join all worker threads while match self.1.pop() { Some(handle) => { handle.thread().unpark(); handle.join().unwrap(); true } _ => false, } {} } } impl, S: Callback, K: KeyCallback, T: Tun, B: Bind> Device, T, B> { pub fn new( num_workers: usize, tun: T, bind: B, call_recv: R, call_send: S, call_need_key: K, ) -> Device, T, B> { // allocate shared device state let inner = Arc::new(DeviceInner { tun, bind, call_recv, call_send, call_need_key, waker: (Mutex::new(()), Condvar::new()), running: AtomicBool::new(true), injector: Injector::new(), recv: spin::RwLock::new(HashMap::new()), ipv4: spin::RwLock::new(IpLookupTable::new()), ipv6: spin::RwLock::new(IpLookupTable::new()), }); // allocate work pool resources let mut workers = Vec::with_capacity(num_workers); let mut stealers = Vec::with_capacity(num_workers); for _ in 0..num_workers { let w = Worker::new_fifo(); stealers.push(w.stealer()); workers.push(w); } // start worker threads let mut threads = Vec::with_capacity(num_workers); for _ in 0..num_workers { let device = inner.clone(); let stealers = stealers.clone(); let worker = workers.pop().unwrap(); threads.push(thread::spawn(move || { worker_parallel(device, worker, stealers) })); } // return exported device handle Device(inner, threads) } } impl Device { /// Adds a new peer to the device /// /// # Returns /// /// A atomic ref. counted peer (with liftime matching the device) pub fn new_peer(&self, opaque: C::Opaque) -> Peer { peer::new_peer(self.0.clone(), opaque) } /// Cryptkey routes and sends a plaintext message (IP packet) /// /// # Arguments /// /// - pt_msg: IP packet to cryptkey route /// pub fn send(&self, msg: Vec) -> Result<(), RouterError> { // ensure that the type field access is within bounds if msg.len() < cmp::min(SIZE_IP4_HEADER, SIZE_IP6_HEADER) + SIZE_MESSAGE_PREFIX { return Err(RouterError::MalformedIPHeader); } // ignore header prefix (for in-place transport message construction) let packet = &msg[SIZE_MESSAGE_PREFIX..]; // lookup peer based on IP packet destination address let peer = match packet[0] >> 4 { VERSION_IP4 => { if msg.len() >= SIZE_IP4_HEADER { // extract IPv4 destination address let mut dst = [0u8; 4]; dst.copy_from_slice(&packet[OFFSET_IP4_DST..OFFSET_IP4_DST + 4]); let dst = Ipv4Addr::from(dst); // lookup peer (project unto and clone "value" field) self.0 .ipv4 .read() .longest_match(dst) .and_then(|(_, _, p)| p.upgrade()) .ok_or(RouterError::NoCryptKeyRoute) } else { Err(RouterError::MalformedIPHeader) } } VERSION_IP6 => { if msg.len() >= SIZE_IP6_HEADER { // extract IPv6 destination address let mut dst = [0u8; 16]; dst.copy_from_slice(&packet[OFFSET_IP6_DST..OFFSET_IP6_DST + 16]); let dst = Ipv6Addr::from(dst); // lookup peer (project unto and clone "value" field) self.0 .ipv6 .read() .longest_match(dst) .and_then(|(_, _, p)| p.upgrade()) .ok_or(RouterError::NoCryptKeyRoute) } else { Err(RouterError::MalformedIPHeader) } } _ => Err(RouterError::MalformedIPHeader), }?; // schedule for encryption and transmission to peer if let Some(job) = peer.send_job(msg) { // add job to parallel worker pool self.0.injector.push((peer.clone(), job)); // ensure workers running, TODO: something faster let &(_, ref cvar) = &self.0.waker; cvar.notify_all(); } Ok(()) } /// Receive an encrypted transport message /// /// # Arguments /// /// - msg: Encrypted transport message pub fn recv(&self, msg: Vec) -> Result<(), RouterError> { // ensure that the type field access is within bounds if msg.len() < SIZE_MESSAGE_PREFIX || msg[0] != TYPE_TRANSPORT { return Err(RouterError::MalformedTransportMessage); } // parse / cast // lookup peer based on receiver id unimplemented!(); } }