aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/router
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-12-10 18:17:48 +0100
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-12-10 18:17:48 +0100
commit656679638750f84d1c8b75d8c44974ed45d36092 (patch)
tree9118055a5333afc6b1c3df3e77d315e55e5f8023 /src/wireguard/router
parentFormatting (diff)
downloadwireguard-rs-656679638750f84d1c8b75d8c44974ed45d36092.tar.xz
wireguard-rs-656679638750f84d1c8b75d8c44974ed45d36092.zip
Remove crossbeam dependency
Diffstat (limited to 'src/wireguard/router')
-rw-r--r--src/wireguard/router/device.rs6
-rw-r--r--src/wireguard/router/mod.rs4
-rw-r--r--src/wireguard/router/queue.rs46
3 files changed, 4 insertions, 52 deletions
diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs
index febea45..1d3b743 100644
--- a/src/wireguard/router/device.rs
+++ b/src/wireguard/router/device.rs
@@ -24,7 +24,7 @@ use super::route::RoutingTable;
use super::runq::RunQueue;
use super::super::{tun, udp, Endpoint, KeyPair};
-use super::queue::ParallelQueue;
+use super::ParallelQueue;
pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
// inbound writer (TUN)
@@ -125,8 +125,8 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop
impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<E, C, T, B> {
pub fn new(num_workers: usize, tun: T) -> DeviceHandle<E, C, T, B> {
// allocate shared device state
- let (mut outrx, queue_outbound) = ParallelQueue::new(num_workers);
- let (mut inrx, queue_inbound) = ParallelQueue::new(num_workers);
+ let (queue_outbound, mut outrx) = ParallelQueue::new(num_workers, 128);
+ let (queue_inbound, mut inrx) = ParallelQueue::new(num_workers, 128);
let device = Device {
inner: Arc::new(DeviceInner {
inbound: tun,
diff --git a/src/wireguard/router/mod.rs b/src/wireguard/router/mod.rs
index 49a4f96..8238d32 100644
--- a/src/wireguard/router/mod.rs
+++ b/src/wireguard/router/mod.rs
@@ -7,13 +7,10 @@ mod messages;
mod outbound;
mod peer;
mod pool;
-mod queue;
mod route;
mod runq;
mod types;
-// mod workers;
-
#[cfg(test)]
mod tests;
@@ -21,6 +18,7 @@ use messages::TransportHeader;
use std::mem;
use super::constants::REJECT_AFTER_MESSAGES;
+use super::queue::ParallelQueue;
use super::types::*;
use super::{tun, udp, Endpoint};
diff --git a/src/wireguard/router/queue.rs b/src/wireguard/router/queue.rs
deleted file mode 100644
index 5d0165c..0000000
--- a/src/wireguard/router/queue.rs
+++ /dev/null
@@ -1,46 +0,0 @@
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::mpsc::sync_channel;
-use std::sync::mpsc::{Receiver, SyncSender};
-
-use spin::Mutex;
-
-pub struct ParallelQueue<T> {
- next: AtomicUsize, // next round-robin index
- queues: Vec<Mutex<SyncSender<T>>>, // work queues (1 per thread)
-}
-
-impl<T> ParallelQueue<T> {
- pub fn new(queues: usize) -> (Vec<Receiver<T>>, Self) {
- let mut rxs = vec![];
- let mut txs = vec![];
-
- for _ in 0..queues {
- let (tx, rx) = sync_channel(128);
- txs.push(Mutex::new(tx));
- rxs.push(rx);
- }
-
- (
- rxs,
- ParallelQueue {
- next: AtomicUsize::new(0),
- queues: txs,
- },
- )
- }
-
- pub fn send(&self, v: T) {
- let len = self.queues.len();
- let idx = self.next.fetch_add(1, Ordering::SeqCst);
- let que = self.queues[idx % len].lock();
- que.send(v).unwrap();
- }
-
- pub fn close(&self) {
- for i in 0..self.queues.len() {
- let (tx, _) = sync_channel(0);
- let queue = &self.queues[i];
- *queue.lock() = tx;
- }
- }
-}