about summary refs log tree commit diff stats
path: root/src/router/device.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/router/device.rs')
-rw-r--r-- src/router/device.rs 80
1 files changed, 35 insertions, 45 deletions
diff --git a/src/router/device.rs b/src/router/device.rs
index ec6453d..58ca2f6 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -1,14 +1,13 @@
use std::cmp;
use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr};
-use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::mpsc::sync_channel;
+use std::sync::mpsc::SyncSender;
use std::sync::{Arc, Weak};
use std::thread;
use std::time::Instant;
-use parking_lot::{Condvar, Mutex};
-
-use crossbeam_deque::{Injector, Worker};
use spin;
use treebitmap::IpLookupTable;
@@ -41,11 +40,6 @@ pub struct DeviceInner<C: Callbacks, T: Tun, B: Bind> {
pub call_send: C::CallbackSend,
pub call_need_key: C::CallbackKey,
- // threading and workers
- pub waker: (Mutex<()>, Condvar),
- pub running: AtomicBool, // workers running?
- pub injector: Injector<JobParallel<C, T, B>>, // parallel enc/dec task injector
-
// routing
pub recv: spin::RwLock<HashMap<u32, DecryptionState<C, T, B>>>, // receiver id -> decryption state
pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner<C, T, B>>>>, // ipv4 cryptkey routing
@@ -68,19 +62,20 @@ pub struct DecryptionState<C: Callbacks, T: Tun, B: Bind> {
pub death: Instant, // time when the key can no longer be used for decryption
}
-pub struct Device<C: Callbacks, T: Tun, B: Bind>(
- Arc<DeviceInner<C, T, B>>, // reference to device state
- Vec<thread::JoinHandle<()>>, // join handles for workers
-);
+pub struct Device<C: Callbacks, T: Tun, B: Bind> {
+ pub state: Arc<DeviceInner<C, T, B>>, // reference to device state
+ pub handles: Vec<thread::JoinHandle<()>>, // join handles for workers
+ pub queue_next: AtomicUsize, // next round-robin index
+ pub queues: Vec<spin::Mutex<SyncSender<JobParallel>>>, // work queues (1 per thread)
+}
impl<C: Callbacks, T: Tun, B: Bind> Drop for Device<C, T, B> {
fn drop(&mut self) {
- // mark device as stopped
- let device = &self.0;
- device.running.store(false, Ordering::SeqCst);
+ // drop all queues
+ while self.queues.pop().is_some() {}
// join all worker threads
- while match self.1.pop() {
+ while match self.handles.pop() {
Some(handle) => {
handle.thread().unpark();
handle.join().unwrap();
@@ -98,8 +93,8 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun, B: Bi
num_workers: usize,
tun: T,
bind: B,
- call_recv: R,
call_send: S,
+ call_recv: R,
call_need_key: K,
) -> Device<PhantomCallbacks<O, R, S, K>, T, B> {
// allocate shared device state
@@ -109,36 +104,31 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun, B: Bi
call_recv,
call_send,
call_need_key,
- waker: (Mutex::new(()), Condvar::new()),
- running: AtomicBool::new(true),
- injector: Injector::new(),
recv: spin::RwLock::new(HashMap::new()),
ipv4: spin::RwLock::new(IpLookupTable::new()),
ipv6: spin::RwLock::new(IpLookupTable::new()),
});
- // allocate work pool resources
- let mut workers = Vec::with_capacity(num_workers);
- let mut stealers = Vec::with_capacity(num_workers);
- for _ in 0..num_workers {
- let w = Worker::new_fifo();
- stealers.push(w.stealer());
- workers.push(w);
- }
-
// start worker threads
+ let mut queues = Vec::with_capacity(num_workers);
let mut threads = Vec::with_capacity(num_workers);
for _ in 0..num_workers {
+ // allocate work queue
+ let (tx, rx) = sync_channel(128);
+ queues.push(spin::Mutex::new(tx));
+
+ // start worker thread
let device = inner.clone();
- let stealers = stealers.clone();
- let worker = workers.pop().unwrap();
- threads.push(thread::spawn(move || {
- worker_parallel(device, worker, stealers)
- }));
+ threads.push(thread::spawn(move || worker_parallel(device, rx)));
}
// return exported device handle
- Device(inner, threads)
+ Device {
+ state: inner,
+ handles: threads,
+ queue_next: AtomicUsize::new(0),
+ queues: queues,
+ }
}
}
@@ -149,7 +139,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
///
/// A atomic ref. counted peer (with liftime matching the device)
pub fn new_peer(&self, opaque: C::Opaque) -> Peer<C, T, B> {
- peer::new_peer(self.0.clone(), opaque)
+ peer::new_peer(self.state.clone(), opaque)
}
/// Cryptkey routes and sends a plaintext message (IP packet)
@@ -177,7 +167,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
let dst = Ipv4Addr::from(dst);
// lookup peer (project unto and clone "value" field)
- self.0
+ self.state
.ipv4
.read()
.longest_match(dst)
@@ -195,7 +185,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
let dst = Ipv6Addr::from(dst);
// lookup peer (project unto and clone "value" field)
- self.0
+ self.state
.ipv6
.read()
.longest_match(dst)
@@ -210,12 +200,12 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
// schedule for encryption and transmission to peer
if let Some(job) = peer.send_job(msg) {
- // add job to parallel worker pool
- self.0.injector.push((peer.clone(), job));
-
- // ensure workers running, TODO: something faster
- let &(_, ref cvar) = &self.0.waker;
- cvar.notify_all();
+ // add job to worker queue
+ let idx = self.queue_next.fetch_add(1, Ordering::SeqCst);
+ self.queues[idx % self.queues.len()]
+ .lock()
+ .send(job)
+ .unwrap();
}
Ok(())