summaryrefslogtreecommitdiffstats
path: root/src/router/workers.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-01 17:16:01 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-01 17:16:01 +0200
commit929eadb651ba41bb72ba8f85a0d68c0cbad18661 (patch)
tree98fec908179a23a90205463773eebc6b9377f82c /src/router/workers.rs
parentAdded Bind trait to router (diff)
downloadwireguard-rs-929eadb651ba41bb72ba8f85a0d68c0cbad18661.tar.xz
wireguard-rs-929eadb651ba41bb72ba8f85a0d68c0cbad18661.zip
Outbound cryptkey routing
Diffstat (limited to '')
-rw-r--r--src/router/workers.rs31
1 files changed, 18 insertions, 13 deletions
diff --git a/src/router/workers.rs b/src/router/workers.rs
index c4a9f18..1af2cae 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -9,7 +9,7 @@ use spin;
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
-use zerocopy::{AsBytes, ByteSlice, ByteSliceMut, FromBytes, LayoutVerified, Unaligned};
+use zerocopy::{AsBytes, LayoutVerified};
use super::device::DecryptionState;
use super::device::DeviceInner;
@@ -17,7 +17,7 @@ use super::messages::TransportHeader;
use super::peer::PeerInner;
use super::types::Callbacks;
-use super::super::types::{Tun, Bind};
+use super::super::types::{Bind, Tun};
#[derive(PartialEq, Debug)]
pub enum Operation {
@@ -26,21 +26,21 @@ pub enum Operation {
}
#[derive(PartialEq, Debug)]
-enum Status {
+pub enum Status {
Fault, // unsealing failed
Done, // job valid and complete
Waiting, // job awaiting completion
}
pub struct JobInner {
- msg: Vec<u8>, // message buffer (nonce and receiver id set)
- key: [u8; 32], // chacha20poly1305 key
- status: Status, // state of the job
- op: Operation, // should be buffer be encrypted / decrypted?
+ pub msg: Vec<u8>, // message buffer (nonce and receiver id set)
+ pub key: [u8; 32], // chacha20poly1305 key
+ pub status: Status, // state of the job
+ pub op: Operation, // should be buffer be encrypted / decrypted?
}
pub type JobBuffer = Arc<spin::Mutex<JobInner>>;
-pub type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer);
+pub type JobParallel<C, T, B> = (Arc<PeerInner<C, T, B>>, JobBuffer);
pub type JobInbound<C, T, B> = (Weak<DecryptionState<C, T, B>>, JobBuffer);
pub type JobOutbound = JobBuffer;
@@ -207,13 +207,13 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T, B>>,
- local: Worker<JobParallel>, // local job queue (local to thread)
- stealers: Vec<Stealer<JobParallel>>, // stealers (from other threads)
+ local: Worker<JobParallel<C, T, B>>, // local job queue (local to thread)
+ stealers: Vec<Stealer<JobParallel<C, T, B>>>, // stealers (from other threads)
) {
while device.running.load(Ordering::SeqCst) {
match find_task(&local, &device.injector, &stealers) {
Some(job) => {
- let (handle, buf) = job;
+ let (peer, buf) = job;
// take ownership of the job buffer and complete it
{
@@ -260,8 +260,13 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
}
}
- // ensure consumer is unparked
- handle.thread().unpark();
+ // ensure consumer is unparked (TODO: better looking + wrap in atomic?)
+ peer.thread_outbound
+ .lock()
+ .as_ref()
+ .unwrap()
+ .thread()
+ .unpark();
}
None => {
device.parked.store(true, Ordering::Release);