use arraydeque::ArrayDeque; use spin::{Mutex, MutexGuard}; use std::sync::mpsc::Receiver; use std::sync::Arc; const INORDER_QUEUE_SIZE: usize = 64; pub struct InnerJob { // peer (used by worker to schedule/handle inorder queue), // when the peer is None, the job is complete peer: Option

, pub body: B, } pub struct Job { inner: Arc>>, } impl Clone for Job { fn clone(&self) -> Job { Job { inner: self.inner.clone(), } } } impl Job { pub fn new(peer: P, body: B) -> Job { Job { inner: Arc::new(Mutex::new(InnerJob { peer: Some(peer), body, })), } } } impl Job { /// Returns a mutex guard to the inner job if complete pub fn complete(&self) -> Option>> { self.inner .try_lock() .and_then(|m| if m.peer.is_none() { Some(m) } else { None }) } } pub struct InorderQueue { queue: Mutex; INORDER_QUEUE_SIZE]>>, } impl InorderQueue { pub fn send(&self, job: Job) -> bool { self.queue.lock().push_back(job).is_ok() } pub fn new() -> InorderQueue { InorderQueue { queue: Mutex::new(ArrayDeque::new()), } } #[inline(always)] pub fn handle)>(&self, f: F) { // take the mutex let mut queue = self.queue.lock(); // handle all complete messages while queue .pop_front() .and_then(|j| { // check if job is complete let ret = if let Some(mut guard) = j.complete() { f(&mut *guard); false } else { true }; // return job to cyclic buffer if not complete if ret { let _res = queue.push_front(j); debug_assert!(_res.is_ok()); None } else { // add job back to pool Some(()) } }) .is_some() {} } } /// Allows easy construction of a semi-parallel worker. /// Applicable for both decryption and encryption workers. #[inline(always)] pub fn worker_template< P, // represents a peer (atomic reference counted pointer) B, // inner body type (message buffer, key material, ...) W: Fn(&P, &mut B), S: Fn(&P, &mut B), Q: Fn(&P) -> &InorderQueue, >( receiver: Receiver>, // receiever for new jobs work_parallel: W, // perform parallel / out-of-order work on peer work_sequential: S, // perform sequential work on peer queue: Q, // resolve a peer to an inorder queue ) { log::trace!("router worker started"); loop { // handle new job let peer = { // get next job let job = match receiver.recv() { Ok(job) => job, _ => return, }; // lock the job let mut job = job.inner.lock(); // take the peer from the job let peer = job.peer.take().unwrap(); // process job work_parallel(&peer, &mut job.body); peer }; // process inorder jobs for peer queue(&peer).handle(|j| work_sequential(&peer, &mut j.body)); } }