From f55014ef8ff7308d37be9c92f41b9ccf809c719d Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Mon, 2 Sep 2019 23:32:07 +0200 Subject: Wake workers when submitting work --- src/router/workers.rs | 233 +++++++++++++++++++++++++++----------------------- 1 file changed, 127 insertions(+), 106 deletions(-) (limited to 'src/router/workers.rs') diff --git a/src/router/workers.rs b/src/router/workers.rs index 241b06f..0e68954 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -90,69 +90,81 @@ fn wait_recv(running: &AtomicBool, recv: &Receiver) -> Result( - device: Arc>, // related device - peer: Arc>, // related peer - recv: Receiver>, // in order queue + device: Arc>, // related device + peer: Arc>, // related peer ) { - loop { - match wait_recv(&peer.stopped, &recv) { - Ok((state, buf)) => { - while !peer.stopped.load(Ordering::Acquire) { - match buf.try_lock() { - None => (), - Some(buf) => match buf.status { - Status::Done => { - // parse / cast - let (header, packet) = - match LayoutVerified::new_from_prefix(&buf.msg[..]) { - Some(v) => v, - None => continue, - }; - let header: LayoutVerified<&[u8], TransportHeader> = header; - - // obtain strong reference to decryption state - let state = if let Some(state) = state.upgrade() { - state - } else { - break; - }; - - // check for replay - if !state.protector.lock().update(header.f_counter.get()) { - break; - } - - // check for confirms key - if !state.confirmed.swap(true, Ordering::SeqCst) { - peer.confirm_key(state.keypair.clone()); - } - - // update enpoint, TODO - - // write packet to TUN device, TODO - - // trigger callback - debug_assert!( - packet.len() >= CHACHA20_POLY1305.nonce_len(), - "this should be checked earlier in the pipeline" - ); - (device.call_recv)( - &peer.opaque, - packet.len() > CHACHA20_POLY1305.nonce_len(), - true, - ); - break; - } - Status::Fault => break, - _ => (), - }, - }; - thread::park(); - } - } - Err(_) => { - break; + while !peer.stopped.load(Ordering::Acquire) { + inner(&device, &peer) + } + + fn inner( + device: &Arc>, + peer: &Arc>, + ) { + // wait for job to be submitted + let (state, buf) = loop { + match peer.inbound.lock().pop_front() { + Some(elem) => break elem, + _ => (), } + + // default is to park + thread::park() + }; + + // wait for job to complete + loop { + match buf.try_lock() { + None => (), + Some(buf) => match buf.status { + Status::Fault => break (), + Status::Done => { + // parse / cast + let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { + Some(v) => v, + None => continue, + }; + let header: LayoutVerified<&[u8], TransportHeader> = header; + + // obtain strong reference to decryption state + let state = if let Some(state) = state.upgrade() { + state + } else { + break; + }; + + // check for replay + if !state.protector.lock().update(header.f_counter.get()) { + break; + } + + // check for confirms key + if !state.confirmed.swap(true, Ordering::SeqCst) { + peer.confirm_key(state.keypair.clone()); + } + + // update endpoint, TODO + + // write packet to TUN device, TODO + + // trigger callback + debug_assert!( + packet.len() >= CHACHA20_POLY1305.nonce_len(), + "this should be checked earlier in the pipeline" + ); + (device.call_recv)( + &peer.opaque, + packet.len() > CHACHA20_POLY1305.nonce_len(), + true, + ); + break; + } + _ => (), + }, + }; + + // default is to park + thread::park() } } } @@ -160,48 +172,58 @@ pub fn worker_inbound( pub fn worker_outbound( device: Arc>, // related device peer: Arc>, // related peer - recv: Receiver, // in order queue ) { - loop { - match wait_recv(&peer.stopped, &recv) { - Ok(buf) => { - while !peer.stopped.load(Ordering::Acquire) { - match buf.try_lock() { - None => (), // nothing to do - Some(buf) => match buf.status { - Status::Done => { - // parse / cast - let (header, packet) = - match LayoutVerified::new_from_prefix(&buf.msg[..]) { - Some(v) => v, - None => continue, - }; - let header: LayoutVerified<&[u8], TransportHeader> = header; - - // write to UDP device, TODO - let xmit = false; - - // trigger callback - (device.call_send)( - &peer.opaque, - buf.msg.len() - > CHACHA20_POLY1305.nonce_len() - + mem::size_of::(), - xmit, - ); - break; - } - Status::Fault => break, - _ => (), - }, - }; - thread::park(); - } - } - Err(e) => { - println!("park outbound! {:?}", e); - break; + while !peer.stopped.load(Ordering::Acquire) { + inner(&device, &peer) + } + + fn inner( + device: &Arc>, + peer: &Arc>, + ) { + // wait for job to be submitted + let (state, buf) = loop { + match peer.inbound.lock().pop_front() { + Some(elem) => break elem, + _ => (), } + + // default is to park + thread::park() + }; + + // wait for job to complete + loop { + match buf.try_lock() { + None => (), + Some(buf) => match buf.status { + Status::Fault => break (), + Status::Done => { + // parse / cast + let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { + Some(v) => v, + None => continue, + }; + let header: LayoutVerified<&[u8], TransportHeader> = header; + + // write to UDP device, TODO + let xmit = false; + + // trigger callback + (device.call_send)( + &peer.opaque, + buf.msg.len() + > CHACHA20_POLY1305.nonce_len() + mem::size_of::(), + xmit, + ); + break; + } + _ => (), + }, + }; + + // default is to park + thread::park() } } } @@ -212,11 +234,9 @@ pub fn worker_parallel( stealers: Vec>>, // stealers (from other threads) ) { while device.running.load(Ordering::SeqCst) { - println!("running"); match find_task(&local, &device.injector, &stealers) { Some(job) => { let (peer, buf) = job; - println!("jobs!"); // take ownership of the job buffer and complete it { @@ -272,9 +292,10 @@ pub fn worker_parallel( .unpark(); } None => { - println!("park"); - device.parked.store(true, Ordering::Release); - thread::park(); + // wait for notification from device + let &(ref lock, ref cvar) = &device.waker; + let mut guard = lock.lock(); + cvar.wait(&mut guard); } } } -- cgit v1.2.3-59-g8ed1b