aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/router/device.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wireguard/router/device.rs')
-rw-r--r--src/wireguard/router/device.rs100
1 files changed, 19 insertions, 81 deletions
diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs
index f903a8e..b8e3821 100644
--- a/src/wireguard/router/device.rs
+++ b/src/wireguard/router/device.rs
@@ -10,19 +10,16 @@ use spin::{Mutex, RwLock};
use zerocopy::LayoutVerified;
use super::anti_replay::AntiReplay;
-use super::pool::Job;
use super::constants::PARALLEL_QUEUE_SIZE;
-use super::inbound;
-use super::outbound;
-
use super::messages::{TransportHeader, TYPE_TRANSPORT};
use super::peer::{new_peer, Peer, PeerHandle};
use super::types::{Callbacks, RouterError};
use super::SIZE_MESSAGE_PREFIX;
+use super::receive::ReceiveJob;
use super::route::RoutingTable;
-use super::runq::RunQueue;
+use super::worker::{worker, JobUnion};
use super::super::{tun, udp, Endpoint, KeyPair};
use super::ParallelQueue;
@@ -38,13 +35,8 @@ pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer
pub recv: RwLock<HashMap<u32, Arc<DecryptionState<E, C, T, B>>>>, // receiver id -> decryption state
pub table: RoutingTable<Peer<E, C, T, B>>,
- // work queues
- pub queue_outbound: ParallelQueue<Job<Peer<E, C, T, B>, outbound::Outbound>>,
- pub queue_inbound: ParallelQueue<Job<Peer<E, C, T, B>, inbound::Inbound<E, C, T, B>>>,
-
- // run queues
- pub run_inbound: RunQueue<Peer<E, C, T, B>>,
- pub run_outbound: RunQueue<Peer<E, C, T, B>>,
+ // work queue
+ pub work: ParallelQueue<JobUnion<E, C, T, B>>,
}
pub struct EncryptionState {
@@ -101,13 +93,8 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop
fn drop(&mut self) {
debug!("router: dropping device");
- // close worker queues
- self.state.queue_outbound.close();
- self.state.queue_inbound.close();
-
- // close run queues
- self.state.run_outbound.close();
- self.state.run_inbound.close();
+ // close worker queue
+ self.state.work.close();
// join all worker threads
while match self.handles.pop() {
@@ -118,24 +105,17 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop
}
_ => false,
} {}
-
- debug!("router: device dropped");
}
}
impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<E, C, T, B> {
pub fn new(num_workers: usize, tun: T) -> DeviceHandle<E, C, T, B> {
- // allocate shared device state
- let (queue_outbound, mut outrx) = ParallelQueue::new(num_workers, PARALLEL_QUEUE_SIZE);
- let (queue_inbound, mut inrx) = ParallelQueue::new(num_workers, PARALLEL_QUEUE_SIZE);
+ let (work, mut consumers) = ParallelQueue::new(num_workers, PARALLEL_QUEUE_SIZE);
let device = Device {
inner: Arc::new(DeviceInner {
+ work,
inbound: tun,
- queue_inbound,
outbound: RwLock::new((true, None)),
- queue_outbound,
- run_inbound: RunQueue::new(),
- run_outbound: RunQueue::new(),
recv: RwLock::new(HashMap::new()),
table: RoutingTable::new(),
}),
@@ -143,52 +123,10 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
// start worker threads
let mut threads = Vec::with_capacity(num_workers);
-
- // inbound/decryption workers
- for _ in 0..num_workers {
- // parallel workers (parallel processing)
- {
- let device = device.clone();
- let rx = inrx.pop().unwrap();
- threads.push(thread::spawn(move || {
- log::debug!("inbound parallel router worker started");
- inbound::parallel(device, rx)
- }));
- }
-
- // sequential workers (in-order processing)
- {
- let device = device.clone();
- threads.push(thread::spawn(move || {
- log::debug!("inbound sequential router worker started");
- inbound::sequential(device)
- }));
- }
+ while let Some(rx) = consumers.pop() {
+ threads.push(thread::spawn(move || worker(rx)));
}
-
- // outbound/encryption workers
- for _ in 0..num_workers {
- // parallel workers (parallel processing)
- {
- let device = device.clone();
- let rx = outrx.pop().unwrap();
- threads.push(thread::spawn(move || {
- log::debug!("outbound parallel router worker started");
- outbound::parallel(device, rx)
- }));
- }
-
- // sequential workers (in-order processing)
- {
- let device = device.clone();
- threads.push(thread::spawn(move || {
- log::debug!("outbound sequential router worker started");
- outbound::sequential(device)
- }));
- }
- }
-
- debug_assert_eq!(threads.len(), num_workers * 4);
+ debug_assert_eq!(threads.len(), num_workers);
// return exported device handle
DeviceHandle {
@@ -250,10 +188,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
.ok_or(RouterError::NoCryptoKeyRoute)?;
// schedule for encryption and transmission to peer
- if let Some(job) = peer.send_job(msg, true) {
- self.state.queue_outbound.send(job);
- }
-
+ peer.send(msg, true);
Ok(())
}
@@ -297,10 +232,13 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
.get(&header.f_receiver.get())
.ok_or(RouterError::UnknownReceiverId)?;
- // schedule for decryption and TUN write
- if let Some(job) = dec.peer.recv_job(src, dec.clone(), msg) {
- log::trace!("schedule decryption of transport message");
- self.state.queue_inbound.send(job);
+ // create inbound job
+ let job = ReceiveJob::new(msg, dec.clone(), src);
+
+ // 1. add to sequential queue (drop if full)
+ // 2. then add to parallel work queue (wait if full)
+ if dec.peer.inbound.push(job.clone()) {
+ self.state.work.send(JobUnion::Inbound(job));
}
Ok(())
}