diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-06 21:45:21 +0100 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-12-06 21:45:21 +0100 |
commit | 74e576a9c21b0de451e0588428fbbb99b24eb074 (patch) | |
tree | 381ad26325ae4bee1f6a17449110ac941c2a9192 /src/wireguard/router/device.rs | |
parent | Moving away from peer threads (diff) | |
download | wireguard-rs-74e576a9c21b0de451e0588428fbbb99b24eb074.tar.xz wireguard-rs-74e576a9c21b0de451e0588428fbbb99b24eb074.zip |
Fixed inbound job bug (add to sequential queue)
Diffstat (limited to 'src/wireguard/router/device.rs')
-rw-r--r-- | src/wireguard/router/device.rs | 63 |
1 file changed, 15 insertions(+), 48 deletions(-)
diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs index 88eeae1..e405446 100644 --- a/src/wireguard/router/device.rs +++ b/src/wireguard/router/device.rs @@ -1,8 +1,6 @@ use std::collections::HashMap; use std::ops::Deref; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::mpsc::sync_channel; -use std::sync::mpsc::{Receiver, SyncSender}; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::thread; use std::time::Instant; @@ -25,47 +23,7 @@ use super::SIZE_MESSAGE_PREFIX; use super::route::RoutingTable; use super::super::{tun, udp, Endpoint, KeyPair}; - -pub struct ParallelQueue<T> { - next: AtomicUsize, // next round-robin index - queues: Vec<Mutex<SyncSender<T>>>, // work queues (1 per thread) -} - -impl<T> ParallelQueue<T> { - fn new(queues: usize) -> (Vec<Receiver<T>>, Self) { - let mut rxs = vec![]; - let mut txs = vec![]; - - for _ in 0..queues { - let (tx, rx) = sync_channel(128); - txs.push(Mutex::new(tx)); - rxs.push(rx); - } - - ( - rxs, - ParallelQueue { - next: AtomicUsize::new(0), - queues: txs, - }, - ) - } - - pub fn send(&self, v: T) { - let len = self.queues.len(); - let idx = self.next.fetch_add(1, Ordering::SeqCst); - let que = self.queues[idx % len].lock(); - que.send(v).unwrap(); - } - - pub fn close(&self) { - for i in 0..self.queues.len() { - let (tx, _) = sync_channel(0); - let queue = &self.queues[i]; - *queue.lock() = tx; - } - } -} +use super::queue::ParallelQueue; pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> { // inbound writer (TUN) @@ -171,16 +129,25 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle< // start worker threads let mut threads = Vec::with_capacity(num_workers); + for _ in 0..num_workers { let rx = inrx.pop().unwrap(); - threads.push(thread::spawn(move || inbound::worker(rx))); + threads.push(thread::spawn(move || { + log::debug!("inbound router worker started"); + inbound::worker(rx) + 
})); } for _ in 0..num_workers { let rx = outrx.pop().unwrap(); - threads.push(thread::spawn(move || outbound::worker(rx))); + threads.push(thread::spawn(move || { + log::debug!("outbound router worker started"); + outbound::worker(rx) + })); } + debug_assert_eq!(threads.len(), num_workers * 2); + // return exported device handle DeviceHandle { state: Device { @@ -274,7 +241,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle< ); log::trace!( - "Router, handle transport message: (receiver = {}, counter = {})", + "handle transport message: (receiver = {}, counter = {})", header.f_receiver, header.f_counter ); @@ -287,9 +254,9 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle< // schedule for decryption and TUN write if let Some(job) = dec.peer.recv_job(src, dec.clone(), msg) { + log::trace!("schedule decryption of transport message"); self.state.inbound_queue.send(job); } - Ok(()) } |