summaryrefslogtreecommitdiffstats
path: root/src/router/workers.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-04 19:08:13 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-04 19:08:13 +0200
commit6d11da441bde4fa75eef755bef4c97f0d1f6a29b (patch)
tree41575f0851461e0080258af4a11615de5a9d99c3 /src/router/workers.rs
parentWake workers when submitting work (diff)
downloadwireguard-rs-6d11da441bde4fa75eef755bef4c97f0d1f6a29b.tar.xz
wireguard-rs-6d11da441bde4fa75eef755bef4c97f0d1f6a29b.zip
Simply passing of JobBuffer ownership
Diffstat (limited to 'src/router/workers.rs')
-rw-r--r--src/router/workers.rs259
1 files changed, 77 insertions, 182 deletions
diff --git a/src/router/workers.rs b/src/router/workers.rs
index 0e68954..e79502f 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -5,6 +5,9 @@ use std::sync::mpsc::{sync_channel, Receiver, TryRecvError};
use std::sync::{Arc, Weak};
use std::thread;
+use futures::sync::oneshot;
+use futures::*;
+
use spin;
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
@@ -25,93 +28,27 @@ pub enum Operation {
Decryption,
}
-#[derive(PartialEq, Debug)]
-pub enum Status {
- Fault, // unsealing failed
- Done, // job valid and complete
- Waiting, // job awaiting completion
+pub struct JobBuffer {
+ pub msg: Vec<u8>, // message buffer (nonce and receiver id set)
+ pub key: [u8; 32], // chacha20poly1305 key
+ pub okay: bool, // state of the job
+ pub op: Operation, // should be buffer be encrypted / decrypted?
}
-pub struct JobInner {
- pub msg: Vec<u8>, // message buffer (nonce and receiver id set)
- pub key: [u8; 32], // chacha20poly1305 key
- pub status: Status, // state of the job
- pub op: Operation, // should be buffer be encrypted / decrypted?
-}
-
-pub type JobBuffer = Arc<spin::Mutex<JobInner>>;
-pub type JobParallel<C, T, B> = (Arc<PeerInner<C, T, B>>, JobBuffer);
-pub type JobInbound<C, T, B> = (Weak<DecryptionState<C, T, B>>, JobBuffer);
-pub type JobOutbound = JobBuffer;
-
-/* Strategy for workers acquiring a new job:
- *
- * 1. Try the local job queue (owned by the thread)
- * 2. Try fetching a batch of jobs from the global injector
- * 3. Attempt to steal jobs from other threads.
- */
-fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
- local.pop().or_else(|| {
- iter::repeat_with(|| {
- global
- .steal_batch_and_pop(local)
- .or_else(|| stealers.iter().map(|s| s.steal()).collect())
- })
- .find(|s| !s.is_retry())
- .and_then(|s| s.success())
- })
-}
-
-fn wait_buffer(running: AtomicBool, buf: &JobBuffer) {
- while running.load(Ordering::Acquire) {
- match buf.try_lock() {
- None => (),
- Some(buf) => {
- if buf.status == Status::Waiting {
- return;
- }
- }
- };
- thread::park();
- }
-}
-
-fn wait_recv<T>(running: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvError> {
- while running.load(Ordering::Acquire) {
- match recv.try_recv() {
- Err(TryRecvError::Empty) => (),
- value => {
- return value;
- }
- };
- thread::park();
- }
- return Err(TryRecvError::Disconnected);
-}
+pub type JobParallel = (oneshot::Sender<JobBuffer>, JobBuffer);
+pub type JobInbound<C, T, B> = (Weak<DecryptionState<C, T, B>>, oneshot::Receiver<JobBuffer>);
+pub type JobOutbound = oneshot::Receiver<JobBuffer>;
pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T, B>>, // related device
peer: Arc<PeerInner<C, T, B>>, // related peer
+ receiver: Receiver<JobInbound<C, T, B>>,
) {
- while !peer.stopped.load(Ordering::Acquire) {
- inner(&device, &peer)
- }
-
+ /*
fn inner<C: Callbacks, T: Tun, B: Bind>(
device: &Arc<DeviceInner<C, T, B>>,
peer: &Arc<PeerInner<C, T, B>>,
) {
- // wait for job to be submitted
- let (state, buf) = loop {
- match peer.inbound.lock().pop_front() {
- Some(elem) => break elem,
- _ => (),
- }
-
- // default is to park
- thread::park()
- };
-
// wait for job to complete
loop {
match buf.try_lock() {
@@ -167,136 +104,94 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
thread::park()
}
}
+ */
}
pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T, B>>, // related device
peer: Arc<PeerInner<C, T, B>>, // related peer
+ receiver: Receiver<JobOutbound>,
) {
- while !peer.stopped.load(Ordering::Acquire) {
- inner(&device, &peer)
- }
-
- fn inner<C: Callbacks, T: Tun, B: Bind>(
- device: &Arc<DeviceInner<C, T, B>>,
- peer: &Arc<PeerInner<C, T, B>>,
- ) {
- // wait for job to be submitted
- let (state, buf) = loop {
- match peer.inbound.lock().pop_front() {
- Some(elem) => break elem,
- _ => (),
+ loop {
+ // fetch job
+ let rx = match receiver.recv() {
+ Ok(v) => v,
+ _ => {
+ return;
}
-
- // default is to park
- thread::park()
};
// wait for job to complete
- loop {
- match buf.try_lock() {
- None => (),
- Some(buf) => match buf.status {
- Status::Fault => break (),
- Status::Done => {
- // parse / cast
- let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) {
- Some(v) => v,
- None => continue,
- };
- let header: LayoutVerified<&[u8], TransportHeader> = header;
-
- // write to UDP device, TODO
- let xmit = false;
-
- // trigger callback
- (device.call_send)(
- &peer.opaque,
- buf.msg.len()
- > CHACHA20_POLY1305.nonce_len() + mem::size_of::<TransportHeader>(),
- xmit,
- );
- break;
- }
- _ => (),
- },
- };
-
- // default is to park
- thread::park()
- }
+ let _ = rx
+ .map(|buf| {
+ if buf.okay {
+ // write to UDP device, TODO
+ let xmit = false;
+
+ // trigger callback
+ (device.call_send)(
+ &peer.opaque,
+ buf.msg.len()
+ > CHACHA20_POLY1305.nonce_len() + mem::size_of::<TransportHeader>(),
+ xmit,
+ );
+ }
+ })
+ .wait();
}
}
pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T, B>>,
- local: Worker<JobParallel<C, T, B>>, // local job queue (local to thread)
- stealers: Vec<Stealer<JobParallel<C, T, B>>>, // stealers (from other threads)
+ receiver: Receiver<JobParallel>,
) {
- while device.running.load(Ordering::SeqCst) {
- match find_task(&local, &device.injector, &stealers) {
- Some(job) => {
- let (peer, buf) = job;
-
- // take ownership of the job buffer and complete it
- {
- let mut buf = buf.lock();
-
- // cast and check size of packet
- let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) {
- Some(v) => v,
- None => continue,
- };
+ loop {
+ // fetch next job
+ let (tx, mut buf) = match receiver.recv() {
+ Err(_) => {
+ return;
+ }
+ Ok(val) => val,
+ };
- if packet.len() < CHACHA20_POLY1305.nonce_len() {
- continue;
- }
+ // cast and check size of packet
+ let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) {
+ Some(v) => v,
+ None => continue,
+ };
- let header: LayoutVerified<&[u8], TransportHeader> = header;
+ if packet.len() < CHACHA20_POLY1305.nonce_len() {
+ continue;
+ }
- // do the weird ring AEAD dance
- let key = LessSafeKey::new(
- UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap(),
- );
+ let header: LayoutVerified<&[u8], TransportHeader> = header;
- // create a nonce object
- let mut nonce = [0u8; 12];
- debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); // why the this is not a constant, god knows...
- nonce[4..].copy_from_slice(header.f_counter.as_bytes());
- let nonce = Nonce::assume_unique_for_key(nonce);
+ // do the weird ring AEAD dance
+ let key = LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap());
- match buf.op {
- Operation::Encryption => {
- // note: extends the vector to accommodate the tag
- key.seal_in_place_append_tag(nonce, Aad::empty(), &mut buf.msg)
- .unwrap();
- buf.status = Status::Done;
- }
- Operation::Decryption => {
- // opening failure is signaled by fault state
- buf.status = match key.open_in_place(nonce, Aad::empty(), &mut buf.msg)
- {
- Ok(_) => Status::Done,
- Err(_) => Status::Fault,
- };
- }
- }
- }
+ // create a nonce object
+ let mut nonce = [0u8; 12];
+ debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len());
+ nonce[4..].copy_from_slice(header.f_counter.as_bytes());
+ let nonce = Nonce::assume_unique_for_key(nonce);
- // ensure consumer is unparked (TODO: better looking + wrap in atomic?)
- peer.thread_outbound
- .lock()
- .as_ref()
- .unwrap()
- .thread()
- .unpark();
+ match buf.op {
+ Operation::Encryption => {
+ // note: extends the vector to accommodate the tag
+ key.seal_in_place_append_tag(nonce, Aad::empty(), &mut buf.msg)
+ .unwrap();
+ buf.okay = true;
}
- None => {
- // wait for notification from device
- let &(ref lock, ref cvar) = &device.waker;
- let mut guard = lock.lock();
- cvar.wait(&mut guard);
+ Operation::Decryption => {
+ // opening failure is signaled by fault state
+ buf.okay = match key.open_in_place(nonce, Aad::empty(), &mut buf.msg) {
+ Ok(_) => true,
+ Err(_) => false,
+ };
}
}
+
+ // pass ownership to consumer
+ let _ = tx.send(buf);
}
}