summaryrefslogtreecommitdiffstats
path: root/src/router/workers.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-07 18:38:19 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-07 18:38:19 +0200
commit7b61ee4c2db87e195f5291fb1a3927648d38a2a4 (patch)
tree410c0609c3f4d1afbd0d87791b9156a538f59398 /src/router/workers.rs
parentAdded outbound benchmark (diff)
downloadwireguard-rs-7b61ee4c2db87e195f5291fb1a3927648d38a2a4.tar.xz
wireguard-rs-7b61ee4c2db87e195f5291fb1a3927648d38a2a4.zip
Write inbound packets to TUN device
Diffstat (limited to 'src/router/workers.rs')
-rw-r--r--src/router/workers.rs108
1 files changed, 82 insertions, 26 deletions
diff --git a/src/router/workers.rs b/src/router/workers.rs
index b18b038..45e1058 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -1,6 +1,6 @@
use std::mem;
use std::sync::mpsc::Receiver;
-use std::sync::{Arc, Weak};
+use std::sync::Arc;
use futures::sync::oneshot;
use futures::*;
@@ -8,15 +8,17 @@ use futures::*;
use log::debug;
use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::net::{Ipv4Addr, Ipv6Addr};
+use std::sync::atomic::Ordering;
use zerocopy::{AsBytes, LayoutVerified};
-use super::device::DecryptionState;
-use super::device::DeviceInner;
+use super::device::{DecryptionState, DeviceInner};
use super::messages::TransportHeader;
use super::peer::PeerInner;
use super::types::Callbacks;
+use super::ip::*;
+
use super::super::types::{Bind, Tun};
#[derive(PartialEq, Debug)]
@@ -33,9 +35,60 @@ pub struct JobBuffer {
}
pub type JobParallel = (oneshot::Sender<JobBuffer>, JobBuffer);
-pub type JobInbound<C, T, B> = (Weak<DecryptionState<C, T, B>>, oneshot::Receiver<JobBuffer>);
+pub type JobInbound<C, T, B: Bind> = (
+ Arc<DecryptionState<C, T, B>>,
+ B::Endpoint,
+ oneshot::Receiver<JobBuffer>,
+);
pub type JobOutbound = oneshot::Receiver<JobBuffer>;
+#[inline(always)]
+fn check_route<C: Callbacks, T: Tun, B: Bind>(
+ device: &Arc<DeviceInner<C, T, B>>,
+ peer: &Arc<PeerInner<C, T, B>>,
+ packet: &[u8],
+) -> Option<usize> {
+ match packet[0] >> 4 {
+ VERSION_IP4 => {
+ // check length and cast to IPv4 header
+ let (header, _) = LayoutVerified::new_from_prefix(packet)?;
+ let header: LayoutVerified<&[u8], IPv4Header> = header;
+
+ // check IPv4 source address
+ device
+ .ipv4
+ .read()
+ .longest_match(Ipv4Addr::from(header.f_source))
+ .and_then(|(_, _, p)| {
+ if Arc::ptr_eq(p, &peer) {
+ Some(header.f_total_len.get() as usize)
+ } else {
+ None
+ }
+ })
+ }
+ VERSION_IP6 => {
+ // check length and cast to IPv6 header
+ let (header, packet) = LayoutVerified::new_from_prefix(packet)?;
+ let header: LayoutVerified<&[u8], IPv6Header> = header;
+
+ // check IPv6 source address
+ device
+ .ipv6
+ .read()
+ .longest_match(Ipv6Addr::from(header.f_source))
+ .and_then(|(_, _, p)| {
+ if Arc::ptr_eq(p, &peer) {
+ Some(header.f_len.get() as usize + mem::size_of::<IPv6Header>())
+ } else {
+ None
+ }
+ })
+ }
+ _ => None,
+ }
+}
+
pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T, B>>, // related device
peer: Arc<PeerInner<C, T, B>>, // related peer
@@ -43,7 +96,7 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
) {
loop {
// fetch job
- let (state, rx) = match receiver.recv() {
+ let (state, endpoint, rx) = match receiver.recv() {
Ok(v) => v,
_ => {
return;
@@ -62,13 +115,10 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
}
};
let header: LayoutVerified<&[u8], TransportHeader> = header;
-
- // obtain strong reference to decryption state
- let state = if let Some(state) = state.upgrade() {
- state
- } else {
- return;
- };
+ debug_assert!(
+ packet.len() >= 16,
+ "this should be checked earlier in the pipeline"
+ );
// check for replay
if !state.protector.lock().update(header.f_counter.get()) {
@@ -77,23 +127,29 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
// check for confirms key
if !state.confirmed.swap(true, Ordering::SeqCst) {
- peer.confirm_key(state.keypair.clone());
+ peer.confirm_key(&state.keypair);
}
- // update endpoint, TODO
-
- // write packet to TUN device, TODO
+ // update endpoint
+ *peer.endpoint.lock() = Some(endpoint);
+
+ // calculate length of IP packet + padding
+ let length = packet.len() - CHACHA20_POLY1305.nonce_len();
+
+ // check if should be written to TUN
+ let mut sent = false;
+ if length > 0 {
+ if let Some(inner_len) = check_route(&device, &peer, &packet[..length]) {
+ debug_assert!(inner_len <= length, "should be validated");
+ if inner_len <= length {
+ sent = true;
+ let _ = device.tun.write(&packet[..inner_len]);
+ }
+ }
+ }
// trigger callback
- debug_assert!(
- packet.len() >= CHACHA20_POLY1305.nonce_len(),
- "this should be checked earlier in the pipeline"
- );
- (device.call_recv)(
- &peer.opaque,
- packet.len() > CHACHA20_POLY1305.nonce_len(),
- true,
- );
+ (device.call_recv)(&peer.opaque, length == 0, sent);
}
})
.wait();