From 9cef264581ec8cf859113234b29ad5b58577ed8c Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Tue, 20 Aug 2019 21:19:53 +0200 Subject: Ensure peer threads are stopped on drop --- src/router/workers.rs | 121 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 72 insertions(+), 49 deletions(-) (limited to 'src/router/workers.rs') diff --git a/src/router/workers.rs b/src/router/workers.rs index 2117190..da5b600 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -6,7 +6,7 @@ use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use spin; use std::iter; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{sync_channel, Receiver}; +use std::sync::mpsc::{sync_channel, Receiver, TryRecvError}; use std::sync::{Arc, Weak}; use std::thread; @@ -23,17 +23,17 @@ enum Status { Waiting, // job awaiting completion } -struct JobInner { +pub struct JobInner { msg: Vec, // message buffer (nonce and receiver id set) key: [u8; 32], // chacha20poly1305 key status: Status, // state of the job op: Operation, // should be buffer be encrypted / decrypted? } -type JobBuffer = Arc>; -type JobParallel = (Arc>, JobBuffer); -type JobInbound = (Arc, JobBuffer); -type JobOutbound = (Weak, JobBuffer); +pub type JobBuffer = Arc>; +pub type JobParallel = (Arc>, JobBuffer); +pub type JobInbound = (Weak, JobBuffer); +pub type JobOutbound = JobBuffer; /* Strategy for workers acquiring a new job: * @@ -53,62 +53,85 @@ fn find_task(local: &Worker, global: &Injector, stealers: &[Stealer] }) } -fn worker_inbound( - device: Arc, // related device - peer: Arc, // related peer - recv: Receiver, // in order queue -) { - // reads from in order channel - for job in recv.recv().iter() { - loop { - let (state, buf) = job; - - // check if job is complete - match buf.try_lock() { - None => (), - Some(buf) => { - if buf.status != Status::Waiting { - // check replay protector - - // check if confirms keypair - - // write to tun device - - // continue to next job (no parking) - break; - } +fn wait_buffer(stopped: AtomicBool, buf: &JobBuffer) { + while !stopped.load(Ordering::Acquire) { + match buf.try_lock() { + None => (), + Some(buf) => { + if buf.status == Status::Waiting { + return; } } + }; + thread::park(); + } +} - // wait for job to complete - thread::park(); - } +fn wait_recv(stopped: &AtomicBool, recv: &Receiver) -> Result { + while !stopped.load(Ordering::Acquire) { + match recv.try_recv() { + Err(TryRecvError::Empty) => (), + value => { + return value; + } + }; + thread::park(); } + return Err(TryRecvError::Disconnected); } -fn worker_outbound( +pub fn worker_inbound( device: Arc, // related device peer: Arc, // related peer recv: Receiver, // in order queue ) { - // reads from in order channel - for job in recv.recv().iter() { - loop { - let (peer, buf) = job; - - // check if job is complete - match buf.try_lock() { - None => (), - Some(buf) => { - if buf.status != Status::Waiting { - // send buffer to peer endpoint - break; - } + loop { + match wait_recv(&peer.stopped, &recv) { + Ok((state, buf)) => { + while !peer.stopped.load(Ordering::Acquire) { + match buf.try_lock() { + None => (), + Some(buf) => { + if buf.status != Status::Waiting { + // consume + break; + } + } + }; + thread::park(); } } + Err(_) => { + break; + } + } + } +} - // wait for job to complete - thread::park(); +pub fn worker_outbound( + device: Arc, // related device + peer: Arc, // related peer + recv: Receiver, // in order queue +) { + loop { + match wait_recv(&peer.stopped, &recv) { + Ok(buf) => { + while !peer.stopped.load(Ordering::Acquire) { + match buf.try_lock() { + None => (), + Some(buf) => { + if buf.status != Status::Waiting { + // consume + break; + } + } + }; + thread::park(); + } + } + Err(_) => { + break; + } } } } -- cgit v1.2.3-59-g8ed1b