From 6d11da441bde4fa75eef755bef4c97f0d1f6a29b Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Wed, 4 Sep 2019 19:08:13 +0200 Subject: Simply passing of JobBuffer ownership --- src/router/device.rs | 80 +++++++++++++++++++++++----------------------------- 1 file changed, 35 insertions(+), 45 deletions(-) (limited to 'src/router/device.rs') diff --git a/src/router/device.rs b/src/router/device.rs index ec6453d..58ca2f6 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -1,14 +1,13 @@ use std::cmp; use std::collections::HashMap; use std::net::{Ipv4Addr, Ipv6Addr}; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::mpsc::sync_channel; +use std::sync::mpsc::SyncSender; use std::sync::{Arc, Weak}; use std::thread; use std::time::Instant; -use parking_lot::{Condvar, Mutex}; - -use crossbeam_deque::{Injector, Worker}; use spin; use treebitmap::IpLookupTable; @@ -41,11 +40,6 @@ pub struct DeviceInner { pub call_send: C::CallbackSend, pub call_need_key: C::CallbackKey, - // threading and workers - pub waker: (Mutex<()>, Condvar), - pub running: AtomicBool, // workers running? - pub injector: Injector>, // parallel enc/dec task injector - // routing pub recv: spin::RwLock>>, // receiver id -> decryption state pub ipv4: spin::RwLock>>>, // ipv4 cryptkey routing @@ -68,19 +62,20 @@ pub struct DecryptionState { pub death: Instant, // time when the key can no longer be used for decryption } -pub struct Device( - Arc>, // reference to device state - Vec>, // join handles for workers -); +pub struct Device { + pub state: Arc>, // reference to device state + pub handles: Vec>, // join handles for workers + pub queue_next: AtomicUsize, // next round-robin index + pub queues: Vec>>, // work queues (1 per thread) +} impl Drop for Device { fn drop(&mut self) { - // mark device as stopped - let device = &self.0; - device.running.store(false, Ordering::SeqCst); + // drop all queues + while self.queues.pop().is_some() {} // join all worker threads - while match self.1.pop() { + while match self.handles.pop() { Some(handle) => { handle.thread().unpark(); handle.join().unwrap(); @@ -98,8 +93,8 @@ impl, S: Callback, K: KeyCallback, T: Tun, B: Bi num_workers: usize, tun: T, bind: B, - call_recv: R, call_send: S, + call_recv: R, call_need_key: K, ) -> Device, T, B> { // allocate shared device state @@ -109,36 +104,31 @@ impl, S: Callback, K: KeyCallback, T: Tun, B: Bi call_recv, call_send, call_need_key, - waker: (Mutex::new(()), Condvar::new()), - running: AtomicBool::new(true), - injector: Injector::new(), recv: spin::RwLock::new(HashMap::new()), ipv4: spin::RwLock::new(IpLookupTable::new()), ipv6: spin::RwLock::new(IpLookupTable::new()), }); - // allocate work pool resources - let mut workers = Vec::with_capacity(num_workers); - let mut stealers = Vec::with_capacity(num_workers); - for _ in 0..num_workers { - let w = Worker::new_fifo(); - stealers.push(w.stealer()); - workers.push(w); - } - // start worker threads + let mut queues = Vec::with_capacity(num_workers); let mut threads = Vec::with_capacity(num_workers); for _ in 0..num_workers { + // allocate work queue + let (tx, rx) = sync_channel(128); + queues.push(spin::Mutex::new(tx)); + + // start worker thread let device = inner.clone(); - let stealers = stealers.clone(); - let worker = workers.pop().unwrap(); - threads.push(thread::spawn(move || { - worker_parallel(device, worker, stealers) - })); + threads.push(thread::spawn(move || worker_parallel(device, rx))); } // return exported device handle - Device(inner, threads) + Device { + state: inner, + handles: threads, + queue_next: AtomicUsize::new(0), + queues: queues, + } } } @@ -149,7 +139,7 @@ impl Device { /// /// A atomic ref. counted peer (with liftime matching the device) pub fn new_peer(&self, opaque: C::Opaque) -> Peer { - peer::new_peer(self.0.clone(), opaque) + peer::new_peer(self.state.clone(), opaque) } /// Cryptkey routes and sends a plaintext message (IP packet) @@ -177,7 +167,7 @@ impl Device { let dst = Ipv4Addr::from(dst); // lookup peer (project unto and clone "value" field) - self.0 + self.state .ipv4 .read() .longest_match(dst) @@ -195,7 +185,7 @@ impl Device { let dst = Ipv6Addr::from(dst); // lookup peer (project unto and clone "value" field) - self.0 + self.state .ipv6 .read() .longest_match(dst) @@ -210,12 +200,12 @@ impl Device { // schedule for encryption and transmission to peer if let Some(job) = peer.send_job(msg) { - // add job to parallel worker pool - self.0.injector.push((peer.clone(), job)); - - // ensure workers running, TODO: something faster - let &(_, ref cvar) = &self.0.waker; - cvar.notify_all(); + // add job to worker queue + let idx = self.queue_next.fetch_add(1, Ordering::SeqCst); + self.queues[idx % self.queues.len()] + .lock() + .send(job) + .unwrap(); } Ok(()) -- cgit v1.2.3-59-g8ed1b