From 656679638750f84d1c8b75d8c44974ed45d36092 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Tue, 10 Dec 2019 18:17:48 +0100 Subject: Remove crossbeam dependency --- src/wireguard/router/device.rs | 6 +++--- src/wireguard/router/mod.rs | 4 +--- src/wireguard/router/queue.rs | 46 ------------------------------------------ 3 files changed, 4 insertions(+), 52 deletions(-) delete mode 100644 src/wireguard/router/queue.rs (limited to 'src/wireguard/router') diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs index febea45..1d3b743 100644 --- a/src/wireguard/router/device.rs +++ b/src/wireguard/router/device.rs @@ -24,7 +24,7 @@ use super::route::RoutingTable; use super::runq::RunQueue; use super::super::{tun, udp, Endpoint, KeyPair}; -use super::queue::ParallelQueue; +use super::ParallelQueue; pub struct DeviceInner> { // inbound writer (TUN) @@ -125,8 +125,8 @@ impl> Drop impl> DeviceHandle { pub fn new(num_workers: usize, tun: T) -> DeviceHandle { // allocate shared device state - let (mut outrx, queue_outbound) = ParallelQueue::new(num_workers); - let (mut inrx, queue_inbound) = ParallelQueue::new(num_workers); + let (queue_outbound, mut outrx) = ParallelQueue::new(num_workers, 128); + let (queue_inbound, mut inrx) = ParallelQueue::new(num_workers, 128); let device = Device { inner: Arc::new(DeviceInner { inbound: tun, diff --git a/src/wireguard/router/mod.rs b/src/wireguard/router/mod.rs index 49a4f96..8238d32 100644 --- a/src/wireguard/router/mod.rs +++ b/src/wireguard/router/mod.rs @@ -7,13 +7,10 @@ mod messages; mod outbound; mod peer; mod pool; -mod queue; mod route; mod runq; mod types; -// mod workers; - #[cfg(test)] mod tests; @@ -21,6 +18,7 @@ use messages::TransportHeader; use std::mem; use super::constants::REJECT_AFTER_MESSAGES; +use super::queue::ParallelQueue; use super::types::*; use super::{tun, udp, Endpoint}; diff --git a/src/wireguard/router/queue.rs b/src/wireguard/router/queue.rs deleted file mode 100644 index 5d0165c..0000000 --- a/src/wireguard/router/queue.rs +++ /dev/null @@ -1,46 +0,0 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::sync_channel; -use std::sync::mpsc::{Receiver, SyncSender}; - -use spin::Mutex; - -pub struct ParallelQueue { - next: AtomicUsize, // next round-robin index - queues: Vec>>, // work queues (1 per thread) -} - -impl ParallelQueue { - pub fn new(queues: usize) -> (Vec>, Self) { - let mut rxs = vec![]; - let mut txs = vec![]; - - for _ in 0..queues { - let (tx, rx) = sync_channel(128); - txs.push(Mutex::new(tx)); - rxs.push(rx); - } - - ( - rxs, - ParallelQueue { - next: AtomicUsize::new(0), - queues: txs, - }, - ) - } - - pub fn send(&self, v: T) { - let len = self.queues.len(); - let idx = self.next.fetch_add(1, Ordering::SeqCst); - let que = self.queues[idx % len].lock(); - que.send(v).unwrap(); - } - - pub fn close(&self) { - for i in 0..self.queues.len() { - let (tx, _) = sync_channel(0); - let queue = &self.queues[i]; - *queue.lock() = tx; - } - } -} -- cgit v1.2.3-59-g8ed1b