aboutsummaryrefslogtreecommitdiffstats
path: root/src/router/device.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-28 11:52:08 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-28 11:52:08 +0200
commit10e6436e6bb064e93062c6f1a7b0034976538f77 (patch)
treede77eae6ab7285dfc868f8f542843b4bec79be43 /src/router/device.rs
parentAdd confirm_key stub (diff)
downloadwireguard-rs-10e6436e6bb064e93062c6f1a7b0034976538f77.tar.xz
wireguard-rs-10e6436e6bb064e93062c6f1a7b0034976538f77.zip
Start worker threads for device
Diffstat (limited to 'src/router/device.rs')
-rw-r--r--src/router/device.rs53
1 files changed, 41 insertions, 12 deletions
diff --git a/src/router/device.rs b/src/router/device.rs
index 0d5224e..bee4ad4 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -5,7 +5,7 @@ use std::sync::{Arc, Weak};
use std::thread;
use std::time::Instant;
-use crossbeam_deque::{Injector, Steal};
+use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use spin;
use treebitmap::IpLookupTable;
@@ -15,12 +15,13 @@ use super::peer;
use super::peer::{Peer, PeerInner};
use super::types::{Callback, KeyCallback, Opaque};
+use super::workers::{worker_parallel, JobParallel};
pub struct DeviceInner<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> {
// threading and workers
- pub stopped: AtomicBool,
- pub injector: Injector<()>, // parallel enc/dec task injector
- pub threads: Vec<thread::JoinHandle<()>>, // join handles of worker threads
+ pub running: AtomicBool, // workers running?
+ pub parked: AtomicBool, // any workers parked?
+ pub injector: Injector<JobParallel>, // parallel enc/dec task injector
// unboxed callbacks (used for timers and handshake requests)
pub event_send: S, // called when authenticated message send
@@ -52,19 +53,23 @@ pub struct DecryptionState<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCall
pub struct Device<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>(
Arc<DeviceInner<T, S, R, K>>,
+ Vec<thread::JoinHandle<()>>,
);
impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Drop for Device<T, S, R, K> {
fn drop(&mut self) {
// mark device as stopped
let device = &self.0;
- device.stopped.store(true, Ordering::SeqCst);
+ device.running.store(false, Ordering::SeqCst);
// eat all parallel jobs
- while device.injector.steal() != Steal::Empty {}
+ while match device.injector.steal() {
+ Steal::Empty => true,
+ _ => false,
+ } {}
// unpark all threads
- for handle in &device.threads {
+ for handle in &self.1 {
handle.thread().unpark();
}
}
@@ -72,22 +77,46 @@ impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Drop for Devi
impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Device<T, S, R, K> {
pub fn new(
- workers: usize,
+ num_workers: usize,
event_recv: R,
event_send: S,
event_need_key: K,
) -> Device<T, S, R, K> {
- Device(Arc::new(DeviceInner {
+ // allocate shared device state
+ let inner = Arc::new(DeviceInner {
event_recv,
event_send,
event_need_key,
- threads: vec![],
- stopped: AtomicBool::new(false),
+ parked: AtomicBool::new(false),
+ running: AtomicBool::new(true),
injector: Injector::new(),
recv: spin::RwLock::new(HashMap::new()),
ipv4: spin::RwLock::new(IpLookupTable::new()),
ipv6: spin::RwLock::new(IpLookupTable::new()),
- }))
+ });
+
+ // alloacate work pool resources
+ let mut workers = Vec::with_capacity(num_workers);
+ let mut stealers = Vec::with_capacity(num_workers);
+ for _ in 0..num_workers {
+ let w = Worker::new_fifo();
+ stealers.push(w.stealer());
+ workers.push(w);
+ }
+
+ // start worker threads
+ let mut threads = Vec::with_capacity(num_workers);
+ for _ in 0..num_workers {
+ let device = inner.clone();
+ let stealers = stealers.clone();
+ let worker = workers.pop().unwrap();
+ threads.push(thread::spawn(move || {
+ worker_parallel(device, worker, stealers)
+ }));
+ }
+
+ // return exported device handle
+ Device(inner, threads)
}
/// Adds a new peer to the device