diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/router/device.rs | 53 | ||||
-rw-r--r-- | src/router/workers.rs | 26 |
2 files changed, 53 insertions, 26 deletions
diff --git a/src/router/device.rs b/src/router/device.rs index 0d5224e..bee4ad4 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, Weak}; use std::thread; use std::time::Instant; -use crossbeam_deque::{Injector, Steal}; +use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use spin; use treebitmap::IpLookupTable; @@ -15,12 +15,13 @@ use super::peer; use super::peer::{Peer, PeerInner}; use super::types::{Callback, KeyCallback, Opaque}; +use super::workers::{worker_parallel, JobParallel}; pub struct DeviceInner<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> { // threading and workers - pub stopped: AtomicBool, - pub injector: Injector<()>, // parallel enc/dec task injector - pub threads: Vec<thread::JoinHandle<()>>, // join handles of worker threads + pub running: AtomicBool, // workers running? + pub parked: AtomicBool, // any workers parked? + pub injector: Injector<JobParallel>, // parallel enc/dec task injector // unboxed callbacks (used for timers and handshake requests) pub event_send: S, // called when authenticated message send @@ -52,19 +53,23 @@ pub struct DecryptionState<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCall pub struct Device<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>( Arc<DeviceInner<T, S, R, K>>, + Vec<thread::JoinHandle<()>>, ); impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Drop for Device<T, S, R, K> { fn drop(&mut self) { // mark device as stopped let device = &self.0; - device.stopped.store(true, Ordering::SeqCst); + device.running.store(false, Ordering::SeqCst); // eat all parallel jobs - while device.injector.steal() != Steal::Empty {} + while match device.injector.steal() { + Steal::Empty => true, + _ => false, + } {} // unpark all threads - for handle in &device.threads { + for handle in &self.1 { handle.thread().unpark(); } } @@ -72,22 +77,46 @@ impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Drop for Devi impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Device<T, S, R, K> { pub fn new( - workers: usize, + num_workers: usize, event_recv: R, event_send: S, event_need_key: K, ) -> Device<T, S, R, K> { - Device(Arc::new(DeviceInner { + // allocate shared device state + let inner = Arc::new(DeviceInner { event_recv, event_send, event_need_key, - threads: vec![], - stopped: AtomicBool::new(false), + parked: AtomicBool::new(false), + running: AtomicBool::new(true), injector: Injector::new(), recv: spin::RwLock::new(HashMap::new()), ipv4: spin::RwLock::new(IpLookupTable::new()), ipv6: spin::RwLock::new(IpLookupTable::new()), - })) + }); + + // alloacate work pool resources + let mut workers = Vec::with_capacity(num_workers); + let mut stealers = Vec::with_capacity(num_workers); + for _ in 0..num_workers { + let w = Worker::new_fifo(); + stealers.push(w.stealer()); + workers.push(w); + } + + // start worker threads + let mut threads = Vec::with_capacity(num_workers); + for _ in 0..num_workers { + let device = inner.clone(); + let stealers = stealers.clone(); + let worker = workers.pop().unwrap(); + threads.push(thread::spawn(move || { + worker_parallel(device, worker, stealers) + })); + } + + // return exported device handle + Device(inner, threads) } /// Adds a new peer to the device diff --git a/src/router/workers.rs b/src/router/workers.rs index 320f6a1..f02ee15 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -18,7 +18,7 @@ use super::peer::PeerInner; use super::types::{Callback, KeyCallback, Opaque}; #[derive(PartialEq, Debug)] -enum Operation { +pub enum Operation { Encryption, Decryption, } @@ -60,8 +60,8 @@ fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>] }) } -fn wait_buffer(stopped: AtomicBool, buf: &JobBuffer) { - while !stopped.load(Ordering::Acquire) { +fn wait_buffer(running: AtomicBool, buf: &JobBuffer) { + while running.load(Ordering::Acquire) { match buf.try_lock() { None => (), Some(buf) => { @@ -74,8 +74,8 @@ fn wait_buffer(stopped: AtomicBool, buf: &JobBuffer) { } } -fn wait_recv<T>(stopped: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvError> { - while !stopped.load(Ordering::Acquire) { +fn wait_recv<T>(running: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvError> { + while running.load(Ordering::Acquire) { match recv.try_recv() { Err(TryRecvError::Empty) => (), value => { @@ -201,15 +201,13 @@ pub fn worker_outbound<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback } } -pub fn worker_parallel( - stopped: Arc<AtomicBool>, // stop workers (device has been dropped) - parked: Arc<AtomicBool>, // thread has been parked? - local: Worker<JobParallel>, // local job queue (local to thread) - global: Injector<JobParallel>, // global job injector +pub fn worker_parallel<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>( + device: Arc<DeviceInner<T, S, R, K>>, + local: Worker<JobParallel>, // local job queue (local to thread) stealers: Vec<Stealer<JobParallel>>, // stealers (from other threads) ) { - while !stopped.load(Ordering::SeqCst) { - match find_task(&local, &global, &stealers) { + while !device.running.load(Ordering::SeqCst) { + match find_task(&local, &device.injector, &stealers) { Some(job) => { let (handle, buf) = job; @@ -236,7 +234,7 @@ pub fn worker_parallel( // create a nonce object let mut nonce = [0u8; 12]; - debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); // why the fuck this is not a constant, god knows... + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); // why the this is not a constant, god knows... nonce[4..].copy_from_slice(header.f_counter.as_bytes()); let nonce = Nonce::assume_unique_for_key(nonce); @@ -263,7 +261,7 @@ pub fn worker_parallel( } None => { // no jobs, park the worker - parked.store(true, Ordering::Release); + device.parked.store(true, Ordering::Release); thread::park(); } } |