aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/router/device.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2020-02-16 20:25:31 +0100
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2020-02-16 20:25:31 +0100
commitead75828cdaa5253e57b5792b51e3d99a4a78ea0 (patch)
tree97fcba5fe19efcb52c0e25cebe4ec359c0d503c8 /src/wireguard/router/device.rs
parentFixed EINVAL on read4/6 from invalid namelen (diff)
downloadwireguard-rs-ead75828cdaa5253e57b5792b51e3d99a4a78ea0.tar.xz
wireguard-rs-ead75828cdaa5253e57b5792b51e3d99a4a78ea0.zip
Simplified router code
Diffstat (limited to 'src/wireguard/router/device.rs')
-rw-r--r--src/wireguard/router/device.rs125
1 files changed, 30 insertions, 95 deletions
diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs
index 96b7d82..9d78178 100644
--- a/src/wireguard/router/device.rs
+++ b/src/wireguard/router/device.rs
@@ -10,19 +10,16 @@ use spin::{Mutex, RwLock};
use zerocopy::LayoutVerified;
use super::anti_replay::AntiReplay;
-use super::pool::Job;
use super::constants::PARALLEL_QUEUE_SIZE;
-use super::inbound;
-use super::outbound;
-
use super::messages::{TransportHeader, TYPE_TRANSPORT};
use super::peer::{new_peer, Peer, PeerHandle};
use super::types::{Callbacks, RouterError};
use super::SIZE_MESSAGE_PREFIX;
+use super::receive::ReceiveJob;
use super::route::RoutingTable;
-use super::runq::RunQueue;
+use super::worker::{worker, JobUnion};
use super::super::{tun, udp, Endpoint, KeyPair};
use super::ParallelQueue;
@@ -38,13 +35,8 @@ pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer
pub recv: RwLock<HashMap<u32, Arc<DecryptionState<E, C, T, B>>>>, // receiver id -> decryption state
pub table: RoutingTable<Peer<E, C, T, B>>,
- // work queues
- pub queue_outbound: ParallelQueue<Job<Peer<E, C, T, B>, outbound::Outbound>>,
- pub queue_inbound: ParallelQueue<Job<Peer<E, C, T, B>, inbound::Inbound<E, C, T, B>>>,
-
- // run queues
- pub run_inbound: RunQueue<Peer<E, C, T, B>>,
- pub run_outbound: RunQueue<Peer<E, C, T, B>>,
+ // work queue
+ pub work: ParallelQueue<JobUnion<E, C, T, B>>,
}
pub struct EncryptionState {
@@ -101,13 +93,8 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop
fn drop(&mut self) {
debug!("router: dropping device");
- // close worker queues
- self.state.queue_outbound.close();
- self.state.queue_inbound.close();
-
- // close run queues
- self.state.run_outbound.close();
- self.state.run_inbound.close();
+ // close worker queue
+ self.state.work.close();
// join all worker threads
while match self.handles.pop() {
@@ -118,77 +105,28 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop
}
_ => false,
} {}
-
- debug!("router: device dropped");
}
}
impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<E, C, T, B> {
pub fn new(num_workers: usize, tun: T) -> DeviceHandle<E, C, T, B> {
- // allocate shared device state
- let (queue_outbound, mut outrx) = ParallelQueue::new(num_workers, PARALLEL_QUEUE_SIZE);
- let (queue_inbound, mut inrx) = ParallelQueue::new(num_workers, PARALLEL_QUEUE_SIZE);
+ let (work, mut consumers) = ParallelQueue::new(num_workers, PARALLEL_QUEUE_SIZE);
let device = Device {
inner: Arc::new(DeviceInner {
+ work,
inbound: tun,
- queue_inbound,
outbound: RwLock::new((true, None)),
- queue_outbound,
- run_inbound: RunQueue::new(),
- run_outbound: RunQueue::new(),
recv: RwLock::new(HashMap::new()),
table: RoutingTable::new(),
}),
};
// start worker threads
- let mut threads = Vec::with_capacity(4 * num_workers);
-
- // inbound/decryption workers
- for _ in 0..num_workers {
- // parallel workers (parallel processing)
- {
- let device = device.clone();
- let rx = inrx.pop().unwrap();
- threads.push(thread::spawn(move || {
- log::debug!("inbound parallel router worker started");
- inbound::parallel(device, rx)
- }));
- }
-
- // sequential workers (in-order processing)
- {
- let device = device.clone();
- threads.push(thread::spawn(move || {
- log::debug!("inbound sequential router worker started");
- inbound::sequential(device)
- }));
- }
- }
-
- // outbound/encryption workers
- for _ in 0..num_workers {
- // parallel workers (parallel processing)
- {
- let device = device.clone();
- let rx = outrx.pop().unwrap();
- threads.push(thread::spawn(move || {
- log::debug!("outbound parallel router worker started");
- outbound::parallel(device, rx)
- }));
- }
-
- // sequential workers (in-order processing)
- {
- let device = device.clone();
- threads.push(thread::spawn(move || {
- log::debug!("outbound sequential router worker started");
- outbound::sequential(device)
- }));
- }
+ let mut threads = Vec::with_capacity(num_workers);
+ while let Some(rx) = consumers.pop() {
+ threads.push(thread::spawn(move || worker(rx)));
}
-
- debug_assert_eq!(threads.len(), num_workers * 4);
+ debug_assert_eq!(threads.len(), num_workers);
// return exported device handle
DeviceHandle {
@@ -197,6 +135,16 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
}
}
+ pub fn send_raw(&self, msg : &[u8], dst: &mut E) -> Result<(), B::Error> {
+ let bind = self.state.outbound.read();
+ if bind.0 {
+ if let Some(bind) = bind.1.as_ref() {
+ return bind.write(msg, dst);
+ }
+ }
+ return Ok(())
+ }
+
/// Brings the router down.
/// When the router is brought down it:
/// - Prevents transmission of outbound messages.
@@ -250,10 +198,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
.ok_or(RouterError::NoCryptoKeyRoute)?;
// schedule for encryption and transmission to peer
- if let Some(job) = peer.send_job(msg, true) {
- self.state.queue_outbound.send(job);
- }
-
+ peer.send(msg, true);
Ok(())
}
@@ -297,10 +242,13 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
.get(&header.f_receiver.get())
.ok_or(RouterError::UnknownReceiverId)?;
- // schedule for decryption and TUN write
- if let Some(job) = dec.peer.recv_job(src, dec.clone(), msg) {
- log::trace!("schedule decryption of transport message");
- self.state.queue_inbound.send(job);
+ // create inbound job
+ let job = ReceiveJob::new(msg, dec.clone(), src);
+
+ // 1. add to sequential queue (drop if full)
+ // 2. then add to parallel work queue (wait if full)
+ if dec.peer.inbound.push(job.clone()) {
+ self.state.work.send(JobUnion::Inbound(job));
}
Ok(())
}
@@ -311,17 +259,4 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
pub fn set_outbound_writer(&self, new: B) {
self.state.outbound.write().1 = Some(new);
}
-
- pub fn write(&self, msg: &[u8], endpoint: &mut E) -> Result<(), RouterError> {
- let outbound = self.state.outbound.read();
- if outbound.0 {
- outbound
- .1
- .as_ref()
- .ok_or(RouterError::SendError)
- .and_then(|w| w.write(msg, endpoint).map_err(|_| RouterError::SendError))
- } else {
- Ok(())
- }
- }
}