diff options
Diffstat (limited to 'src/wireguard/router/device.rs')
-rw-r--r-- | src/wireguard/router/device.rs | 101 |
1 files changed, 71 insertions, 30 deletions
diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs index e405446..9bba199 100644 --- a/src/wireguard/router/device.rs +++ b/src/wireguard/router/device.rs @@ -20,6 +20,7 @@ use super::peer::{new_peer, Peer, PeerHandle}; use super::types::{Callbacks, RouterError}; use super::SIZE_MESSAGE_PREFIX; +use super::runq::RunQueue; use super::route::RoutingTable; use super::super::{tun, udp, Endpoint, KeyPair}; @@ -37,8 +38,12 @@ pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer pub table: RoutingTable<Peer<E, C, T, B>>, // work queues - pub outbound_queue: ParallelQueue<Job<Peer<E, C, T, B>, outbound::Outbound>>, - pub inbound_queue: ParallelQueue<Job<Peer<E, C, T, B>, inbound::Inbound<E, C, T, B>>>, + pub queue_outbound: ParallelQueue<Job<Peer<E, C, T, B>, outbound::Outbound>>, + pub queue_inbound: ParallelQueue<Job<Peer<E, C, T, B>, inbound::Inbound<E, C, T, B>>>, + + // run queues + pub run_inbound: RunQueue<Peer<E, C, T, B>>, + pub run_outbound: RunQueue<Peer<E, C, T, B>>, } pub struct EncryptionState { @@ -96,8 +101,12 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop debug!("router: dropping device"); // close worker queues - self.state.outbound_queue.close(); - self.state.inbound_queue.close(); + self.state.queue_outbound.close(); + self.state.queue_inbound.close(); + + // close run queues + self.state.run_outbound.close(); + self.state.run_inbound.close(); // join all worker threads while match self.handles.pop() { @@ -116,43 +125,73 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<E, C, T, B> { pub fn new(num_workers: usize, tun: T) -> DeviceHandle<E, C, T, B> { // allocate shared device state - let (mut outrx, outbound_queue) = ParallelQueue::new(num_workers); - let (mut inrx, inbound_queue) = ParallelQueue::new(num_workers); - let inner = DeviceInner { - inbound: tun, - inbound_queue, - outbound: RwLock::new((true, None)), - outbound_queue, - recv: RwLock::new(HashMap::new()), - table: RoutingTable::new(), + let (mut outrx, queue_outbound) = ParallelQueue::new(num_workers); + let (mut inrx, queue_inbound) = ParallelQueue::new(num_workers); + let device = Device { + inner: Arc::new(DeviceInner { + inbound: tun, + queue_inbound, + outbound: RwLock::new((true, None)), + queue_outbound, + run_inbound: RunQueue::new(), + run_outbound: RunQueue::new(), + recv: RwLock::new(HashMap::new()), + table: RoutingTable::new(), + }) }; // start worker threads let mut threads = Vec::with_capacity(num_workers); + // inbound/decryption workers for _ in 0..num_workers { - let rx = inrx.pop().unwrap(); - threads.push(thread::spawn(move || { - log::debug!("inbound router worker started"); - inbound::worker(rx) - })); + // parallel workers (parallel processing) + { + let device = device.clone(); + let rx = inrx.pop().unwrap(); + threads.push(thread::spawn(move || { + log::debug!("inbound parallel router worker started"); + inbound::parallel(device, rx) + })); + } + + // sequential workers (in-order processing) + { + let device = device.clone(); + threads.push(thread::spawn(move || { + log::debug!("inbound sequential router worker started"); + inbound::sequential(device) + })); + } } + // outbound/encryption workers for _ in 0..num_workers { - let rx = outrx.pop().unwrap(); - threads.push(thread::spawn(move || { - log::debug!("outbound router worker started"); - outbound::worker(rx) - })); + // parallel workers (parallel processing) + { + let device = device.clone(); + let rx = outrx.pop().unwrap(); + threads.push(thread::spawn(move || { + log::debug!("outbound parallel router worker started"); + outbound::parallel(device, rx) + })); + } + + // sequential workers (in-order processing) + { + let device = device.clone(); + threads.push(thread::spawn(move || { + log::debug!("outbound sequential router worker started"); + outbound::sequential(device) + })); + } } - debug_assert_eq!(threads.len(), num_workers * 2); + debug_assert_eq!(threads.len(), num_workers * 4); // return exported device handle DeviceHandle { - state: Device { - inner: Arc::new(inner), - }, + state: device, handles: threads, } } @@ -192,7 +231,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle< pub fn send(&self, msg: Vec<u8>) -> Result<(), RouterError> { debug_assert!(msg.len() > SIZE_MESSAGE_PREFIX); log::trace!( - "Router, outbound packet = {}", + "send, packet = {}", hex::encode(&msg[SIZE_MESSAGE_PREFIX..]) ); @@ -208,7 +247,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle< // schedule for encryption and transmission to peer if let Some(job) = peer.send_job(msg, true) { - self.state.outbound_queue.send(job); + self.state.queue_outbound.send(job); } Ok(()) @@ -225,6 +264,8 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle< /// /// pub fn recv(&self, src: E, msg: Vec<u8>) -> Result<(), RouterError> { + log::trace!("receive, src: {}", src.into_address()); + // parse / cast let (header, _) = match LayoutVerified::new_from_prefix(&msg[..]) { Some(v) => v, @@ -255,7 +296,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle< // schedule for decryption and TUN write if let Some(job) = dec.peer.recv_job(src, dec.clone(), msg) { log::trace!("schedule decryption of transport message"); - self.state.inbound_queue.send(job); + self.state.queue_inbound.send(job); } Ok(()) } |