diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2020-02-16 20:25:31 +0100 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2020-02-16 20:25:31 +0100 |
commit | ead75828cdaa5253e57b5792b51e3d99a4a78ea0 (patch) | |
tree | 97fcba5fe19efcb52c0e25cebe4ec359c0d503c8 /src/wireguard/router/device.rs | |
parent | Fixed EINVAL on read4/6 from invalid namelen (diff) | |
download | wireguard-rs-ead75828cdaa5253e57b5792b51e3d99a4a78ea0.tar.xz wireguard-rs-ead75828cdaa5253e57b5792b51e3d99a4a78ea0.zip |
Simplified router code
Diffstat (limited to 'src/wireguard/router/device.rs')
-rw-r--r-- | src/wireguard/router/device.rs | 125 |
1 files changed, 30 insertions, 95 deletions
diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs index 96b7d82..9d78178 100644 --- a/src/wireguard/router/device.rs +++ b/src/wireguard/router/device.rs @@ -10,19 +10,16 @@ use spin::{Mutex, RwLock}; use zerocopy::LayoutVerified; use super::anti_replay::AntiReplay; -use super::pool::Job; use super::constants::PARALLEL_QUEUE_SIZE; -use super::inbound; -use super::outbound; - use super::messages::{TransportHeader, TYPE_TRANSPORT}; use super::peer::{new_peer, Peer, PeerHandle}; use super::types::{Callbacks, RouterError}; use super::SIZE_MESSAGE_PREFIX; +use super::receive::ReceiveJob; use super::route::RoutingTable; -use super::runq::RunQueue; +use super::worker::{worker, JobUnion}; use super::super::{tun, udp, Endpoint, KeyPair}; use super::ParallelQueue; @@ -38,13 +35,8 @@ pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer pub recv: RwLock<HashMap<u32, Arc<DecryptionState<E, C, T, B>>>>, // receiver id -> decryption state pub table: RoutingTable<Peer<E, C, T, B>>, - // work queues - pub queue_outbound: ParallelQueue<Job<Peer<E, C, T, B>, outbound::Outbound>>, - pub queue_inbound: ParallelQueue<Job<Peer<E, C, T, B>, inbound::Inbound<E, C, T, B>>>, - - // run queues - pub run_inbound: RunQueue<Peer<E, C, T, B>>, - pub run_outbound: RunQueue<Peer<E, C, T, B>>, + // work queue + pub work: ParallelQueue<JobUnion<E, C, T, B>>, } pub struct EncryptionState { @@ -101,13 +93,8 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop fn drop(&mut self) { debug!("router: dropping device"); - // close worker queues - self.state.queue_outbound.close(); - self.state.queue_inbound.close(); - - // close run queues - self.state.run_outbound.close(); - self.state.run_inbound.close(); + // close worker queue + self.state.work.close(); // join all worker threads while match self.handles.pop() { @@ -118,77 +105,28 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop } _ => false, } {} - - debug!("router: device dropped"); } } impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<E, C, T, B> { pub fn new(num_workers: usize, tun: T) -> DeviceHandle<E, C, T, B> { - // allocate shared device state - let (queue_outbound, mut outrx) = ParallelQueue::new(num_workers, PARALLEL_QUEUE_SIZE); - let (queue_inbound, mut inrx) = ParallelQueue::new(num_workers, PARALLEL_QUEUE_SIZE); + let (work, mut consumers) = ParallelQueue::new(num_workers, PARALLEL_QUEUE_SIZE); let device = Device { inner: Arc::new(DeviceInner { + work, inbound: tun, - queue_inbound, outbound: RwLock::new((true, None)), - queue_outbound, - run_inbound: RunQueue::new(), - run_outbound: RunQueue::new(), recv: RwLock::new(HashMap::new()), table: RoutingTable::new(), }), }; // start worker threads - let mut threads = Vec::with_capacity(4 * num_workers); - - // inbound/decryption workers - for _ in 0..num_workers { - // parallel workers (parallel processing) - { - let device = device.clone(); - let rx = inrx.pop().unwrap(); - threads.push(thread::spawn(move || { - log::debug!("inbound parallel router worker started"); - inbound::parallel(device, rx) - })); - } - - // sequential workers (in-order processing) - { - let device = device.clone(); - threads.push(thread::spawn(move || { - log::debug!("inbound sequential router worker started"); - inbound::sequential(device) - })); - } - } - - // outbound/encryption workers - for _ in 0..num_workers { - // parallel workers (parallel processing) - { - let device = device.clone(); - let rx = outrx.pop().unwrap(); - threads.push(thread::spawn(move || { - log::debug!("outbound parallel router worker started"); - outbound::parallel(device, rx) - })); - } - - // sequential workers (in-order processing) - { - let device = device.clone(); - threads.push(thread::spawn(move || { - log::debug!("outbound sequential router worker started"); - outbound::sequential(device) - })); - } + let mut threads = Vec::with_capacity(num_workers); + while let Some(rx) = consumers.pop() { + threads.push(thread::spawn(move || worker(rx))); } - - debug_assert_eq!(threads.len(), num_workers * 4); + debug_assert_eq!(threads.len(), num_workers); // return exported device handle DeviceHandle { @@ -197,6 +135,16 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle< } } + pub fn send_raw(&self, msg : &[u8], dst: &mut E) -> Result<(), B::Error> { + let bind = self.state.outbound.read(); + if bind.0 { + if let Some(bind) = bind.1.as_ref() { + return bind.write(msg, dst); + } + } + return Ok(()) + } + /// Brings the router down. /// When the router is brought down it: /// - Prevents transmission of outbound messages. @@ -250,10 +198,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle< .ok_or(RouterError::NoCryptoKeyRoute)?; // schedule for encryption and transmission to peer - if let Some(job) = peer.send_job(msg, true) { - self.state.queue_outbound.send(job); - } - + peer.send(msg, true); Ok(()) } @@ -297,10 +242,13 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle< .get(&header.f_receiver.get()) .ok_or(RouterError::UnknownReceiverId)?; - // schedule for decryption and TUN write - if let Some(job) = dec.peer.recv_job(src, dec.clone(), msg) { - log::trace!("schedule decryption of transport message"); - self.state.queue_inbound.send(job); + // create inbound job + let job = ReceiveJob::new(msg, dec.clone(), src); + + // 1. add to sequential queue (drop if full) + // 2. then add to parallel work queue (wait if full) + if dec.peer.inbound.push(job.clone()) { + self.state.work.send(JobUnion::Inbound(job)); } Ok(()) } @@ -311,17 +259,4 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle< pub fn set_outbound_writer(&self, new: B) { self.state.outbound.write().1 = Some(new); } - - pub fn write(&self, msg: &[u8], endpoint: &mut E) -> Result<(), RouterError> { - let outbound = self.state.outbound.read(); - if outbound.0 { - outbound - .1 - .as_ref() - .ok_or(RouterError::SendError) - .and_then(|w| w.write(msg, endpoint).map_err(|_| RouterError::SendError)) - } else { - Ok(()) - } - } } |