diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-09 13:21:12 +0100 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-09 13:21:12 +0100 |
commit | 115fa574a807877594c3b8cf467798fc0524d007 (patch) | |
tree | 07d487f3f8ef130d17536fb93b02f937b7bf66ae /src/wireguard/router/inbound.rs | |
parent | Fixed inbound job bug (add to sequential queue) (diff) | |
download | wireguard-rs-115fa574a807877594c3b8cf467798fc0524d007.tar.xz wireguard-rs-115fa574a807877594c3b8cf467798fc0524d007.zip |
Move to run queue
Diffstat (limited to 'src/wireguard/router/inbound.rs')
-rw-r--r-- | src/wireguard/router/inbound.rs | 242 |
1 files changed, 128 insertions, 114 deletions
diff --git a/src/wireguard/router/inbound.rs b/src/wireguard/router/inbound.rs index 3d47bb7..9b15750 100644 --- a/src/wireguard/router/inbound.rs +++ b/src/wireguard/router/inbound.rs @@ -4,6 +4,8 @@ use super::peer::Peer; use super::pool::*; use super::types::Callbacks; use super::{tun, udp, Endpoint}; +use super::device::Device; +use super::runq::RunQueue; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; use zerocopy::{AsBytes, LayoutVerified}; @@ -38,139 +40,151 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Inbound<E, C, } #[inline(always)] -fn parallel<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( - peer: &Peer<E, C, T, B>, - body: &mut Inbound<E, C, T, B>, +pub fn parallel<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( + device: Device<E, C, T, B>, + receiver: Receiver<Job<Peer<E, C, T, B>, Inbound<E, C, T, B>>>, ) { - log::trace!("worker, parallel section, obtained job"); + // run queue to schedule + fn queue<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( + device: &Device<E, C, T, B>, + ) -> &RunQueue<Peer<E, C, T, B>> { + &device.run_inbound + } + + // parallel work to apply + fn work<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( + peer: &Peer<E, C, T, B>, + body: &mut Inbound<E, C, T, B>, + ) { + log::trace!("worker, parallel section, obtained job"); + + // cast to header followed by payload + let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = + match LayoutVerified::new_from_prefix(&mut body.msg[..]) { + Some(v) => v, + None => { + log::debug!("inbound worker: failed to parse message"); + return; + } + }; + + // authenticate and decrypt payload + { + // create nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); + + // do the weird ring AEAD dance + let key = LessSafeKey::new( + UnboundKey::new(&CHACHA20_POLY1305, &body.state.keypair.recv.key[..]).unwrap(), + ); - // cast to header followed by payload - let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = - match LayoutVerified::new_from_prefix(&mut body.msg[..]) { - Some(v) => v, - None => { - log::debug!("inbound worker: failed to parse message"); - return; + // attempt to open (and authenticate) the body + match key.open_in_place(nonce, Aad::empty(), packet) { + Ok(_) => (), + Err(_) => { + // fault and return early + log::trace!("inbound worker: authentication failure"); + body.failed = true; + return; + } + } + } + + // cryptokey route and strip padding + let inner_len = { + let length = packet.len() - SIZE_TAG; + if length > 0 { + peer.device.table.check_route(&peer, &packet[..length]) + } else { + Some(0) } }; - // authenticate and decrypt payload - { - // create nonce object - let mut nonce = [0u8; 12]; - debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); - nonce[4..].copy_from_slice(header.f_counter.as_bytes()); - let nonce = Nonce::assume_unique_for_key(nonce); - - // do the weird ring AEAD dance - let key = LessSafeKey::new( - UnboundKey::new(&CHACHA20_POLY1305, &body.state.keypair.recv.key[..]).unwrap(), - ); - - // attempt to open (and authenticate) the body - match key.open_in_place(nonce, Aad::empty(), packet) { - Ok(_) => (), - Err(_) => { - // fault and return early - log::trace!("inbound worker: authentication failure"); + // truncate to remove tag + match inner_len { + None => { + log::trace!("inbound worker: cryptokey routing failed"); body.failed = true; - return; + } + Some(len) => { + log::trace!( + "inbound worker: good route, length = {} {}", + len, + if len == 0 { "(keepalive)" } else { "" } + ); + body.msg.truncate(mem::size_of::<TransportHeader>() + len); } } } - // cryptokey route and strip padding - let inner_len = { - let length = packet.len() - SIZE_TAG; - if length > 0 { - peer.device.table.check_route(&peer, &packet[..length]) - } else { - Some(0) - } - }; - - // truncate to remove tag - match inner_len { - None => { - log::trace!("inbound worker: cryptokey routing failed"); - body.failed = true; - } - Some(len) => { - log::trace!( - "inbound worker: good route, length = {} {}", - len, - if len == 0 { "(keepalive)" } else { "" } - ); - body.msg.truncate(mem::size_of::<TransportHeader>() + len); - } - } + worker_parallel(device, |dev| &dev.run_inbound, receiver, work) } #[inline(always)] -fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( - peer: &Peer<E, C, T, B>, - body: &mut Inbound<E, C, T, B>, +pub fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( + device: Device<E, C, T, B>, ) { - log::trace!("worker, sequential section, obtained job"); - - // decryption failed, return early - if body.failed { - log::trace!("job faulted, remove from queue and ignore"); - return; - } - - // cast transport header - let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) = - match LayoutVerified::new_from_prefix(&body.msg[..]) { - Some(v) => v, - None => { - log::debug!("inbound worker: failed to parse message"); - return; - } - }; - - // check for replay - if !body.state.protector.lock().update(header.f_counter.get()) { - log::debug!("inbound worker: replay detected"); - return; - } + // sequential work to apply + fn work<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( + peer: &Peer<E, C, T, B>, + body: &mut Inbound<E, C, T, B> + ) { + log::trace!("worker, sequential section, obtained job"); + + // decryption failed, return early + if body.failed { + log::trace!("job faulted, remove from queue and ignore"); + return; + } - // check for confirms key - if !body.state.confirmed.swap(true, Ordering::SeqCst) { - log::debug!("inbound worker: message confirms key"); - peer.confirm_key(&body.state.keypair); - } + // cast transport header + let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) = + match LayoutVerified::new_from_prefix(&body.msg[..]) { + Some(v) => v, + None => { + log::debug!("inbound worker: failed to parse message"); + return; + } + }; + + // check for replay + if !body.state.protector.lock().update(header.f_counter.get()) { + log::debug!("inbound worker: replay detected"); + return; + } - // update endpoint - *peer.endpoint.lock() = body.endpoint.take(); + // check for confirms key + if !body.state.confirmed.swap(true, Ordering::SeqCst) { + log::debug!("inbound worker: message confirms key"); + peer.confirm_key(&body.state.keypair); + } - // check if should be written to TUN - let mut sent = false; - if packet.len() > 0 { - sent = match peer.device.inbound.write(&packet[..]) { - Err(e) => { - log::debug!("failed to write inbound packet to TUN: {:?}", e); - false + // update endpoint + *peer.endpoint.lock() = body.endpoint.take(); + + // check if should be written to TUN + let mut sent = false; + if packet.len() > 0 { + sent = match peer.device.inbound.write(&packet[..]) { + Err(e) => { + log::debug!("failed to write inbound packet to TUN: {:?}", e); + false + } + Ok(_) => true, } - Ok(_) => true, + } else { + log::debug!("inbound worker: received keepalive") } - } else { - log::debug!("inbound worker: received keepalive") - } - // trigger callback - C::recv(&peer.opaque, body.msg.len(), sent, &body.state.keypair); -} - -#[inline(always)] -fn queue<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( - peer: &Peer<E, C, T, B>, -) -> &InorderQueue<Peer<E, C, T, B>, Inbound<E, C, T, B>> { - &peer.inbound -} + // trigger callback + C::recv(&peer.opaque, body.msg.len(), sent, &body.state.keypair); + } -pub fn worker<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( - receiver: Receiver<Job<Peer<E, C, T, B>, Inbound<E, C, T, B>>>, -) { - worker_template(receiver, parallel, sequential, queue) + // handle message from the peers inbound queue + device.run_inbound.run(|peer| { + peer.inbound.handle(|body| work(&peer, body)); + }); } |