use super::device::DecryptionState; use super::device::DeviceInner; use super::peer::PeerInner; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use spin; use std::iter; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{sync_channel, Receiver, TryRecvError}; use std::sync::{Arc, Weak}; use std::thread; #[derive(PartialEq)] enum Operation { Encryption, Decryption, } #[derive(PartialEq)] enum Status { Fault, // unsealing failed Done, // job valid and complete Waiting, // job awaiting completion } pub struct JobInner { msg: Vec, // message buffer (nonce and receiver id set) key: [u8; 32], // chacha20poly1305 key status: Status, // state of the job op: Operation, // should be buffer be encrypted / decrypted? } pub type JobBuffer = Arc>; pub type JobParallel = (Arc>, JobBuffer); pub type JobInbound = (Weak, JobBuffer); pub type JobOutbound = JobBuffer; /* Strategy for workers acquiring a new job: * * 1. Try the local job queue (owned by the thread) * 2. Try fetching a batch of jobs from the global injector * 3. Attempt to steal jobs from other threads. */ fn find_task(local: &Worker, global: &Injector, stealers: &[Stealer]) -> Option { local.pop().or_else(|| { iter::repeat_with(|| { global .steal_batch_and_pop(local) .or_else(|| stealers.iter().map(|s| s.steal()).collect()) }) .find(|s| !s.is_retry()) .and_then(|s| s.success()) }) } fn wait_buffer(stopped: AtomicBool, buf: &JobBuffer) { while !stopped.load(Ordering::Acquire) { match buf.try_lock() { None => (), Some(buf) => { if buf.status == Status::Waiting { return; } } }; thread::park(); } } fn wait_recv(stopped: &AtomicBool, recv: &Receiver) -> Result { while !stopped.load(Ordering::Acquire) { match recv.try_recv() { Err(TryRecvError::Empty) => (), value => { return value; } }; thread::park(); } return Err(TryRecvError::Disconnected); } pub fn worker_inbound( device: Arc, // related device peer: Arc, // related peer recv: Receiver, // in order queue ) { loop { match wait_recv(&peer.stopped, &recv) { Ok((state, buf)) => { while !peer.stopped.load(Ordering::Acquire) { match buf.try_lock() { None => (), Some(buf) => { if buf.status != Status::Waiting { // consume break; } } }; thread::park(); } } Err(_) => { break; } } } } pub fn worker_outbound( device: Arc, // related device peer: Arc, // related peer recv: Receiver, // in order queue ) { loop { match wait_recv(&peer.stopped, &recv) { Ok(buf) => { while !peer.stopped.load(Ordering::Acquire) { match buf.try_lock() { None => (), Some(buf) => { if buf.status != Status::Waiting { // consume break; } } }; thread::park(); } } Err(_) => { break; } } } } pub fn worker_parallel( stopped: Arc, // stop workers (device has been dropped) parked: Arc, // thread has been parked? local: Worker, // local job queue (local to thread) global: Injector, // global job injector stealers: Vec>, // stealers (from other threads) ) { while !stopped.load(Ordering::SeqCst) { match find_task(&local, &global, &stealers) { Some(job) => { let (handle, buf) = job; // take ownership of the job buffer and complete it { let mut buf = buf.lock(); match buf.op { Operation::Encryption => { // TODO: encryption buf.status = Status::Done; } Operation::Decryption => { // TODO: decryption buf.status = Status::Done; } } } // ensure consumer is unparked handle.thread().unpark(); } None => { // no jobs, park the worker parked.store(true, Ordering::Release); thread::park(); } } } }