aboutsummaryrefslogtreecommitdiffstats
path: root/src/router
diff options
context:
space:
mode:
Diffstat (limited to 'src/router')
-rw-r--r--src/router/device.rs80
-rw-r--r--src/router/peer.rs133
-rw-r--r--src/router/tests.rs2
-rw-r--r--src/router/workers.rs259
4 files changed, 175 insertions, 299 deletions
diff --git a/src/router/device.rs b/src/router/device.rs
index ec6453d..58ca2f6 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -1,14 +1,13 @@
use std::cmp;
use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr};
-use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::mpsc::sync_channel;
+use std::sync::mpsc::SyncSender;
use std::sync::{Arc, Weak};
use std::thread;
use std::time::Instant;
-use parking_lot::{Condvar, Mutex};
-
-use crossbeam_deque::{Injector, Worker};
use spin;
use treebitmap::IpLookupTable;
@@ -41,11 +40,6 @@ pub struct DeviceInner<C: Callbacks, T: Tun, B: Bind> {
pub call_send: C::CallbackSend,
pub call_need_key: C::CallbackKey,
- // threading and workers
- pub waker: (Mutex<()>, Condvar),
- pub running: AtomicBool, // workers running?
- pub injector: Injector<JobParallel<C, T, B>>, // parallel enc/dec task injector
-
// routing
pub recv: spin::RwLock<HashMap<u32, DecryptionState<C, T, B>>>, // receiver id -> decryption state
pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner<C, T, B>>>>, // ipv4 cryptkey routing
@@ -68,19 +62,20 @@ pub struct DecryptionState<C: Callbacks, T: Tun, B: Bind> {
pub death: Instant, // time when the key can no longer be used for decryption
}
-pub struct Device<C: Callbacks, T: Tun, B: Bind>(
- Arc<DeviceInner<C, T, B>>, // reference to device state
- Vec<thread::JoinHandle<()>>, // join handles for workers
-);
+pub struct Device<C: Callbacks, T: Tun, B: Bind> {
+ pub state: Arc<DeviceInner<C, T, B>>, // reference to device state
+ pub handles: Vec<thread::JoinHandle<()>>, // join handles for workers
+ pub queue_next: AtomicUsize, // next round-robin index
+ pub queues: Vec<spin::Mutex<SyncSender<JobParallel>>>, // work queues (1 per thread)
+}
impl<C: Callbacks, T: Tun, B: Bind> Drop for Device<C, T, B> {
fn drop(&mut self) {
- // mark device as stopped
- let device = &self.0;
- device.running.store(false, Ordering::SeqCst);
+ // drop all queues
+ while self.queues.pop().is_some() {}
// join all worker threads
- while match self.1.pop() {
+ while match self.handles.pop() {
Some(handle) => {
handle.thread().unpark();
handle.join().unwrap();
@@ -98,8 +93,8 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun, B: Bi
num_workers: usize,
tun: T,
bind: B,
- call_recv: R,
call_send: S,
+ call_recv: R,
call_need_key: K,
) -> Device<PhantomCallbacks<O, R, S, K>, T, B> {
// allocate shared device state
@@ -109,36 +104,31 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun, B: Bi
call_recv,
call_send,
call_need_key,
- waker: (Mutex::new(()), Condvar::new()),
- running: AtomicBool::new(true),
- injector: Injector::new(),
recv: spin::RwLock::new(HashMap::new()),
ipv4: spin::RwLock::new(IpLookupTable::new()),
ipv6: spin::RwLock::new(IpLookupTable::new()),
});
- // allocate work pool resources
- let mut workers = Vec::with_capacity(num_workers);
- let mut stealers = Vec::with_capacity(num_workers);
- for _ in 0..num_workers {
- let w = Worker::new_fifo();
- stealers.push(w.stealer());
- workers.push(w);
- }
-
// start worker threads
+ let mut queues = Vec::with_capacity(num_workers);
let mut threads = Vec::with_capacity(num_workers);
for _ in 0..num_workers {
+ // allocate work queue
+ let (tx, rx) = sync_channel(128);
+ queues.push(spin::Mutex::new(tx));
+
+ // start worker thread
let device = inner.clone();
- let stealers = stealers.clone();
- let worker = workers.pop().unwrap();
- threads.push(thread::spawn(move || {
- worker_parallel(device, worker, stealers)
- }));
+ threads.push(thread::spawn(move || worker_parallel(device, rx)));
}
// return exported device handle
- Device(inner, threads)
+ Device {
+ state: inner,
+ handles: threads,
+ queue_next: AtomicUsize::new(0),
+ queues: queues,
+ }
}
}
@@ -149,7 +139,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
///
/// A atomic ref. counted peer (with liftime matching the device)
pub fn new_peer(&self, opaque: C::Opaque) -> Peer<C, T, B> {
- peer::new_peer(self.0.clone(), opaque)
+ peer::new_peer(self.state.clone(), opaque)
}
/// Cryptkey routes and sends a plaintext message (IP packet)
@@ -177,7 +167,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
let dst = Ipv4Addr::from(dst);
// lookup peer (project unto and clone "value" field)
- self.0
+ self.state
.ipv4
.read()
.longest_match(dst)
@@ -195,7 +185,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
let dst = Ipv6Addr::from(dst);
// lookup peer (project unto and clone "value" field)
- self.0
+ self.state
.ipv6
.read()
.longest_match(dst)
@@ -210,12 +200,12 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
// schedule for encryption and transmission to peer
if let Some(job) = peer.send_job(msg) {
- // add job to parallel worker pool
- self.0.injector.push((peer.clone(), job));
-
- // ensure workers running, TODO: something faster
- let &(_, ref cvar) = &self.0.waker;
- cvar.notify_all();
+ // add job to worker queue
+ let idx = self.queue_next.fetch_add(1, Ordering::SeqCst);
+ self.queues[idx % self.queues.len()]
+ .lock()
+ .send(job)
+ .unwrap();
}
Ok(())
diff --git a/src/router/peer.rs b/src/router/peer.rs
index a85d87a..a31dfcf 100644
--- a/src/router/peer.rs
+++ b/src/router/peer.rs
@@ -22,9 +22,12 @@ use super::device::DeviceInner;
use super::device::EncryptionState;
use super::messages::TransportHeader;
+use futures::sync::oneshot;
+use futures::*;
+
+use super::workers::Operation;
use super::workers::{worker_inbound, worker_outbound};
-use super::workers::{JobBuffer, JobInbound, JobInner, JobOutbound};
-use super::workers::{Operation, Status};
+use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel};
use super::types::Callbacks;
@@ -40,11 +43,9 @@ pub struct KeyWheel {
pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
pub stopped: AtomicBool,
pub opaque: C::Opaque,
- pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Saturating>>,
- pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Saturating>>,
+ pub outbound: Mutex<SyncSender<JobOutbound>>,
+ pub inbound: Mutex<SyncSender<JobInbound<C, T, B>>>,
pub device: Arc<DeviceInner<C, T, B>>,
- pub thread_outbound: Mutex<Option<thread::JoinHandle<()>>>,
- pub thread_inbound: Mutex<Option<thread::JoinHandle<()>>>,
pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
pub rx_bytes: AtomicU64, // received bytes
pub tx_bytes: AtomicU64, // transmitted bytes
@@ -53,7 +54,11 @@ pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
pub endpoint: Mutex<Option<Arc<SocketAddr>>>,
}
-pub struct Peer<C: Callbacks, T: Tun, B: Bind>(Arc<PeerInner<C, T, B>>);
+pub struct Peer<C: Callbacks, T: Tun, B: Bind> {
+ state: Arc<PeerInner<C, T, B>>,
+ thread_outbound: thread::JoinHandle<()>,
+ thread_inbound: thread::JoinHandle<()>,
+}
fn treebit_list<A, E, C: Callbacks, T: Tun, B: Bind>(
peer: &Arc<PeerInner<C, T, B>>,
@@ -86,15 +91,15 @@ fn treebit_remove<A: Address, C: Callbacks, T: Tun, B: Bind>(
for subnet in m.iter() {
let (ip, masklen, p) = subnet;
if let Some(p) = p.upgrade() {
- if Arc::ptr_eq(&p, &peer.0) {
+ if Arc::ptr_eq(&p, &peer.state) {
subnets.push((ip, masklen))
}
}
}
// remove all key mappings
- for subnet in subnets {
- let r = m.remove(subnet.0, subnet.1);
+ for (ip, masklen) in subnets {
+ let r = m.remove(ip, masklen);
debug_assert!(r.is_some());
}
}
@@ -103,7 +108,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
fn drop(&mut self) {
// mark peer as stopped
- let peer = &self.0;
+ let peer = &self.state;
peer.stopped.store(true, Ordering::SeqCst);
// remove from cryptkey router
@@ -111,22 +116,6 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
treebit_remove(self, &peer.device.ipv4);
treebit_remove(self, &peer.device.ipv6);
- // unpark threads
-
- peer.thread_inbound
- .lock()
- .as_ref()
- .unwrap()
- .thread()
- .unpark();
-
- peer.thread_outbound
- .lock()
- .as_ref()
- .unwrap()
- .thread()
- .unpark();
-
// release ids from the receiver map
let mut keys = peer.keys.lock();
@@ -158,13 +147,16 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T, B>>,
opaque: C::Opaque,
) -> Peer<C, T, B> {
+ let (out_tx, out_rx) = sync_channel(128);
+ let (in_tx, in_rx) = sync_channel(128);
+
// allocate peer object
let peer = {
let device = device.clone();
Arc::new(PeerInner {
opaque,
- inbound: Mutex::new(ArrayDeque::new()),
- outbound: Mutex::new(ArrayDeque::new()),
+ inbound: Mutex::new(in_tx),
+ outbound: Mutex::new(out_tx),
stopped: AtomicBool::new(false),
device: device,
ekey: spin::Mutex::new(None),
@@ -178,26 +170,28 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
rx_bytes: AtomicU64::new(0),
tx_bytes: AtomicU64::new(0),
staged_packets: spin::Mutex::new(ArrayDeque::new()),
- thread_inbound: spin::Mutex::new(None),
- thread_outbound: spin::Mutex::new(None),
})
};
// spawn outbound thread
- *peer.thread_inbound.lock() = {
+ let thread_inbound = {
let peer = peer.clone();
let device = device.clone();
- Some(thread::spawn(move || worker_outbound(device, peer)))
+ thread::spawn(move || worker_outbound(device, peer, out_rx))
};
// spawn inbound thread
- *peer.thread_outbound.lock() = {
+ let thread_outbound = {
let peer = peer.clone();
let device = device.clone();
- Some(thread::spawn(move || worker_inbound(device, peer)))
+ thread::spawn(move || worker_inbound(device, peer, in_rx))
};
- Peer(peer)
+ Peer {
+ state: peer,
+ thread_inbound,
+ thread_outbound,
+ }
}
impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
@@ -209,7 +203,7 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
// rotate key-wheel
}
- pub fn send_job(&self, mut msg: Vec<u8>) -> Option<JobBuffer> {
+ pub fn send_job(&self, mut msg: Vec<u8>) -> Option<JobParallel> {
debug_assert!(msg.len() >= mem::size_of::<TransportHeader>());
// parse / cast
@@ -239,29 +233,26 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
}
};
- // create job
- let job = Arc::new(spin::Mutex::new(JobInner {
- msg,
- key,
- status: Status::Waiting,
- op: Operation::Encryption,
- }));
-
- // add job to in-order queue and return to device for inclusion in worker pool
- match self.outbound.lock().push_back(job.clone()) {
- Ok(_) => Some(job),
+ // add job to in-order queue and return sendeer to device for inclusion in worker pool
+ let (tx, rx) = oneshot();
+ match self.outbound.lock().try_send(rx) {
+ Ok(_) => Some((
+ tx,
+ JobBuffer {
+ msg,
+ key,
+ okay: false,
+ op: Operation::Encryption,
+ },
+ )),
Err(_) => None,
}
}
}
impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
- fn new(inner: PeerInner<C, T, B>) -> Peer<C, T, B> {
- Peer(Arc::new(inner))
- }
-
pub fn set_endpoint(&self, endpoint: SocketAddr) {
- *self.0.endpoint.lock() = Some(Arc::new(endpoint))
+ *self.state.endpoint.lock() = Some(Arc::new(endpoint))
}
/// Add a new keypair
@@ -275,7 +266,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
/// A vector of ids which has been released.
/// These should be released in the handshake module.
pub fn add_keypair(&self, new: KeyPair) -> Vec<u32> {
- let mut keys = self.0.keys.lock();
+ let mut keys = self.state.keys.lock();
let mut release = Vec::with_capacity(2);
let new = Arc::new(new);
@@ -286,7 +277,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
// update key-wheel
if new.initiator {
// start using key for encryption
- *self.0.ekey.lock() = Some(EncryptionState {
+ *self.state.ekey.lock() = Some(EncryptionState {
id: new.send.id,
key: new.send.key,
nonce: 0,
@@ -294,17 +285,17 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
});
// move current into previous
- keys.previous = keys.current.as_ref().map(|v| v.clone());;
+ keys.previous = keys.current.as_ref().map(|v| v.clone());
keys.current = Some(new.clone());
} else {
// store the key and await confirmation
- keys.previous = keys.next.as_ref().map(|v| v.clone());;
+ keys.previous = keys.next.as_ref().map(|v| v.clone());
keys.next = Some(new.clone());
};
// update incoming packet id map
{
- let mut recv = self.0.device.recv.write();
+ let mut recv = self.state.device.recv.write();
// purge recv map of released ids
for id in &release {
@@ -321,7 +312,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
keypair: Arc::downgrade(&new),
key: new.recv.key,
protector: spin::Mutex::new(AntiReplay::new()),
- peer: Arc::downgrade(&self.0),
+ peer: Arc::downgrade(&self.state),
death: new.birth + REJECT_AFTER_TIME,
},
);
@@ -332,28 +323,28 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
}
pub fn rx_bytes(&self) -> u64 {
- self.0.rx_bytes.load(Ordering::Relaxed)
+ self.state.rx_bytes.load(Ordering::Relaxed)
}
pub fn tx_bytes(&self) -> u64 {
- self.0.tx_bytes.load(Ordering::Relaxed)
+ self.state.tx_bytes.load(Ordering::Relaxed)
}
pub fn add_subnet(&self, ip: IpAddr, masklen: u32) {
match ip {
IpAddr::V4(v4) => {
- self.0
+ self.state
.device
.ipv4
.write()
- .insert(v4, masklen, Arc::downgrade(&self.0))
+ .insert(v4, masklen, Arc::downgrade(&self.state))
}
IpAddr::V6(v6) => {
- self.0
+ self.state
.device
.ipv6
.write()
- .insert(v6, masklen, Arc::downgrade(&self.0))
+ .insert(v6, masklen, Arc::downgrade(&self.state))
}
};
}
@@ -361,21 +352,21 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> {
let mut res = Vec::new();
res.append(&mut treebit_list(
- &self.0,
- &self.0.device.ipv4,
+ &self.state,
+ &self.state.device.ipv4,
Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)),
));
res.append(&mut treebit_list(
- &self.0,
- &self.0.device.ipv6,
+ &self.state,
+ &self.state.device.ipv6,
Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)),
));
res
}
pub fn remove_subnets(&self) {
- treebit_remove(self, &self.0.device.ipv4);
- treebit_remove(self, &self.0.device.ipv6);
+ treebit_remove(self, &self.state.device.ipv4);
+ treebit_remove(self, &self.state.device.ipv6);
}
fn send(&self, msg: Vec<u8>) {}
diff --git a/src/router/tests.rs b/src/router/tests.rs
index d626aeb..1e049c4 100644
--- a/src/router/tests.rs
+++ b/src/router/tests.rs
@@ -131,7 +131,7 @@ fn test_outbound() {
workers,
TunTest {},
BindTest {},
- |t: &Arc<AtomicBool>, data: bool, sent: bool| {},
+ |t: &Arc<AtomicBool>, data: bool, sent: bool| println!("send"),
|t: &Arc<AtomicBool>, data: bool, sent: bool| {},
|t: &Arc<AtomicBool>| t.store(true, Ordering::SeqCst),
);
diff --git a/src/router/workers.rs b/src/router/workers.rs
index 0e68954..e79502f 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -5,6 +5,9 @@ use std::sync::mpsc::{sync_channel, Receiver, TryRecvError};
use std::sync::{Arc, Weak};
use std::thread;
+use futures::sync::oneshot;
+use futures::*;
+
use spin;
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
@@ -25,93 +28,27 @@ pub enum Operation {
Decryption,
}
-#[derive(PartialEq, Debug)]
-pub enum Status {
- Fault, // unsealing failed
- Done, // job valid and complete
- Waiting, // job awaiting completion
+pub struct JobBuffer {
+ pub msg: Vec<u8>, // message buffer (nonce and receiver id set)
+ pub key: [u8; 32], // chacha20poly1305 key
+ pub okay: bool, // state of the job
+ pub op: Operation, // should be buffer be encrypted / decrypted?
}
-pub struct JobInner {
- pub msg: Vec<u8>, // message buffer (nonce and receiver id set)
- pub key: [u8; 32], // chacha20poly1305 key
- pub status: Status, // state of the job
- pub op: Operation, // should be buffer be encrypted / decrypted?
-}
-
-pub type JobBuffer = Arc<spin::Mutex<JobInner>>;
-pub type JobParallel<C, T, B> = (Arc<PeerInner<C, T, B>>, JobBuffer);
-pub type JobInbound<C, T, B> = (Weak<DecryptionState<C, T, B>>, JobBuffer);
-pub type JobOutbound = JobBuffer;
-
-/* Strategy for workers acquiring a new job:
- *
- * 1. Try the local job queue (owned by the thread)
- * 2. Try fetching a batch of jobs from the global injector
- * 3. Attempt to steal jobs from other threads.
- */
-fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
- local.pop().or_else(|| {
- iter::repeat_with(|| {
- global
- .steal_batch_and_pop(local)
- .or_else(|| stealers.iter().map(|s| s.steal()).collect())
- })
- .find(|s| !s.is_retry())
- .and_then(|s| s.success())
- })
-}
-
-fn wait_buffer(running: AtomicBool, buf: &JobBuffer) {
- while running.load(Ordering::Acquire) {
- match buf.try_lock() {
- None => (),
- Some(buf) => {
- if buf.status == Status::Waiting {
- return;
- }
- }
- };
- thread::park();
- }
-}
-
-fn wait_recv<T>(running: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvError> {
- while running.load(Ordering::Acquire) {
- match recv.try_recv() {
- Err(TryRecvError::Empty) => (),
- value => {
- return value;
- }
- };
- thread::park();
- }
- return Err(TryRecvError::Disconnected);
-}
+pub type JobParallel = (oneshot::Sender<JobBuffer>, JobBuffer);
+pub type JobInbound<C, T, B> = (Weak<DecryptionState<C, T, B>>, oneshot::Receiver<JobBuffer>);
+pub type JobOutbound = oneshot::Receiver<JobBuffer>;
pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T, B>>, // related device
peer: Arc<PeerInner<C, T, B>>, // related peer
+ receiver: Receiver<JobInbound<C, T, B>>,
) {
- while !peer.stopped.load(Ordering::Acquire) {
- inner(&device, &peer)
- }
-
+ /*
fn inner<C: Callbacks, T: Tun, B: Bind>(
device: &Arc<DeviceInner<C, T, B>>,
peer: &Arc<PeerInner<C, T, B>>,
) {
- // wait for job to be submitted
- let (state, buf) = loop {
- match peer.inbound.lock().pop_front() {
- Some(elem) => break elem,
- _ => (),
- }
-
- // default is to park
- thread::park()
- };
-
// wait for job to complete
loop {
match buf.try_lock() {
@@ -167,136 +104,94 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
thread::park()
}
}
+ */
}
pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T, B>>, // related device
peer: Arc<PeerInner<C, T, B>>, // related peer
+ receiver: Receiver<JobOutbound>,
) {
- while !peer.stopped.load(Ordering::Acquire) {
- inner(&device, &peer)
- }
-
- fn inner<C: Callbacks, T: Tun, B: Bind>(
- device: &Arc<DeviceInner<C, T, B>>,
- peer: &Arc<PeerInner<C, T, B>>,
- ) {
- // wait for job to be submitted
- let (state, buf) = loop {
- match peer.inbound.lock().pop_front() {
- Some(elem) => break elem,
- _ => (),
+ loop {
+ // fetch job
+ let rx = match receiver.recv() {
+ Ok(v) => v,
+ _ => {
+ return;
}
-
- // default is to park
- thread::park()
};
// wait for job to complete
- loop {
- match buf.try_lock() {
- None => (),
- Some(buf) => match buf.status {
- Status::Fault => break (),
- Status::Done => {
- // parse / cast
- let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) {
- Some(v) => v,
- None => continue,
- };
- let header: LayoutVerified<&[u8], TransportHeader> = header;
-
- // write to UDP device, TODO
- let xmit = false;
-
- // trigger callback
- (device.call_send)(
- &peer.opaque,
- buf.msg.len()
- > CHACHA20_POLY1305.nonce_len() + mem::size_of::<TransportHeader>(),
- xmit,
- );
- break;
- }
- _ => (),
- },
- };
-
- // default is to park
- thread::park()
- }
+ let _ = rx
+ .map(|buf| {
+ if buf.okay {
+ // write to UDP device, TODO
+ let xmit = false;
+
+ // trigger callback
+ (device.call_send)(
+ &peer.opaque,
+ buf.msg.len()
+ > CHACHA20_POLY1305.nonce_len() + mem::size_of::<TransportHeader>(),
+ xmit,
+ );
+ }
+ })
+ .wait();
}
}
pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T, B>>,
- local: Worker<JobParallel<C, T, B>>, // local job queue (local to thread)
- stealers: Vec<Stealer<JobParallel<C, T, B>>>, // stealers (from other threads)
+ receiver: Receiver<JobParallel>,
) {
- while device.running.load(Ordering::SeqCst) {
- match find_task(&local, &device.injector, &stealers) {
- Some(job) => {
- let (peer, buf) = job;
-
- // take ownership of the job buffer and complete it
- {
- let mut buf = buf.lock();
-
- // cast and check size of packet
- let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) {
- Some(v) => v,
- None => continue,
- };
+ loop {
+ // fetch next job
+ let (tx, mut buf) = match receiver.recv() {
+ Err(_) => {
+ return;
+ }
+ Ok(val) => val,
+ };
- if packet.len() < CHACHA20_POLY1305.nonce_len() {
- continue;
- }
+ // cast and check size of packet
+ let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) {
+ Some(v) => v,
+ None => continue,
+ };
- let header: LayoutVerified<&[u8], TransportHeader> = header;
+ if packet.len() < CHACHA20_POLY1305.nonce_len() {
+ continue;
+ }
- // do the weird ring AEAD dance
- let key = LessSafeKey::new(
- UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap(),
- );
+ let header: LayoutVerified<&[u8], TransportHeader> = header;
- // create a nonce object
- let mut nonce = [0u8; 12];
- debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); // why the this is not a constant, god knows...
- nonce[4..].copy_from_slice(header.f_counter.as_bytes());
- let nonce = Nonce::assume_unique_for_key(nonce);
+ // do the weird ring AEAD dance
+ let key = LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap());
- match buf.op {
- Operation::Encryption => {
- // note: extends the vector to accommodate the tag
- key.seal_in_place_append_tag(nonce, Aad::empty(), &mut buf.msg)
- .unwrap();
- buf.status = Status::Done;
- }
- Operation::Decryption => {
- // opening failure is signaled by fault state
- buf.status = match key.open_in_place(nonce, Aad::empty(), &mut buf.msg)
- {
- Ok(_) => Status::Done,
- Err(_) => Status::Fault,
- };
- }
- }
- }
+ // create a nonce object
+ let mut nonce = [0u8; 12];
+ debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len());
+ nonce[4..].copy_from_slice(header.f_counter.as_bytes());
+ let nonce = Nonce::assume_unique_for_key(nonce);
- // ensure consumer is unparked (TODO: better looking + wrap in atomic?)
- peer.thread_outbound
- .lock()
- .as_ref()
- .unwrap()
- .thread()
- .unpark();
+ match buf.op {
+ Operation::Encryption => {
+ // note: extends the vector to accommodate the tag
+ key.seal_in_place_append_tag(nonce, Aad::empty(), &mut buf.msg)
+ .unwrap();
+ buf.okay = true;
}
- None => {
- // wait for notification from device
- let &(ref lock, ref cvar) = &device.waker;
- let mut guard = lock.lock();
- cvar.wait(&mut guard);
+ Operation::Decryption => {
+ // opening failure is signaled by fault state
+ buf.okay = match key.open_in_place(nonce, Aad::empty(), &mut buf.msg) {
+ Ok(_) => true,
+ Err(_) => false,
+ };
}
}
+
+ // pass ownership to consumer
+ let _ = tx.send(buf);
}
}