diff options
Diffstat (limited to '')
-rw-r--r-- | src/router/workers.rs | 121 |
1 files changed, 72 insertions, 49 deletions
diff --git a/src/router/workers.rs b/src/router/workers.rs index 2117190..da5b600 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -6,7 +6,7 @@ use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use spin; use std::iter; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{sync_channel, Receiver}; +use std::sync::mpsc::{sync_channel, Receiver, TryRecvError}; use std::sync::{Arc, Weak}; use std::thread; @@ -23,17 +23,17 @@ enum Status { Waiting, // job awaiting completion } -struct JobInner { +pub struct JobInner { msg: Vec<u8>, // message buffer (nonce and receiver id set) key: [u8; 32], // chacha20poly1305 key status: Status, // state of the job op: Operation, // should be buffer be encrypted / decrypted? } -type JobBuffer = Arc<spin::Mutex<JobInner>>; -type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer); -type JobInbound = (Arc<DecryptionState>, JobBuffer); -type JobOutbound = (Weak<PeerInner>, JobBuffer); +pub type JobBuffer = Arc<spin::Mutex<JobInner>>; +pub type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer); +pub type JobInbound = (Weak<DecryptionState>, JobBuffer); +pub type JobOutbound = JobBuffer; /* Strategy for workers acquiring a new job: * @@ -53,62 +53,85 @@ fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>] }) } -fn worker_inbound( - device: Arc<DeviceInner>, // related device - peer: Arc<PeerInner>, // related peer - recv: Receiver<JobInbound>, // in order queue -) { - // reads from in order channel - for job in recv.recv().iter() { - loop { - let (state, buf) = job; - - // check if job is complete - match buf.try_lock() { - None => (), - Some(buf) => { - if buf.status != Status::Waiting { - // check replay protector - - // check if confirms keypair - - // write to tun device - - // continue to next job (no parking) - break; - } +fn wait_buffer(stopped: AtomicBool, buf: &JobBuffer) { + while !stopped.load(Ordering::Acquire) { + match buf.try_lock() { + None => (), + Some(buf) => { + if buf.status == Status::Waiting { + return; } } + }; + thread::park(); + } +} - // wait for job to complete - thread::park(); - } +fn wait_recv<T>(stopped: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvError> { + while !stopped.load(Ordering::Acquire) { + match recv.try_recv() { + Err(TryRecvError::Empty) => (), + value => { + return value; + } + }; + thread::park(); } + return Err(TryRecvError::Disconnected); } -fn worker_outbound( +pub fn worker_inbound( device: Arc<DeviceInner>, // related device peer: Arc<PeerInner>, // related peer recv: Receiver<JobInbound>, // in order queue ) { - // reads from in order channel - for job in recv.recv().iter() { - loop { - let (peer, buf) = job; - - // check if job is complete - match buf.try_lock() { - None => (), - Some(buf) => { - if buf.status != Status::Waiting { - // send buffer to peer endpoint - break; - } + loop { + match wait_recv(&peer.stopped, &recv) { + Ok((state, buf)) => { + while !peer.stopped.load(Ordering::Acquire) { + match buf.try_lock() { + None => (), + Some(buf) => { + if buf.status != Status::Waiting { + // consume + break; + } + } + }; + thread::park(); } } + Err(_) => { + break; + } + } + } +} - // wait for job to complete - thread::park(); +pub fn worker_outbound( + device: Arc<DeviceInner>, // related device + peer: Arc<PeerInner>, // related peer + recv: Receiver<JobOutbound>, // in order queue +) { + loop { + match wait_recv(&peer.stopped, &recv) { + Ok(buf) => { + while !peer.stopped.load(Ordering::Acquire) { + match buf.try_lock() { + None => (), + Some(buf) => { + if buf.status != Status::Waiting { + // consume + break; + } + } + }; + thread::park(); + } + } + Err(_) => { + break; + } } } } |