aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-12-16 15:26:15 +0100
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-12-16 15:26:15 +0100
commitfd3ba63e80e7f97ae89ab5938fe6594c2d1c0869 (patch)
treeeb4d998b2e2b19440466c17a22d5742cd9ebdfc7 /src
parentRemoved unused atexit (diff)
downloadwireguard-rs-fd3ba63e80e7f97ae89ab5938fe6594c2d1c0869.tar.xz
wireguard-rs-fd3ba63e80e7f97ae89ab5938fe6594c2d1c0869.zip
Revert to crossbeam
Diffstat (limited to 'src')
-rw-r--r--src/main.rs9
-rw-r--r--src/wireguard/queue.rs47
-rw-r--r--src/wireguard/router/inbound.rs2
-rw-r--r--src/wireguard/router/outbound.rs2
-rw-r--r--src/wireguard/router/pool.rs7
-rw-r--r--src/wireguard/router/workers.rs5
6 files changed, 21 insertions, 51 deletions
diff --git a/src/main.rs b/src/main.rs
index 087b54b..59b21e5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -180,14 +180,11 @@ fn main() {
});
}
Err(err) => {
- log::info!("UAPI error: {}", err);
- break;
+ log::info!("UAPI connection error: {}", err);
+ profiler_stop();
+ exit(0);
}
}
-
- // exit
- profiler_stop();
- exit(0);
});
// block until all tun readers closed
diff --git a/src/wireguard/queue.rs b/src/wireguard/queue.rs
index f484320..4c004c4 100644
--- a/src/wireguard/queue.rs
+++ b/src/wireguard/queue.rs
@@ -1,19 +1,8 @@
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::mpsc::sync_channel;
-use std::sync::mpsc::{Receiver, SyncSender};
+use crossbeam_channel::{bounded, Receiver, Sender};
use std::sync::Mutex;
-/// A simple parallel queue used to pass work to a worker pool.
-///
-/// Unlike e.g. the crossbeam multi-producer multi-consumer queue
-/// the ParallelQueue offers fewer features and instead improves speed:
-///
-/// The crossbeam channel ensures that elements are consumed
-/// even if not every Receiver is being read from.
-/// This is not ensured by ParallelQueue.
pub struct ParallelQueue<T> {
- next: AtomicUsize, // next round-robin index
- queues: Vec<Mutex<Option<SyncSender<T>>>>, // work queues (1 per thread)
+ queue: Mutex<Option<Sender<T>>>, // work queues (1 per thread)
}
impl<T> ParallelQueue<T> {
@@ -25,40 +14,26 @@ impl<T> ParallelQueue<T> {
/// - `capacity`: capacity of each internal queue
///
pub fn new(queues: usize, capacity: usize) -> (Self, Vec<Receiver<T>>) {
- let mut rxs = vec![];
- let mut txs = vec![];
-
+ let mut receivers = Vec::with_capacity(queues);
+ let (tx, rx) = bounded(capacity);
for _ in 0..queues {
- let (tx, rx) = sync_channel(capacity);
- txs.push(Mutex::new(Some(tx)));
- rxs.push(rx);
+ receivers.push(rx.clone());
}
-
(
ParallelQueue {
- next: AtomicUsize::new(0),
- queues: txs,
+ queue: Mutex::new(Some(tx)),
},
- rxs,
+ receivers,
)
}
pub fn send(&self, v: T) {
- let len = self.queues.len();
- let idx = self.next.fetch_add(1, Ordering::SeqCst);
- match self.queues[idx % len].lock().unwrap().as_ref() {
- Some(que) => {
- // TODO: consider best way to propergate Result
- let _ = que.send(v);
- }
- _ => (),
- }
+ self.queue.lock().unwrap().as_ref().map(|s| {
+ let _ = s.send(v);
+ });
}
pub fn close(&self) {
- for i in 0..self.queues.len() {
- let queue = &self.queues[i];
- *queue.lock().unwrap() = None;
- }
+ *self.queue.lock().unwrap() = None;
}
}
diff --git a/src/wireguard/router/inbound.rs b/src/wireguard/router/inbound.rs
index db6d3f3..96c2e33 100644
--- a/src/wireguard/router/inbound.rs
+++ b/src/wireguard/router/inbound.rs
@@ -8,12 +8,12 @@ use super::runq::RunQueue;
use super::types::Callbacks;
use super::{tun, udp, Endpoint};
+use crossbeam_channel::Receiver;
use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
use zerocopy::{AsBytes, LayoutVerified};
use std::mem;
use std::sync::atomic::Ordering;
-use std::sync::mpsc::Receiver;
use std::sync::Arc;
pub const SIZE_TAG: usize = 16;
diff --git a/src/wireguard/router/outbound.rs b/src/wireguard/router/outbound.rs
index a555ecb..a0a1c72 100644
--- a/src/wireguard/router/outbound.rs
+++ b/src/wireguard/router/outbound.rs
@@ -8,9 +8,9 @@ use super::KeyPair;
use super::REJECT_AFTER_MESSAGES;
use super::{tun, udp, Endpoint};
-use std::sync::mpsc::Receiver;
use std::sync::Arc;
+use crossbeam_channel::Receiver;
use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
use zerocopy::{AsBytes, LayoutVerified};
diff --git a/src/wireguard/router/pool.rs b/src/wireguard/router/pool.rs
index 686c788..3fc0026 100644
--- a/src/wireguard/router/pool.rs
+++ b/src/wireguard/router/pool.rs
@@ -1,9 +1,10 @@
-use arraydeque::ArrayDeque;
-use spin::{Mutex, MutexGuard};
use std::mem;
-use std::sync::mpsc::Receiver;
use std::sync::Arc;
+use arraydeque::ArrayDeque;
+use crossbeam_channel::Receiver;
+use spin::{Mutex, MutexGuard};
+
use super::constants::INORDER_QUEUE_SIZE;
use super::runq::{RunQueue, ToKey};
diff --git a/src/wireguard/router/workers.rs b/src/wireguard/router/workers.rs
index 3ed6311..43464a0 100644
--- a/src/wireguard/router/workers.rs
+++ b/src/wireguard/router/workers.rs
@@ -1,13 +1,10 @@
-use std::sync::mpsc::Receiver;
use std::sync::Arc;
-use futures::sync::oneshot;
-use futures::*;
-
use log::{debug, trace};
use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
+use crossbeam_channel::Receiver;
use std::sync::atomic::Ordering;
use zerocopy::{AsBytes, LayoutVerified};