From 929eadb651ba41bb72ba8f85a0d68c0cbad18661 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sun, 1 Sep 2019 17:16:01 +0200 Subject: Outbound cryptkey routing --- src/router/workers.rs | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) (limited to 'src/router/workers.rs') diff --git a/src/router/workers.rs b/src/router/workers.rs index c4a9f18..1af2cae 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -9,7 +9,7 @@ use spin; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; -use zerocopy::{AsBytes, ByteSlice, ByteSliceMut, FromBytes, LayoutVerified, Unaligned}; +use zerocopy::{AsBytes, LayoutVerified}; use super::device::DecryptionState; use super::device::DeviceInner; @@ -17,7 +17,7 @@ use super::messages::TransportHeader; use super::peer::PeerInner; use super::types::Callbacks; -use super::super::types::{Tun, Bind}; +use super::super::types::{Bind, Tun}; #[derive(PartialEq, Debug)] pub enum Operation { @@ -26,21 +26,21 @@ pub enum Operation { } #[derive(PartialEq, Debug)] -enum Status { +pub enum Status { Fault, // unsealing failed Done, // job valid and complete Waiting, // job awaiting completion } pub struct JobInner { - msg: Vec, // message buffer (nonce and receiver id set) - key: [u8; 32], // chacha20poly1305 key - status: Status, // state of the job - op: Operation, // should be buffer be encrypted / decrypted? + pub msg: Vec, // message buffer (nonce and receiver id set) + pub key: [u8; 32], // chacha20poly1305 key + pub status: Status, // state of the job + pub op: Operation, // should be buffer be encrypted / decrypted? } pub type JobBuffer = Arc>; -pub type JobParallel = (Arc>, JobBuffer); +pub type JobParallel = (Arc>, JobBuffer); pub type JobInbound = (Weak>, JobBuffer); pub type JobOutbound = JobBuffer; @@ -207,13 +207,13 @@ pub fn worker_outbound( pub fn worker_parallel( device: Arc>, - local: Worker, // local job queue (local to thread) - stealers: Vec>, // stealers (from other threads) + local: Worker>, // local job queue (local to thread) + stealers: Vec>>, // stealers (from other threads) ) { while device.running.load(Ordering::SeqCst) { match find_task(&local, &device.injector, &stealers) { Some(job) => { - let (handle, buf) = job; + let (peer, buf) = job; // take ownership of the job buffer and complete it { @@ -260,8 +260,13 @@ pub fn worker_parallel( } } - // ensure consumer is unparked - handle.thread().unpark(); + // ensure consumer is unparked (TODO: better looking + wrap in atomic?) + peer.thread_outbound + .lock() + .as_ref() + .unwrap() + .thread() + .unpark(); } None => { device.parked.store(true, Ordering::Release); -- cgit v1.2.3-59-g8ed1b