diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-04 19:08:13 +0200 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-04 19:08:13 +0200 |
commit | 6d11da441bde4fa75eef755bef4c97f0d1f6a29b (patch) | |
tree | 41575f0851461e0080258af4a11615de5a9d99c3 /src/router/workers.rs | |
parent | Wake workers when submitting work (diff) | |
download | wireguard-rs-6d11da441bde4fa75eef755bef4c97f0d1f6a29b.tar.xz wireguard-rs-6d11da441bde4fa75eef755bef4c97f0d1f6a29b.zip |
Simply passing of JobBuffer ownership
Diffstat (limited to 'src/router/workers.rs')
-rw-r--r-- | src/router/workers.rs | 259 |
1 files changed, 77 insertions, 182 deletions
diff --git a/src/router/workers.rs b/src/router/workers.rs index 0e68954..e79502f 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -5,6 +5,9 @@ use std::sync::mpsc::{sync_channel, Receiver, TryRecvError}; use std::sync::{Arc, Weak}; use std::thread; +use futures::sync::oneshot; +use futures::*; + use spin; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; @@ -25,93 +28,27 @@ pub enum Operation { Decryption, } -#[derive(PartialEq, Debug)] -pub enum Status { - Fault, // unsealing failed - Done, // job valid and complete - Waiting, // job awaiting completion +pub struct JobBuffer { + pub msg: Vec<u8>, // message buffer (nonce and receiver id set) + pub key: [u8; 32], // chacha20poly1305 key + pub okay: bool, // state of the job + pub op: Operation, // should be buffer be encrypted / decrypted? } -pub struct JobInner { - pub msg: Vec<u8>, // message buffer (nonce and receiver id set) - pub key: [u8; 32], // chacha20poly1305 key - pub status: Status, // state of the job - pub op: Operation, // should be buffer be encrypted / decrypted? -} - -pub type JobBuffer = Arc<spin::Mutex<JobInner>>; -pub type JobParallel<C, T, B> = (Arc<PeerInner<C, T, B>>, JobBuffer); -pub type JobInbound<C, T, B> = (Weak<DecryptionState<C, T, B>>, JobBuffer); -pub type JobOutbound = JobBuffer; - -/* Strategy for workers acquiring a new job: - * - * 1. Try the local job queue (owned by the thread) - * 2. Try fetching a batch of jobs from the global injector - * 3. Attempt to steal jobs from other threads. - */ -fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> { - local.pop().or_else(|| { - iter::repeat_with(|| { - global - .steal_batch_and_pop(local) - .or_else(|| stealers.iter().map(|s| s.steal()).collect()) - }) - .find(|s| !s.is_retry()) - .and_then(|s| s.success()) - }) -} - -fn wait_buffer(running: AtomicBool, buf: &JobBuffer) { - while running.load(Ordering::Acquire) { - match buf.try_lock() { - None => (), - Some(buf) => { - if buf.status == Status::Waiting { - return; - } - } - }; - thread::park(); - } -} - -fn wait_recv<T>(running: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvError> { - while running.load(Ordering::Acquire) { - match recv.try_recv() { - Err(TryRecvError::Empty) => (), - value => { - return value; - } - }; - thread::park(); - } - return Err(TryRecvError::Disconnected); -} +pub type JobParallel = (oneshot::Sender<JobBuffer>, JobBuffer); +pub type JobInbound<C, T, B> = (Weak<DecryptionState<C, T, B>>, oneshot::Receiver<JobBuffer>); +pub type JobOutbound = oneshot::Receiver<JobBuffer>; pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>( device: Arc<DeviceInner<C, T, B>>, // related device peer: Arc<PeerInner<C, T, B>>, // related peer + receiver: Receiver<JobInbound<C, T, B>>, ) { - while !peer.stopped.load(Ordering::Acquire) { - inner(&device, &peer) - } - + /* fn inner<C: Callbacks, T: Tun, B: Bind>( device: &Arc<DeviceInner<C, T, B>>, peer: &Arc<PeerInner<C, T, B>>, ) { - // wait for job to be submitted - let (state, buf) = loop { - match peer.inbound.lock().pop_front() { - Some(elem) => break elem, - _ => (), - } - - // default is to park - thread::park() - }; - // wait for job to complete loop { match buf.try_lock() { @@ -167,136 +104,94 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>( thread::park() } } + */ } pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>( device: Arc<DeviceInner<C, T, B>>, // related device peer: Arc<PeerInner<C, T, B>>, // related peer + receiver: Receiver<JobOutbound>, ) { - while !peer.stopped.load(Ordering::Acquire) { - inner(&device, &peer) - } - - fn inner<C: Callbacks, T: Tun, B: Bind>( - device: &Arc<DeviceInner<C, T, B>>, - peer: &Arc<PeerInner<C, T, B>>, - ) { - // wait for job to be submitted - let (state, buf) = loop { - match peer.inbound.lock().pop_front() { - Some(elem) => break elem, - _ => (), + loop { + // fetch job + let rx = match receiver.recv() { + Ok(v) => v, + _ => { + return; } - - // default is to park - thread::park() }; // wait for job to complete - loop { - match buf.try_lock() { - None => (), - Some(buf) => match buf.status { - Status::Fault => break (), - Status::Done => { - // parse / cast - let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { - Some(v) => v, - None => continue, - }; - let header: LayoutVerified<&[u8], TransportHeader> = header; - - // write to UDP device, TODO - let xmit = false; - - // trigger callback - (device.call_send)( - &peer.opaque, - buf.msg.len() - > CHACHA20_POLY1305.nonce_len() + mem::size_of::<TransportHeader>(), - xmit, - ); - break; - } - _ => (), - }, - }; - - // default is to park - thread::park() - } + let _ = rx + .map(|buf| { + if buf.okay { + // write to UDP device, TODO + let xmit = false; + + // trigger callback + (device.call_send)( + &peer.opaque, + buf.msg.len() + > CHACHA20_POLY1305.nonce_len() + mem::size_of::<TransportHeader>(), + xmit, + ); + } + }) + .wait(); } } pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>( device: Arc<DeviceInner<C, T, B>>, - local: Worker<JobParallel<C, T, B>>, // local job queue (local to thread) - stealers: Vec<Stealer<JobParallel<C, T, B>>>, // stealers (from other threads) + receiver: Receiver<JobParallel>, ) { - while device.running.load(Ordering::SeqCst) { - match find_task(&local, &device.injector, &stealers) { - Some(job) => { - let (peer, buf) = job; - - // take ownership of the job buffer and complete it - { - let mut buf = buf.lock(); - - // cast and check size of packet - let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { - Some(v) => v, - None => continue, - }; + loop { + // fetch next job + let (tx, mut buf) = match receiver.recv() { + Err(_) => { + return; + } + Ok(val) => val, + }; - if packet.len() < CHACHA20_POLY1305.nonce_len() { - continue; - } + // cast and check size of packet + let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { + Some(v) => v, + None => continue, + }; - let header: LayoutVerified<&[u8], TransportHeader> = header; + if packet.len() < CHACHA20_POLY1305.nonce_len() { + continue; + } - // do the weird ring AEAD dance - let key = LessSafeKey::new( - UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap(), - ); + let header: LayoutVerified<&[u8], TransportHeader> = header; - // create a nonce object - let mut nonce = [0u8; 12]; - debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); // why the this is not a constant, god knows... - nonce[4..].copy_from_slice(header.f_counter.as_bytes()); - let nonce = Nonce::assume_unique_for_key(nonce); + // do the weird ring AEAD dance + let key = LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap()); - match buf.op { - Operation::Encryption => { - // note: extends the vector to accommodate the tag - key.seal_in_place_append_tag(nonce, Aad::empty(), &mut buf.msg) - .unwrap(); - buf.status = Status::Done; - } - Operation::Decryption => { - // opening failure is signaled by fault state - buf.status = match key.open_in_place(nonce, Aad::empty(), &mut buf.msg) - { - Ok(_) => Status::Done, - Err(_) => Status::Fault, - }; - } - } - } + // create a nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); - // ensure consumer is unparked (TODO: better looking + wrap in atomic?) - peer.thread_outbound - .lock() - .as_ref() - .unwrap() - .thread() - .unpark(); + match buf.op { + Operation::Encryption => { + // note: extends the vector to accommodate the tag + key.seal_in_place_append_tag(nonce, Aad::empty(), &mut buf.msg) + .unwrap(); + buf.okay = true; } - None => { - // wait for notification from device - let &(ref lock, ref cvar) = &device.waker; - let mut guard = lock.lock(); - cvar.wait(&mut guard); + Operation::Decryption => { + // opening failure is signaled by fault state + buf.okay = match key.open_in_place(nonce, Aad::empty(), &mut buf.msg) { + Ok(_) => true, + Err(_) => false, + }; } } + + // pass ownership to consumer + let _ = tx.send(buf); } } |