summaryrefslogtreecommitdiffstats
path: root/src/router/workers.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-10 21:42:21 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-10 21:42:21 +0200
commit02d9bdcc96c955b654a45d3629b1ee515902078d (patch)
treed0231989ccca424d26f0dcded640acac079aa9de /src/router/workers.rs
parentBegin work on full router interaction unittest (diff)
downloadwireguard-rs-02d9bdcc96c955b654a45d3629b1ee515902078d.tar.xz
wireguard-rs-02d9bdcc96c955b654a45d3629b1ee515902078d.zip
Full inbound/outbound router test
Diffstat (limited to '')
-rw-r--r--src/router/workers.rs72
1 files changed, 54 insertions, 18 deletions
diff --git a/src/router/workers.rs b/src/router/workers.rs
index 85cf22a..b038a20 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -13,13 +13,14 @@ use std::sync::atomic::Ordering;
use zerocopy::{AsBytes, LayoutVerified};
use super::device::{DecryptionState, DeviceInner};
-use super::messages::TransportHeader;
+use super::messages::{TransportHeader, TYPE_TRANSPORT};
use super::peer::PeerInner;
use super::types::Callbacks;
+use super::super::types::{Bind, Tun};
use super::ip::*;
-use super::super::types::{Bind, Tun};
+const SIZE_TAG: usize = 16;
#[derive(PartialEq, Debug)]
pub enum Operation {
@@ -105,32 +106,37 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
return;
}
};
+ debug!("inbound worker: obtained job");
// wait for job to complete
let _ = rx
.map(|buf| {
+ debug!("inbound worker: job complete");
if buf.okay {
// cast transport header
let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) =
match LayoutVerified::new_from_prefix(&buf.msg[..]) {
Some(v) => v,
None => {
+ debug!("inbound worker: failed to parse message");
return;
}
};
debug_assert!(
packet.len() >= CHACHA20_POLY1305.tag_len(),
- "this should be checked earlier in the pipeline"
+ "this should be checked earlier in the pipeline (decryption should fail)"
);
// check for replay
if !state.protector.lock().update(header.f_counter.get()) {
+ debug!("inbound worker: replay detected");
return;
}
// check for confirms key
if !state.confirmed.swap(true, Ordering::SeqCst) {
+ debug!("inbound worker: message confirms key");
peer.confirm_key(&state.keypair);
}
@@ -138,7 +144,8 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
*peer.endpoint.lock() = Some(endpoint);
// calculate length of IP packet + padding
- let length = packet.len() - CHACHA20_POLY1305.nonce_len();
+ let length = packet.len() - SIZE_TAG;
+ debug!("inbound worker: plaintext length = {}", length);
// check if should be written to TUN
let mut sent = false;
@@ -155,10 +162,14 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
}
}
}
+ } else {
+ debug!("inbound worker: received keepalive")
}
// trigger callback
- (device.call_recv)(&peer.opaque, length == 0, sent);
+ (device.call_recv)(&peer.opaque, buf.msg.len(), length == 0, sent);
+ } else {
+ debug!("inbound worker: authentication failure")
}
})
.wait();
@@ -178,10 +189,12 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
return;
}
};
+ debug!("outbound worker: obtained job");
// wait for job to complete
let _ = rx
.map(|buf| {
+ debug!("outbound worker: job complete");
if buf.okay {
// write to UDP bind
let xmit = if let Some(dst) = peer.endpoint.lock().as_ref() {
@@ -199,6 +212,7 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
// trigger callback
(device.call_send)(
&peer.opaque,
+ buf.msg.len(),
buf.msg.len()
> CHACHA20_POLY1305.nonce_len() + mem::size_of::<TransportHeader>(),
xmit,
@@ -218,17 +232,26 @@ pub fn worker_parallel(receiver: Receiver<JobParallel>) {
}
Ok(val) => val,
};
+ debug!("parallel worker: obtained job");
+
+ // make space for tag (TODO: consider moving this out)
+ if buf.op == Operation::Encryption {
+ buf.msg.extend([0u8; SIZE_TAG].iter());
+ }
// cast and check size of packet
- let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) =
- match LayoutVerified::new_from_prefix(&buf.msg[..]) {
+ let (mut header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
+ match LayoutVerified::new_from_prefix(&mut buf.msg[..]) {
Some(v) => v,
- None => continue,
+ None => {
+ debug_assert!(
+ false,
+ "parallel worker: failed to parse message (insufficient size)"
+ );
+ continue;
+ }
};
-
- if packet.len() < CHACHA20_POLY1305.nonce_len() {
- continue;
- }
+ debug_assert!(packet.len() >= CHACHA20_POLY1305.tag_len());
// do the weird ring AEAD dance
let key = LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap());
@@ -241,18 +264,27 @@ pub fn worker_parallel(receiver: Receiver<JobParallel>) {
match buf.op {
Operation::Encryption => {
- debug!("worker, process encryption");
+ debug!("parallel worker: process encryption");
- // note: extends the vector to accommodate the tag
- key.seal_in_place_append_tag(nonce, Aad::empty(), &mut buf.msg)
+ // set the type field
+ header.f_type.set(TYPE_TRANSPORT);
+
+ // encrypt content of transport message in-place
+ let end = packet.len() - SIZE_TAG;
+ let tag = key
+ .seal_in_place_separate_tag(nonce, Aad::empty(), &mut packet[..end])
.unwrap();
+
+ // append tag
+ packet[end..].copy_from_slice(tag.as_ref());
+
buf.okay = true;
}
Operation::Decryption => {
- debug!("worker, process decryption");
+ debug!("parallel worker: process decryption");
// opening failure is signaled by fault state
- buf.okay = match key.open_in_place(nonce, Aad::empty(), &mut buf.msg) {
+ buf.okay = match key.open_in_place(nonce, Aad::empty(), packet) {
Ok(_) => true,
Err(_) => false,
};
@@ -260,6 +292,10 @@ pub fn worker_parallel(receiver: Receiver<JobParallel>) {
}
// pass ownership to consumer
- let _ = tx.send(buf);
+ let okay = tx.send(buf);
+ debug!(
+ "parallel worker: passing ownership to sequential worker: {}",
+ okay.is_ok()
+ );
}
}