diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-02 23:32:07 +0200 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-02 23:32:07 +0200 |
commit | f55014ef8ff7308d37be9c92f41b9ccf809c719d (patch) | |
tree | 0a8ea562dd872b1a5a0a2271256bd1b6d6d5d731 /src/router/workers.rs | |
parent | Reconsider inorder queueing (diff) | |
download | wireguard-rs-f55014ef8ff7308d37be9c92f41b9ccf809c719d.tar.xz wireguard-rs-f55014ef8ff7308d37be9c92f41b9ccf809c719d.zip |
Wake workers when submitting work
Diffstat (limited to 'src/router/workers.rs')
-rw-r--r-- | src/router/workers.rs | 233 |
1 files changed, 127 insertions, 106 deletions
diff --git a/src/router/workers.rs b/src/router/workers.rs index 241b06f..0e68954 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -90,69 +90,81 @@ fn wait_recv<T>(running: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvEr } pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>( - device: Arc<DeviceInner<C, T, B>>, // related device - peer: Arc<PeerInner<C, T, B>>, // related peer - recv: Receiver<JobInbound<C, T, B>>, // in order queue + device: Arc<DeviceInner<C, T, B>>, // related device + peer: Arc<PeerInner<C, T, B>>, // related peer ) { - loop { - match wait_recv(&peer.stopped, &recv) { - Ok((state, buf)) => { - while !peer.stopped.load(Ordering::Acquire) { - match buf.try_lock() { - None => (), - Some(buf) => match buf.status { - Status::Done => { - // parse / cast - let (header, packet) = - match LayoutVerified::new_from_prefix(&buf.msg[..]) { - Some(v) => v, - None => continue, - }; - let header: LayoutVerified<&[u8], TransportHeader> = header; - - // obtain strong reference to decryption state - let state = if let Some(state) = state.upgrade() { - state - } else { - break; - }; - - // check for replay - if !state.protector.lock().update(header.f_counter.get()) { - break; - } - - // check for confirms key - if !state.confirmed.swap(true, Ordering::SeqCst) { - peer.confirm_key(state.keypair.clone()); - } - - // update enpoint, TODO - - // write packet to TUN device, TODO - - // trigger callback - debug_assert!( - packet.len() >= CHACHA20_POLY1305.nonce_len(), - "this should be checked earlier in the pipeline" - ); - (device.call_recv)( - &peer.opaque, - packet.len() > CHACHA20_POLY1305.nonce_len(), - true, - ); - break; - } - Status::Fault => break, - _ => (), - }, - }; - thread::park(); - } - } - Err(_) => { - break; + while !peer.stopped.load(Ordering::Acquire) { + inner(&device, &peer) + } + + fn inner<C: Callbacks, T: Tun, B: Bind>( + device: &Arc<DeviceInner<C, T, B>>, + peer: &Arc<PeerInner<C, T, B>>, + ) { + // wait for job to be submitted + let (state, buf) = loop { + match peer.inbound.lock().pop_front() { + Some(elem) => break elem, + _ => (), } + + // default is to park + thread::park() + }; + + // wait for job to complete + loop { + match buf.try_lock() { + None => (), + Some(buf) => match buf.status { + Status::Fault => break (), + Status::Done => { + // parse / cast + let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { + Some(v) => v, + None => continue, + }; + let header: LayoutVerified<&[u8], TransportHeader> = header; + + // obtain strong reference to decryption state + let state = if let Some(state) = state.upgrade() { + state + } else { + break; + }; + + // check for replay + if !state.protector.lock().update(header.f_counter.get()) { + break; + } + + // check for confirms key + if !state.confirmed.swap(true, Ordering::SeqCst) { + peer.confirm_key(state.keypair.clone()); + } + + // update endpoint, TODO + + // write packet to TUN device, TODO + + // trigger callback + debug_assert!( + packet.len() >= CHACHA20_POLY1305.nonce_len(), + "this should be checked earlier in the pipeline" + ); + (device.call_recv)( + &peer.opaque, + packet.len() > CHACHA20_POLY1305.nonce_len(), + true, + ); + break; + } + _ => (), + }, + }; + + // default is to park + thread::park() } } } @@ -160,48 +172,58 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>( pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>( device: Arc<DeviceInner<C, T, B>>, // related device peer: Arc<PeerInner<C, T, B>>, // related peer - recv: Receiver<JobOutbound>, // in order queue ) { - loop { - match wait_recv(&peer.stopped, &recv) { - Ok(buf) => { - while !peer.stopped.load(Ordering::Acquire) { - match buf.try_lock() { - None => (), // nothing to do - Some(buf) => match buf.status { - Status::Done => { - // parse / cast - let (header, packet) = - match LayoutVerified::new_from_prefix(&buf.msg[..]) { - Some(v) => v, - None => continue, - }; - let header: LayoutVerified<&[u8], TransportHeader> = header; - - // write to UDP device, TODO - let xmit = false; - - // trigger callback - (device.call_send)( - &peer.opaque, - buf.msg.len() - > CHACHA20_POLY1305.nonce_len() - + mem::size_of::<TransportHeader>(), - xmit, - ); - break; - } - Status::Fault => break, - _ => (), - }, - }; - thread::park(); - } - } - Err(e) => { - println!("park outbound! {:?}", e); - break; + while !peer.stopped.load(Ordering::Acquire) { + inner(&device, &peer) + } + + fn inner<C: Callbacks, T: Tun, B: Bind>( + device: &Arc<DeviceInner<C, T, B>>, + peer: &Arc<PeerInner<C, T, B>>, + ) { + // wait for job to be submitted + let (state, buf) = loop { + match peer.inbound.lock().pop_front() { + Some(elem) => break elem, + _ => (), } + + // default is to park + thread::park() + }; + + // wait for job to complete + loop { + match buf.try_lock() { + None => (), + Some(buf) => match buf.status { + Status::Fault => break (), + Status::Done => { + // parse / cast + let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { + Some(v) => v, + None => continue, + }; + let header: LayoutVerified<&[u8], TransportHeader> = header; + + // write to UDP device, TODO + let xmit = false; + + // trigger callback + (device.call_send)( + &peer.opaque, + buf.msg.len() + > CHACHA20_POLY1305.nonce_len() + mem::size_of::<TransportHeader>(), + xmit, + ); + break; + } + _ => (), + }, + }; + + // default is to park + thread::park() } } } @@ -212,11 +234,9 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>( stealers: Vec<Stealer<JobParallel<C, T, B>>>, // stealers (from other threads) ) { while device.running.load(Ordering::SeqCst) { - println!("running"); match find_task(&local, &device.injector, &stealers) { Some(job) => { let (peer, buf) = job; - println!("jobs!"); // take ownership of the job buffer and complete it { @@ -272,9 +292,10 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>( .unpark(); } None => { - println!("park"); - device.parked.store(true, Ordering::Release); - thread::park(); + // wait for notification from device + let &(ref lock, ref cvar) = &device.waker; + let mut guard = lock.lock(); + cvar.wait(&mut guard); } } } |