diff options
Diffstat (limited to 'src/router')
-rw-r--r-- | src/router/device.rs | 18 | ||||
-rw-r--r-- | src/router/types.rs | 12 | ||||
-rw-r--r-- | src/router/workers.rs | 3 |
3 files changed, 19 insertions, 14 deletions
diff --git a/src/router/device.rs b/src/router/device.rs index bee4ad4..a7f0590 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -9,7 +9,8 @@ use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use spin; use treebitmap::IpLookupTable; -use super::super::types::KeyPair; +use super::super::types::{Bind, KeyPair, Tun}; + use super::anti_replay::AntiReplay; use super::peer; use super::peer::{Peer, PeerInner}; @@ -62,16 +63,15 @@ impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Drop for Devi let device = &self.0; device.running.store(false, Ordering::SeqCst); - // eat all parallel jobs - while match device.injector.steal() { - Steal::Empty => true, + // join all worker threads + while match self.1.pop() { + Some(handle) => { + handle.thread().unpark(); + handle.join().unwrap(); + true + } _ => false, } {} - - // unpark all threads - for handle in &self.1 { - handle.thread().unpark(); - } } } diff --git a/src/router/types.rs b/src/router/types.rs index 2ed011b..3d486bc 100644 --- a/src/router/types.rs +++ b/src/router/types.rs @@ -3,7 +3,7 @@ pub trait Opaque: Send + Sync + 'static {} impl<T> Opaque for T where T: Send + Sync + 'static {} /// A send/recv callback takes 3 arguments: -/// +/// /// * `0`, a reference to the opaque value assigned to the peer /// * `1`, a bool indicating whether the message contained data (not just keepalive) /// * `2`, a bool indicating whether the message was transmitted (i.e. did the peer have an associated endpoint?) @@ -12,8 +12,14 @@ pub trait Callback<T>: Fn(&T, bool, bool) -> () + Sync + Send + 'static {} impl<T, F> Callback<T> for F where F: Fn(&T, bool, bool) -> () + Sync + Send + 'static {} /// A key callback takes 1 argument -/// +/// /// * `0`, a reference to the opaque value assigned to the peer pub trait KeyCallback<T>: Fn(&T) -> () + Sync + Send + 'static {} -impl<T, F> KeyCallback<T> for F where F: Fn(&T) -> () + Sync + Send + 'static {}
\ No newline at end of file +impl<T, F> KeyCallback<T> for F where F: Fn(&T) -> () + Sync + Send + 'static {} + +pub trait TunCallback<T>: Fn(&T, bool, bool) -> () + Sync + Send + 'static {} + +pub trait BindCallback<T>: Fn(&T, bool, bool) -> () + Sync + Send + 'static {} + +pub trait Endpoint: Send + Sync {} diff --git a/src/router/workers.rs b/src/router/workers.rs index 1fd2cdf..4861847 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -208,7 +208,7 @@ pub fn worker_parallel<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback local: Worker<JobParallel>, // local job queue (local to thread) stealers: Vec<Stealer<JobParallel>>, // stealers (from other threads) ) { - while !device.running.load(Ordering::SeqCst) { + while device.running.load(Ordering::SeqCst) { match find_task(&local, &device.injector, &stealers) { Some(job) => { let (handle, buf) = job; @@ -262,7 +262,6 @@ pub fn worker_parallel<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback handle.thread().unpark(); } None => { - // no jobs, park the worker device.parked.store(true, Ordering::Release); thread::park(); } |