summaryrefslogtreecommitdiffstats
path: root/src/router/workers.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-02 23:32:07 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-02 23:32:07 +0200
commitf55014ef8ff7308d37be9c92f41b9ccf809c719d (patch)
tree0a8ea562dd872b1a5a0a2271256bd1b6d6d5d731 /src/router/workers.rs
parentReconsider inorder queueing (diff)
downloadwireguard-rs-f55014ef8ff7308d37be9c92f41b9ccf809c719d.tar.xz
wireguard-rs-f55014ef8ff7308d37be9c92f41b9ccf809c719d.zip
Wake workers when submitting work
Diffstat (limited to 'src/router/workers.rs')
-rw-r--r--src/router/workers.rs233
1 files changed, 127 insertions, 106 deletions
diff --git a/src/router/workers.rs b/src/router/workers.rs
index 241b06f..0e68954 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -90,69 +90,81 @@ fn wait_recv<T>(running: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvEr
}
pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
- device: Arc<DeviceInner<C, T, B>>, // related device
- peer: Arc<PeerInner<C, T, B>>, // related peer
- recv: Receiver<JobInbound<C, T, B>>, // in order queue
+ device: Arc<DeviceInner<C, T, B>>, // related device
+ peer: Arc<PeerInner<C, T, B>>, // related peer
) {
- loop {
- match wait_recv(&peer.stopped, &recv) {
- Ok((state, buf)) => {
- while !peer.stopped.load(Ordering::Acquire) {
- match buf.try_lock() {
- None => (),
- Some(buf) => match buf.status {
- Status::Done => {
- // parse / cast
- let (header, packet) =
- match LayoutVerified::new_from_prefix(&buf.msg[..]) {
- Some(v) => v,
- None => continue,
- };
- let header: LayoutVerified<&[u8], TransportHeader> = header;
-
- // obtain strong reference to decryption state
- let state = if let Some(state) = state.upgrade() {
- state
- } else {
- break;
- };
-
- // check for replay
- if !state.protector.lock().update(header.f_counter.get()) {
- break;
- }
-
- // check for confirms key
- if !state.confirmed.swap(true, Ordering::SeqCst) {
- peer.confirm_key(state.keypair.clone());
- }
-
- // update enpoint, TODO
-
- // write packet to TUN device, TODO
-
- // trigger callback
- debug_assert!(
- packet.len() >= CHACHA20_POLY1305.nonce_len(),
- "this should be checked earlier in the pipeline"
- );
- (device.call_recv)(
- &peer.opaque,
- packet.len() > CHACHA20_POLY1305.nonce_len(),
- true,
- );
- break;
- }
- Status::Fault => break,
- _ => (),
- },
- };
- thread::park();
- }
- }
- Err(_) => {
- break;
+ while !peer.stopped.load(Ordering::Acquire) {
+ inner(&device, &peer)
+ }
+
+ fn inner<C: Callbacks, T: Tun, B: Bind>(
+ device: &Arc<DeviceInner<C, T, B>>,
+ peer: &Arc<PeerInner<C, T, B>>,
+ ) {
+ // wait for job to be submitted
+ let (state, buf) = loop {
+ match peer.inbound.lock().pop_front() {
+ Some(elem) => break elem,
+ _ => (),
}
+
+ // default is to park
+ thread::park()
+ };
+
+ // wait for job to complete
+ loop {
+ match buf.try_lock() {
+ None => (),
+ Some(buf) => match buf.status {
+ Status::Fault => break (),
+ Status::Done => {
+ // parse / cast
+ let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) {
+ Some(v) => v,
+ None => continue,
+ };
+ let header: LayoutVerified<&[u8], TransportHeader> = header;
+
+ // obtain strong reference to decryption state
+ let state = if let Some(state) = state.upgrade() {
+ state
+ } else {
+ break;
+ };
+
+ // check for replay
+ if !state.protector.lock().update(header.f_counter.get()) {
+ break;
+ }
+
+ // check for confirms key
+ if !state.confirmed.swap(true, Ordering::SeqCst) {
+ peer.confirm_key(state.keypair.clone());
+ }
+
+ // update endpoint, TODO
+
+ // write packet to TUN device, TODO
+
+ // trigger callback
+ debug_assert!(
+ packet.len() >= CHACHA20_POLY1305.nonce_len(),
+ "this should be checked earlier in the pipeline"
+ );
+ (device.call_recv)(
+ &peer.opaque,
+ packet.len() > CHACHA20_POLY1305.nonce_len(),
+ true,
+ );
+ break;
+ }
+ _ => (),
+ },
+ };
+
+ // default is to park
+ thread::park()
}
}
}
@@ -160,48 +172,58 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T, B>>, // related device
peer: Arc<PeerInner<C, T, B>>, // related peer
- recv: Receiver<JobOutbound>, // in order queue
) {
- loop {
- match wait_recv(&peer.stopped, &recv) {
- Ok(buf) => {
- while !peer.stopped.load(Ordering::Acquire) {
- match buf.try_lock() {
- None => (), // nothing to do
- Some(buf) => match buf.status {
- Status::Done => {
- // parse / cast
- let (header, packet) =
- match LayoutVerified::new_from_prefix(&buf.msg[..]) {
- Some(v) => v,
- None => continue,
- };
- let header: LayoutVerified<&[u8], TransportHeader> = header;
-
- // write to UDP device, TODO
- let xmit = false;
-
- // trigger callback
- (device.call_send)(
- &peer.opaque,
- buf.msg.len()
- > CHACHA20_POLY1305.nonce_len()
- + mem::size_of::<TransportHeader>(),
- xmit,
- );
- break;
- }
- Status::Fault => break,
- _ => (),
- },
- };
- thread::park();
- }
- }
- Err(e) => {
- println!("park outbound! {:?}", e);
- break;
+ while !peer.stopped.load(Ordering::Acquire) {
+ inner(&device, &peer)
+ }
+
+ fn inner<C: Callbacks, T: Tun, B: Bind>(
+ device: &Arc<DeviceInner<C, T, B>>,
+ peer: &Arc<PeerInner<C, T, B>>,
+ ) {
+ // wait for job to be submitted
+ let (state, buf) = loop {
+ match peer.inbound.lock().pop_front() {
+ Some(elem) => break elem,
+ _ => (),
}
+
+ // default is to park
+ thread::park()
+ };
+
+ // wait for job to complete
+ loop {
+ match buf.try_lock() {
+ None => (),
+ Some(buf) => match buf.status {
+ Status::Fault => break (),
+ Status::Done => {
+ // parse / cast
+ let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) {
+ Some(v) => v,
+ None => continue,
+ };
+ let header: LayoutVerified<&[u8], TransportHeader> = header;
+
+ // write to UDP device, TODO
+ let xmit = false;
+
+ // trigger callback
+ (device.call_send)(
+ &peer.opaque,
+ buf.msg.len()
+ > CHACHA20_POLY1305.nonce_len() + mem::size_of::<TransportHeader>(),
+ xmit,
+ );
+ break;
+ }
+ _ => (),
+ },
+ };
+
+ // default is to park
+ thread::park()
}
}
}
@@ -212,11 +234,9 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
stealers: Vec<Stealer<JobParallel<C, T, B>>>, // stealers (from other threads)
) {
while device.running.load(Ordering::SeqCst) {
- println!("running");
match find_task(&local, &device.injector, &stealers) {
Some(job) => {
let (peer, buf) = job;
- println!("jobs!");
// take ownership of the job buffer and complete it
{
@@ -272,9 +292,10 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
.unpark();
}
None => {
- println!("park");
- device.parked.store(true, Ordering::Release);
- thread::park();
+ // wait for notification from device
+ let &(ref lock, ref cvar) = &device.waker;
+ let mut guard = lock.lock();
+ cvar.wait(&mut guard);
}
}
}