aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/router/outbound.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-12-09 13:21:12 +0100
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-12-09 13:21:12 +0100
commit115fa574a807877594c3b8cf467798fc0524d007 (patch)
tree07d487f3f8ef130d17536fb93b02f937b7bf66ae /src/wireguard/router/outbound.rs
parentFixed inbound job bug (add to sequential queue) (diff)
downloadwireguard-rs-115fa574a807877594c3b8cf467798fc0524d007.tar.xz
wireguard-rs-115fa574a807877594c3b8cf467798fc0524d007.zip
Move to run queue
Diffstat (limited to 'src/wireguard/router/outbound.rs')
-rw-r--r--src/wireguard/router/outbound.rs138
1 files changed, 69 insertions, 69 deletions
diff --git a/src/wireguard/router/outbound.rs b/src/wireguard/router/outbound.rs
index d08637b..6c42d8f 100644
--- a/src/wireguard/router/outbound.rs
+++ b/src/wireguard/router/outbound.rs
@@ -5,6 +5,7 @@ use super::types::Callbacks;
use super::KeyPair;
use super::REJECT_AFTER_MESSAGES;
use super::{tun, udp, Endpoint};
+use super::device::Device;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
@@ -31,78 +32,77 @@ impl Outbound {
}
#[inline(always)]
-fn parallel<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
- _peer: &Peer<E, C, T, B>,
- body: &mut Outbound,
+pub fn parallel<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
+ device: Device<E, C, T, B>,
+ receiver: Receiver<Job<Peer<E, C, T, B>, Outbound>>,
+
) {
- log::trace!("worker, parallel section, obtained job");
-
- // make space for the tag
- body.msg.extend([0u8; SIZE_TAG].iter());
-
- // cast to header (should never fail)
- let (mut header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
- LayoutVerified::new_from_prefix(&mut body.msg[..])
- .expect("earlier code should ensure that there is ample space");
-
- // set header fields
- debug_assert!(
- body.counter < REJECT_AFTER_MESSAGES,
- "should be checked when assigning counters"
- );
- header.f_type.set(TYPE_TRANSPORT);
- header.f_receiver.set(body.keypair.send.id);
- header.f_counter.set(body.counter);
-
- // create a nonce object
- let mut nonce = [0u8; 12];
- debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len());
- nonce[4..].copy_from_slice(header.f_counter.as_bytes());
- let nonce = Nonce::assume_unique_for_key(nonce);
-
- // do the weird ring AEAD dance
- let key =
- LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &body.keypair.send.key[..]).unwrap());
-
- // encrypt content of transport message in-place
- let end = packet.len() - SIZE_TAG;
- let tag = key
- .seal_in_place_separate_tag(nonce, Aad::empty(), &mut packet[..end])
- .unwrap();
-
- // append tag
- packet[end..].copy_from_slice(tag.as_ref());
-}
+ fn work<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
+ _peer: &Peer<E, C, T, B>,
+ body: &mut Outbound,
+ ) {
+ log::trace!("worker, parallel section, obtained job");
+
+ // make space for the tag
+ body.msg.extend([0u8; SIZE_TAG].iter());
+
+ // cast to header (should never fail)
+ let (mut header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
+ LayoutVerified::new_from_prefix(&mut body.msg[..])
+ .expect("earlier code should ensure that there is ample space");
+
+ // set header fields
+ debug_assert!(
+ body.counter < REJECT_AFTER_MESSAGES,
+ "should be checked when assigning counters"
+ );
+ header.f_type.set(TYPE_TRANSPORT);
+ header.f_receiver.set(body.keypair.send.id);
+ header.f_counter.set(body.counter);
+
+ // create a nonce object
+ let mut nonce = [0u8; 12];
+ debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len());
+ nonce[4..].copy_from_slice(header.f_counter.as_bytes());
+ let nonce = Nonce::assume_unique_for_key(nonce);
+
+ // do the weird ring AEAD dance
+ let key =
+ LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &body.keypair.send.key[..]).unwrap());
+
+ // encrypt content of transport message in-place
+ let end = packet.len() - SIZE_TAG;
+ let tag = key
+ .seal_in_place_separate_tag(nonce, Aad::empty(), &mut packet[..end])
+ .unwrap();
+
+ // append tag
+ packet[end..].copy_from_slice(tag.as_ref());
+ }
-#[inline(always)]
-fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
- peer: &Peer<E, C, T, B>,
- body: &mut Outbound,
-) {
- log::trace!("worker, sequential section, obtained job");
-
- // send to peer
- let xmit = peer.send(&body.msg[..]).is_ok();
-
- // trigger callback
- C::send(
- &peer.opaque,
- body.msg.len(),
- xmit,
- &body.keypair,
- body.counter,
- );
+ worker_parallel(device, |dev| &dev.run_outbound, receiver, work);
}
-#[inline(always)]
-pub fn queue<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
- peer: &Peer<E, C, T, B>,
-) -> &InorderQueue<Peer<E, C, T, B>, Outbound> {
- &peer.outbound
-}
-pub fn worker<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
- receiver: Receiver<Job<Peer<E, C, T, B>, Outbound>>,
+#[inline(always)]
+pub fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
+ device: Device<E, C, T, B>,
) {
- worker_template(receiver, parallel, sequential, queue)
-}
+ device.run_outbound.run(|peer| {
+ peer.outbound.handle(|body| {
+ log::trace!("worker, sequential section, obtained job");
+
+ // send to peer
+ let xmit = peer.send(&body.msg[..]).is_ok();
+
+ // trigger callback
+ C::send(
+ &peer.opaque,
+ body.msg.len(),
+ xmit,
+ &body.keypair,
+ body.counter,
+ );
+ });
+ });
+} \ No newline at end of file