diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-09 13:21:12 +0100 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-09 13:21:12 +0100 |
commit | 115fa574a807877594c3b8cf467798fc0524d007 (patch) | |
tree | 07d487f3f8ef130d17536fb93b02f937b7bf66ae /src/wireguard/router/outbound.rs | |
parent | Fixed inbound job bug (add to sequential queue) (diff) | |
download | wireguard-rs-115fa574a807877594c3b8cf467798fc0524d007.tar.xz wireguard-rs-115fa574a807877594c3b8cf467798fc0524d007.zip |
Move to run queue
Diffstat (limited to 'src/wireguard/router/outbound.rs')
-rw-r--r-- | src/wireguard/router/outbound.rs | 138 |
1 files changed, 69 insertions, 69 deletions
diff --git a/src/wireguard/router/outbound.rs b/src/wireguard/router/outbound.rs index d08637b..6c42d8f 100644 --- a/src/wireguard/router/outbound.rs +++ b/src/wireguard/router/outbound.rs @@ -5,6 +5,7 @@ use super::types::Callbacks; use super::KeyPair; use super::REJECT_AFTER_MESSAGES; use super::{tun, udp, Endpoint}; +use super::device::Device; use std::sync::mpsc::Receiver; use std::sync::Arc; @@ -31,78 +32,77 @@ impl Outbound { } #[inline(always)] -fn parallel<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( - _peer: &Peer<E, C, T, B>, - body: &mut Outbound, +pub fn parallel<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( + device: Device<E, C, T, B>, + receiver: Receiver<Job<Peer<E, C, T, B>, Outbound>>, + ) { - log::trace!("worker, parallel section, obtained job"); - - // make space for the tag - body.msg.extend([0u8; SIZE_TAG].iter()); - - // cast to header (should never fail) - let (mut header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = - LayoutVerified::new_from_prefix(&mut body.msg[..]) - .expect("earlier code should ensure that there is ample space"); - - // set header fields - debug_assert!( - body.counter < REJECT_AFTER_MESSAGES, - "should be checked when assigning counters" - ); - header.f_type.set(TYPE_TRANSPORT); - header.f_receiver.set(body.keypair.send.id); - header.f_counter.set(body.counter); - - // create a nonce object - let mut nonce = [0u8; 12]; - debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); - nonce[4..].copy_from_slice(header.f_counter.as_bytes()); - let nonce = Nonce::assume_unique_for_key(nonce); - - // do the weird ring AEAD dance - let key = - LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &body.keypair.send.key[..]).unwrap()); - - // encrypt content of transport message in-place - let end = packet.len() - SIZE_TAG; - let tag = key - .seal_in_place_separate_tag(nonce, Aad::empty(), &mut packet[..end]) - .unwrap(); - - // append tag - packet[end..].copy_from_slice(tag.as_ref()); -} + fn work<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( + _peer: &Peer<E, C, T, B>, + body: &mut Outbound, + ) { + log::trace!("worker, parallel section, obtained job"); + + // make space for the tag + body.msg.extend([0u8; SIZE_TAG].iter()); + + // cast to header (should never fail) + let (mut header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = + LayoutVerified::new_from_prefix(&mut body.msg[..]) + .expect("earlier code should ensure that there is ample space"); + + // set header fields + debug_assert!( + body.counter < REJECT_AFTER_MESSAGES, + "should be checked when assigning counters" + ); + header.f_type.set(TYPE_TRANSPORT); + header.f_receiver.set(body.keypair.send.id); + header.f_counter.set(body.counter); + + // create a nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); + + // do the weird ring AEAD dance + let key = + LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &body.keypair.send.key[..]).unwrap()); + + // encrypt content of transport message in-place + let end = packet.len() - SIZE_TAG; + let tag = key + .seal_in_place_separate_tag(nonce, Aad::empty(), &mut packet[..end]) + .unwrap(); + + // append tag + packet[end..].copy_from_slice(tag.as_ref()); + } -#[inline(always)] -fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( - peer: &Peer<E, C, T, B>, - body: &mut Outbound, -) { - log::trace!("worker, sequential section, obtained job"); - - // send to peer - let xmit = peer.send(&body.msg[..]).is_ok(); - - // trigger callback - C::send( - &peer.opaque, - body.msg.len(), - xmit, - &body.keypair, - body.counter, - ); + worker_parallel(device, |dev| &dev.run_outbound, receiver, work); } -#[inline(always)] -pub fn queue<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( - peer: &Peer<E, C, T, B>, -) -> &InorderQueue<Peer<E, C, T, B>, Outbound> { - &peer.outbound -} -pub fn worker<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( - receiver: Receiver<Job<Peer<E, C, T, B>, Outbound>>, +#[inline(always)] +pub fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>( + device: Device<E, C, T, B>, ) { - worker_template(receiver, parallel, sequential, queue) -} + device.run_outbound.run(|peer| { + peer.outbound.handle(|body| { + log::trace!("worker, sequential section, obtained job"); + + // send to peer + let xmit = peer.send(&body.msg[..]).is_ok(); + + // trigger callback + C::send( + &peer.opaque, + body.msg.len(), + xmit, + &body.keypair, + body.counter, + ); + }); + }); +}
\ No newline at end of file |