summaryrefslogtreecommitdiffstats
path: root/src/router/workers.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-28 11:52:08 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-28 11:52:08 +0200
commit10e6436e6bb064e93062c6f1a7b0034976538f77 (patch)
treede77eae6ab7285dfc868f8f542843b4bec79be43 /src/router/workers.rs
parentAdd confirm_key stub (diff)
downloadwireguard-rs-10e6436e6bb064e93062c6f1a7b0034976538f77.tar.xz
wireguard-rs-10e6436e6bb064e93062c6f1a7b0034976538f77.zip
Start worker threads for device
Diffstat (limited to 'src/router/workers.rs')
-rw-r--r--src/router/workers.rs26
1 files changed, 12 insertions, 14 deletions
diff --git a/src/router/workers.rs b/src/router/workers.rs
index 320f6a1..f02ee15 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -18,7 +18,7 @@ use super::peer::PeerInner;
use super::types::{Callback, KeyCallback, Opaque};
#[derive(PartialEq, Debug)]
-enum Operation {
+pub enum Operation {
Encryption,
Decryption,
}
@@ -60,8 +60,8 @@ fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]
})
}
-fn wait_buffer(stopped: AtomicBool, buf: &JobBuffer) {
- while !stopped.load(Ordering::Acquire) {
+fn wait_buffer(running: AtomicBool, buf: &JobBuffer) {
+ while running.load(Ordering::Acquire) {
match buf.try_lock() {
None => (),
Some(buf) => {
@@ -74,8 +74,8 @@ fn wait_buffer(stopped: AtomicBool, buf: &JobBuffer) {
}
}
-fn wait_recv<T>(stopped: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvError> {
- while !stopped.load(Ordering::Acquire) {
+fn wait_recv<T>(running: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvError> {
+ while running.load(Ordering::Acquire) {
match recv.try_recv() {
Err(TryRecvError::Empty) => (),
value => {
@@ -201,15 +201,13 @@ pub fn worker_outbound<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback
}
}
-pub fn worker_parallel(
- stopped: Arc<AtomicBool>, // stop workers (device has been dropped)
- parked: Arc<AtomicBool>, // thread has been parked?
- local: Worker<JobParallel>, // local job queue (local to thread)
- global: Injector<JobParallel>, // global job injector
+pub fn worker_parallel<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>(
+ device: Arc<DeviceInner<T, S, R, K>>,
+ local: Worker<JobParallel>, // local job queue (local to thread)
stealers: Vec<Stealer<JobParallel>>, // stealers (from other threads)
) {
- while !stopped.load(Ordering::SeqCst) {
- match find_task(&local, &global, &stealers) {
+ while !device.running.load(Ordering::SeqCst) {
+ match find_task(&local, &device.injector, &stealers) {
Some(job) => {
let (handle, buf) = job;
@@ -236,7 +234,7 @@ pub fn worker_parallel(
// create a nonce object
let mut nonce = [0u8; 12];
- debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); // why the fuck this is not a constant, god knows...
+ debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); // why the this is not a constant, god knows...
nonce[4..].copy_from_slice(header.f_counter.as_bytes());
let nonce = Nonce::assume_unique_for_key(nonce);
@@ -263,7 +261,7 @@ pub fn worker_parallel(
}
None => {
// no jobs, park the worker
- parked.store(true, Ordering::Release);
+ device.parked.store(true, Ordering::Release);
thread::park();
}
}