summaryrefslogtreecommitdiffstats
path: root/src/router/workers.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-20 21:19:53 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-20 21:19:53 +0200
commit9cef264581ec8cf859113234b29ad5b58577ed8c (patch)
tree55320a3a619805b643e46d41068ceefe1628b0df /src/router/workers.rs
parentRemoved platform mod (diff)
downloadwireguard-rs-9cef264581ec8cf859113234b29ad5b58577ed8c.tar.xz
wireguard-rs-9cef264581ec8cf859113234b29ad5b58577ed8c.zip
Ensure peer threads are stopped on drop
Diffstat (limited to '')
-rw-r--r--src/router/workers.rs121
1 files changed, 72 insertions, 49 deletions
diff --git a/src/router/workers.rs b/src/router/workers.rs
index 2117190..da5b600 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -6,7 +6,7 @@ use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use spin;
use std::iter;
use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::mpsc::{sync_channel, Receiver};
+use std::sync::mpsc::{sync_channel, Receiver, TryRecvError};
use std::sync::{Arc, Weak};
use std::thread;
@@ -23,17 +23,17 @@ enum Status {
Waiting, // job awaiting completion
}
-struct JobInner {
+pub struct JobInner {
msg: Vec<u8>, // message buffer (nonce and receiver id set)
key: [u8; 32], // chacha20poly1305 key
status: Status, // state of the job
op: Operation, // should be buffer be encrypted / decrypted?
}
-type JobBuffer = Arc<spin::Mutex<JobInner>>;
-type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer);
-type JobInbound = (Arc<DecryptionState>, JobBuffer);
-type JobOutbound = (Weak<PeerInner>, JobBuffer);
+pub type JobBuffer = Arc<spin::Mutex<JobInner>>;
+pub type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer);
+pub type JobInbound = (Weak<DecryptionState>, JobBuffer);
+pub type JobOutbound = JobBuffer;
/* Strategy for workers acquiring a new job:
*
@@ -53,62 +53,85 @@ fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]
})
}
-fn worker_inbound(
- device: Arc<DeviceInner>, // related device
- peer: Arc<PeerInner>, // related peer
- recv: Receiver<JobInbound>, // in order queue
-) {
- // reads from in order channel
- for job in recv.recv().iter() {
- loop {
- let (state, buf) = job;
-
- // check if job is complete
- match buf.try_lock() {
- None => (),
- Some(buf) => {
- if buf.status != Status::Waiting {
- // check replay protector
-
- // check if confirms keypair
-
- // write to tun device
-
- // continue to next job (no parking)
- break;
- }
+fn wait_buffer(stopped: AtomicBool, buf: &JobBuffer) {
+ while !stopped.load(Ordering::Acquire) {
+ match buf.try_lock() {
+ None => (),
+ Some(buf) => {
+ if buf.status == Status::Waiting {
+ return;
}
}
+ };
+ thread::park();
+ }
+}
- // wait for job to complete
- thread::park();
- }
+fn wait_recv<T>(stopped: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvError> {
+ while !stopped.load(Ordering::Acquire) {
+ match recv.try_recv() {
+ Err(TryRecvError::Empty) => (),
+ value => {
+ return value;
+ }
+ };
+ thread::park();
}
+ return Err(TryRecvError::Disconnected);
}
-fn worker_outbound(
+pub fn worker_inbound(
device: Arc<DeviceInner>, // related device
peer: Arc<PeerInner>, // related peer
recv: Receiver<JobInbound>, // in order queue
) {
- // reads from in order channel
- for job in recv.recv().iter() {
- loop {
- let (peer, buf) = job;
-
- // check if job is complete
- match buf.try_lock() {
- None => (),
- Some(buf) => {
- if buf.status != Status::Waiting {
- // send buffer to peer endpoint
- break;
- }
+ loop {
+ match wait_recv(&peer.stopped, &recv) {
+ Ok((state, buf)) => {
+ while !peer.stopped.load(Ordering::Acquire) {
+ match buf.try_lock() {
+ None => (),
+ Some(buf) => {
+ if buf.status != Status::Waiting {
+ // consume
+ break;
+ }
+ }
+ };
+ thread::park();
}
}
+ Err(_) => {
+ break;
+ }
+ }
+ }
+}
- // wait for job to complete
- thread::park();
+pub fn worker_outbound(
+ device: Arc<DeviceInner>, // related device
+ peer: Arc<PeerInner>, // related peer
+ recv: Receiver<JobOutbound>, // in order queue
+) {
+ loop {
+ match wait_recv(&peer.stopped, &recv) {
+ Ok(buf) => {
+ while !peer.stopped.load(Ordering::Acquire) {
+ match buf.try_lock() {
+ None => (),
+ Some(buf) => {
+ if buf.status != Status::Waiting {
+ // consume
+ break;
+ }
+ }
+ };
+ thread::park();
+ }
+ }
+ Err(_) => {
+ break;
+ }
}
}
}