diff options
Diffstat (limited to 'src/router')
-rw-r--r-- | src/router/device.rs | 89 | ||||
-rw-r--r-- | src/router/mod.rs | 2 |
2 files changed, 50 insertions, 41 deletions
diff --git a/src/router/device.rs b/src/router/device.rs index 3b29312..5dfd22c 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -1,11 +1,11 @@ -use arraydeque::{ArrayDeque, Saturating, Wrapping}; +use arraydeque::{ArrayDeque, Wrapping}; use treebitmap::IpLookupTable; +use crossbeam_deque::{Injector, Steal}; use std::collections::HashMap; -use std::error::Error; +use std::mem; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::ptr; -use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::{Arc, Mutex, Weak}; use std::thread; @@ -21,12 +21,29 @@ use std::u64; const REJECT_AFTER_MESSAGES: u64 = u64::MAX - (1 << 4); const MAX_STAGED_PACKETS: usize = 128; -pub struct Device { +struct DeviceInner { + stopped: AtomicBool, + injector: Injector<()>, // parallel enc/dec task injector + threads: Vec<thread::JoinHandle<()>>, recv: spin::RwLock<HashMap<u32, DecryptionState>>, ipv4: IpLookupTable<Ipv4Addr, Weak<PeerInner>>, ipv6: IpLookupTable<Ipv6Addr, Weak<PeerInner>>, } +struct PeerInner { + stopped: AtomicBool, + thread_outbound: spin::Mutex<thread::JoinHandle<()>>, + thread_inbound: spin::Mutex<thread::JoinHandle<()>>, + inorder_outbound: SyncSender<()>, + inorder_inbound: SyncSender<()>, + staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake + rx_bytes: AtomicU64, // received bytes + tx_bytes: AtomicU64, // transmitted bytes + keys: spin::Mutex<KeyWheel>, // key-wheel + ekey: spin::Mutex<EncryptionState>, // encryption state + endpoint: spin::Mutex<Option<Arc<SocketAddr>>>, +} + struct EncryptionState { key: [u8; 32], // encryption key id: u32, // sender id @@ -48,33 +65,34 @@ struct KeyWheel { previous: Option<KeyPair>, // old key state (used for decryption) } -struct PeerInner { - inorder_outbound: SyncSender<()>, - inorder_inbound: SyncSender<()>, - staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake - rx_bytes: AtomicU64, // received bytes - tx_bytes: AtomicU64, // transmitted bytes - keys: spin::Mutex<KeyWheel>, // key-wheel - ekey: spin::Mutex<EncryptionState>, // encryption state - endpoint: spin::Mutex<Option<Arc<SocketAddr>>>, -} - pub struct Peer(Arc<PeerInner>); +pub struct Device(DeviceInner); impl Drop for Peer { fn drop(&mut self) { - // stop threads and remove peer from device + // mark peer as stopped + let inner = &self.0; + inner.stopped.store(true, Ordering::SeqCst); + + // unpark threads to stop + inner.thread_inbound.lock().thread().unpark(); + inner.thread_outbound.lock().thread().unpark(); } } impl Drop for Device { fn drop(&mut self) { - // stop threads + // mark device as stopped + let inner = &self.0; + inner.stopped.store(true, Ordering::SeqCst); + + // eat all parallel jobs + while inner.injector.steal() != Steal::Empty {} } } impl Peer { - fn set_endpoint(&self, endpoint: SocketAddr) { + pub fn set_endpoint(&self, endpoint: SocketAddr) { *self.0.endpoint.lock() = Some(Arc::new(endpoint)) } @@ -87,7 +105,7 @@ impl Peer { }; } - pub fn keypair_add(&self, new: KeyPair) -> Option<u32> { + fn keypair_add(&self, new: KeyPair) -> Option<u32> { let mut keys = self.0.keys.lock(); let release = keys.previous.map(|k| k.recv.id); @@ -124,30 +142,21 @@ impl Peer { } impl Device { - pub fn new() -> Device { - Device { + pub fn new(workers: usize) -> Device { + Device(DeviceInner { + threads: vec![], + stopped: AtomicBool::new(false), + injector: Injector::new(), recv: spin::RwLock::new(HashMap::new()), ipv4: IpLookupTable::new(), ipv6: IpLookupTable::new(), - } - } - - pub fn release(&self, id: u32) { - debug_assert!( - if let Some(_) = self.recv.read().get(&id) { - true - } else { - false - }, - true - ); - self.recv.write().remove(&id); + }) } pub fn add_subnet(&mut self, ip: IpAddr, masklen: u32, peer: Peer) { match ip { - IpAddr::V4(v4) => self.ipv4.insert(v4, masklen, Arc::downgrade(&peer.0)), - IpAddr::V6(v6) => self.ipv6.insert(v6, masklen, Arc::downgrade(&peer.0)), + IpAddr::V4(v4) => self.0.ipv4.insert(v4, masklen, Arc::downgrade(&peer.0)), + IpAddr::V6(v6) => self.0.ipv6.insert(v6, masklen, Arc::downgrade(&peer.0)), }; } @@ -155,7 +164,7 @@ impl Device { let mut subnets = Vec::new(); // extract ipv4 entries - for subnet in self.ipv4.iter() { + for subnet in self.0.ipv4.iter() { let (ip, masklen, p) = subnet; if let Some(p) = p.upgrade() { if Arc::ptr_eq(&p, &peer.0) { @@ -165,7 +174,7 @@ impl Device { } // extract ipv6 entries - for subnet in self.ipv6.iter() { + for subnet in self.0.ipv6.iter() { let (ip, masklen, p) = subnet; if let Some(p) = p.upgrade() { if Arc::ptr_eq(&p, &peer.0) { @@ -182,7 +191,7 @@ impl Device { let release = peer.keypair_add(new); // update incoming packet id map - let mut recv = self.recv.write(); + let mut recv = self.0.recv.write(); // release id of previous keypair if let Some(id) = release { diff --git a/src/router/mod.rs b/src/router/mod.rs index b53b10c..7055875 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -4,4 +4,4 @@ mod device; // mod inbound; // mod outbound; -pub use device::Device;
\ No newline at end of file +pub use device::{Device, Peer};
\ No newline at end of file |