diff options
Diffstat (limited to '')
-rw-r--r-- | src/router/workers.rs | 75 |
1 files changed, 67 insertions, 8 deletions
diff --git a/src/router/workers.rs b/src/router/workers.rs index 0888db1..98074e7 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -1,4 +1,5 @@ use std::iter; +use std::mem; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{sync_channel, Receiver, TryRecvError}; use std::sync::{Arc, Weak}; @@ -97,12 +98,50 @@ pub fn worker_inbound<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback< while !peer.stopped.load(Ordering::Acquire) { match buf.try_lock() { None => (), - Some(buf) => { - if buf.status != Status::Waiting { - // consume + Some(buf) => match buf.status { + Status::Done => { + // cast + let (header, packet) = + match LayoutVerified::new_from_prefix(&buf.msg[..]) { + Some(v) => v, + None => continue, + }; + let header: LayoutVerified<&[u8], TransportHeader> = header; + + // obtain strong reference to decryption state + let state = if let Some(state) = state.upgrade() { + state + } else { + break; + }; + + // check for replay + if !state.protector.lock().update(header.f_counter.get()) { + break; + } + + // check for confirms key + if state.confirmed.swap(true, Ordering::SeqCst) { + // TODO: confirm key + } + + // write packet to TUN device + + // trigger callback + debug_assert!( + packet.len() >= CHACHA20_POLY1305.nonce_len(), + "this should be checked earlier in the pipeline" + ); + (device.event_recv)( + &peer.opaque, + packet.len() > CHACHA20_POLY1305.nonce_len(), + true, + ); break; } - } + Status::Fault => break, + _ => (), + }, }; thread::park(); } @@ -125,12 +164,32 @@ pub fn worker_outbound<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback while !peer.stopped.load(Ordering::Acquire) { match buf.try_lock() { None => (), - Some(buf) => { - if buf.status != Status::Waiting { - // consume + Some(buf) => match buf.status { + Status::Done => { + // cast + let (header, packet) = + match LayoutVerified::new_from_prefix(&buf.msg[..]) { + Some(v) => v, + None => continue, + }; + let header: LayoutVerified<&[u8], TransportHeader> = header; + + // write to UDP device + let xmit = false; + + // trigger callback + (device.event_send)( + &peer.opaque, + buf.msg.len() + > CHACHA20_POLY1305.nonce_len() + + mem::size_of::<TransportHeader>(), + xmit, + ); break; } - } + Status::Fault => break, + _ => (), + }, }; thread::park(); } |