summaryrefslogtreecommitdiffstats
path: root/src/router/workers.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-27 22:20:22 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-27 22:20:22 +0200
commite5f515098afc2e60ceba4ed5faa56a887b9dbc75 (patch)
tree668a7e857f9583f1fb3c8a0b7692b189141ff2ed /src/router/workers.rs
parentAdded sealing/opening to the router worker (diff)
downloadwireguard-rs-e5f515098afc2e60ceba4ed5faa56a887b9dbc75.tar.xz
wireguard-rs-e5f515098afc2e60ceba4ed5faa56a887b9dbc75.zip
Work on inbound/outbound consume code
Diffstat (limited to '')
-rw-r--r--src/router/workers.rs75
1 files changed, 67 insertions, 8 deletions
diff --git a/src/router/workers.rs b/src/router/workers.rs
index 0888db1..98074e7 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -1,4 +1,5 @@
use std::iter;
+use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, TryRecvError};
use std::sync::{Arc, Weak};
@@ -97,12 +98,50 @@ pub fn worker_inbound<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<
while !peer.stopped.load(Ordering::Acquire) {
match buf.try_lock() {
None => (),
- Some(buf) => {
- if buf.status != Status::Waiting {
- // consume
+ Some(buf) => match buf.status {
+ Status::Done => {
+ // cast
+ let (header, packet) =
+ match LayoutVerified::new_from_prefix(&buf.msg[..]) {
+ Some(v) => v,
+ None => continue,
+ };
+ let header: LayoutVerified<&[u8], TransportHeader> = header;
+
+ // obtain strong reference to decryption state
+ let state = if let Some(state) = state.upgrade() {
+ state
+ } else {
+ break;
+ };
+
+ // check for replay
+ if !state.protector.lock().update(header.f_counter.get()) {
+ break;
+ }
+
+ // check for confirms key
+ if state.confirmed.swap(true, Ordering::SeqCst) {
+ // TODO: confirm key
+ }
+
+ // write packet to TUN device
+
+ // trigger callback
+ debug_assert!(
+ packet.len() >= CHACHA20_POLY1305.nonce_len(),
+ "this should be checked earlier in the pipeline"
+ );
+ (device.event_recv)(
+ &peer.opaque,
+ packet.len() > CHACHA20_POLY1305.nonce_len(),
+ true,
+ );
break;
}
- }
+ Status::Fault => break,
+ _ => (),
+ },
};
thread::park();
}
@@ -125,12 +164,32 @@ pub fn worker_outbound<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback
while !peer.stopped.load(Ordering::Acquire) {
match buf.try_lock() {
None => (),
- Some(buf) => {
- if buf.status != Status::Waiting {
- // consume
+ Some(buf) => match buf.status {
+ Status::Done => {
+ // cast
+ let (header, packet) =
+ match LayoutVerified::new_from_prefix(&buf.msg[..]) {
+ Some(v) => v,
+ None => continue,
+ };
+ let header: LayoutVerified<&[u8], TransportHeader> = header;
+
+ // write to UDP device
+ let xmit = false;
+
+ // trigger callback
+ (device.event_send)(
+ &peer.opaque,
+ buf.msg.len()
+ > CHACHA20_POLY1305.nonce_len()
+ + mem::size_of::<TransportHeader>(),
+ xmit,
+ );
break;
}
- }
+ Status::Fault => break,
+ _ => (),
+ },
};
thread::park();
}