aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/router/device.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/wireguard/router/device.rs101
1 files changed, 71 insertions, 30 deletions
diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs
index e405446..9bba199 100644
--- a/src/wireguard/router/device.rs
+++ b/src/wireguard/router/device.rs
@@ -20,6 +20,7 @@ use super::peer::{new_peer, Peer, PeerHandle};
use super::types::{Callbacks, RouterError};
use super::SIZE_MESSAGE_PREFIX;
+use super::runq::RunQueue;
use super::route::RoutingTable;
use super::super::{tun, udp, Endpoint, KeyPair};
@@ -37,8 +38,12 @@ pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer
pub table: RoutingTable<Peer<E, C, T, B>>,
// work queues
- pub outbound_queue: ParallelQueue<Job<Peer<E, C, T, B>, outbound::Outbound>>,
- pub inbound_queue: ParallelQueue<Job<Peer<E, C, T, B>, inbound::Inbound<E, C, T, B>>>,
+ pub queue_outbound: ParallelQueue<Job<Peer<E, C, T, B>, outbound::Outbound>>,
+ pub queue_inbound: ParallelQueue<Job<Peer<E, C, T, B>, inbound::Inbound<E, C, T, B>>>,
+
+ // run queues
+ pub run_inbound: RunQueue<Peer<E, C, T, B>>,
+ pub run_outbound: RunQueue<Peer<E, C, T, B>>,
}
pub struct EncryptionState {
@@ -96,8 +101,12 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop
debug!("router: dropping device");
// close worker queues
- self.state.outbound_queue.close();
- self.state.inbound_queue.close();
+ self.state.queue_outbound.close();
+ self.state.queue_inbound.close();
+
+ // close run queues
+ self.state.run_outbound.close();
+ self.state.run_inbound.close();
// join all worker threads
while match self.handles.pop() {
@@ -116,43 +125,73 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop
impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<E, C, T, B> {
pub fn new(num_workers: usize, tun: T) -> DeviceHandle<E, C, T, B> {
// allocate shared device state
- let (mut outrx, outbound_queue) = ParallelQueue::new(num_workers);
- let (mut inrx, inbound_queue) = ParallelQueue::new(num_workers);
- let inner = DeviceInner {
- inbound: tun,
- inbound_queue,
- outbound: RwLock::new((true, None)),
- outbound_queue,
- recv: RwLock::new(HashMap::new()),
- table: RoutingTable::new(),
+ let (mut outrx, queue_outbound) = ParallelQueue::new(num_workers);
+ let (mut inrx, queue_inbound) = ParallelQueue::new(num_workers);
+ let device = Device {
+ inner: Arc::new(DeviceInner {
+ inbound: tun,
+ queue_inbound,
+ outbound: RwLock::new((true, None)),
+ queue_outbound,
+ run_inbound: RunQueue::new(),
+ run_outbound: RunQueue::new(),
+ recv: RwLock::new(HashMap::new()),
+ table: RoutingTable::new(),
+ })
};
// start worker threads
let mut threads = Vec::with_capacity(num_workers);
+ // inbound/decryption workers
for _ in 0..num_workers {
- let rx = inrx.pop().unwrap();
- threads.push(thread::spawn(move || {
- log::debug!("inbound router worker started");
- inbound::worker(rx)
- }));
+ // parallel workers (parallel processing)
+ {
+ let device = device.clone();
+ let rx = inrx.pop().unwrap();
+ threads.push(thread::spawn(move || {
+ log::debug!("inbound parallel router worker started");
+ inbound::parallel(device, rx)
+ }));
+ }
+
+ // sequential workers (in-order processing)
+ {
+ let device = device.clone();
+ threads.push(thread::spawn(move || {
+ log::debug!("inbound sequential router worker started");
+ inbound::sequential(device)
+ }));
+ }
}
+ // outbound/encryption workers
for _ in 0..num_workers {
- let rx = outrx.pop().unwrap();
- threads.push(thread::spawn(move || {
- log::debug!("outbound router worker started");
- outbound::worker(rx)
- }));
+ // parallel workers (parallel processing)
+ {
+ let device = device.clone();
+ let rx = outrx.pop().unwrap();
+ threads.push(thread::spawn(move || {
+ log::debug!("outbound parallel router worker started");
+ outbound::parallel(device, rx)
+ }));
+ }
+
+ // sequential workers (in-order processing)
+ {
+ let device = device.clone();
+ threads.push(thread::spawn(move || {
+ log::debug!("outbound sequential router worker started");
+ outbound::sequential(device)
+ }));
+ }
}
- debug_assert_eq!(threads.len(), num_workers * 2);
+ debug_assert_eq!(threads.len(), num_workers * 4);
// return exported device handle
DeviceHandle {
- state: Device {
- inner: Arc::new(inner),
- },
+ state: device,
handles: threads,
}
}
@@ -192,7 +231,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
pub fn send(&self, msg: Vec<u8>) -> Result<(), RouterError> {
debug_assert!(msg.len() > SIZE_MESSAGE_PREFIX);
log::trace!(
- "Router, outbound packet = {}",
+ "send, packet = {}",
hex::encode(&msg[SIZE_MESSAGE_PREFIX..])
);
@@ -208,7 +247,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
// schedule for encryption and transmission to peer
if let Some(job) = peer.send_job(msg, true) {
- self.state.outbound_queue.send(job);
+ self.state.queue_outbound.send(job);
}
Ok(())
@@ -225,6 +264,8 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
///
///
pub fn recv(&self, src: E, msg: Vec<u8>) -> Result<(), RouterError> {
+ log::trace!("receive, src: {}", src.into_address());
+
// parse / cast
let (header, _) = match LayoutVerified::new_from_prefix(&msg[..]) {
Some(v) => v,
@@ -255,7 +296,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
// schedule for decryption and TUN write
if let Some(job) = dec.peer.recv_job(src, dec.clone(), msg) {
log::trace!("schedule decryption of transport message");
- self.state.inbound_queue.send(job);
+ self.state.queue_inbound.send(job);
}
Ok(())
}