aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/router/device.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-12-06 21:45:21 +0100
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-12-06 21:45:21 +0100
commit74e576a9c21b0de451e0588428fbbb99b24eb074 (patch)
tree381ad26325ae4bee1f6a17449110ac941c2a9192 /src/wireguard/router/device.rs
parentMoving away from peer threads (diff)
downloadwireguard-rs-74e576a9c21b0de451e0588428fbbb99b24eb074.tar.xz
wireguard-rs-74e576a9c21b0de451e0588428fbbb99b24eb074.zip
Fixed inbound job bug (add to sequential queue)
Diffstat (limited to 'src/wireguard/router/device.rs')
-rw-r--r--src/wireguard/router/device.rs63
1 files changed, 15 insertions, 48 deletions
diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs
index 88eeae1..e405446 100644
--- a/src/wireguard/router/device.rs
+++ b/src/wireguard/router/device.rs
@@ -1,8 +1,6 @@
use std::collections::HashMap;
use std::ops::Deref;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::mpsc::sync_channel;
-use std::sync::mpsc::{Receiver, SyncSender};
+use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread;
use std::time::Instant;
@@ -25,47 +23,7 @@ use super::SIZE_MESSAGE_PREFIX;
use super::route::RoutingTable;
use super::super::{tun, udp, Endpoint, KeyPair};
-
-pub struct ParallelQueue<T> {
- next: AtomicUsize, // next round-robin index
- queues: Vec<Mutex<SyncSender<T>>>, // work queues (1 per thread)
-}
-
-impl<T> ParallelQueue<T> {
- fn new(queues: usize) -> (Vec<Receiver<T>>, Self) {
- let mut rxs = vec![];
- let mut txs = vec![];
-
- for _ in 0..queues {
- let (tx, rx) = sync_channel(128);
- txs.push(Mutex::new(tx));
- rxs.push(rx);
- }
-
- (
- rxs,
- ParallelQueue {
- next: AtomicUsize::new(0),
- queues: txs,
- },
- )
- }
-
- pub fn send(&self, v: T) {
- let len = self.queues.len();
- let idx = self.next.fetch_add(1, Ordering::SeqCst);
- let que = self.queues[idx % len].lock();
- que.send(v).unwrap();
- }
-
- pub fn close(&self) {
- for i in 0..self.queues.len() {
- let (tx, _) = sync_channel(0);
- let queue = &self.queues[i];
- *queue.lock() = tx;
- }
- }
-}
+use super::queue::ParallelQueue;
pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
// inbound writer (TUN)
@@ -171,16 +129,25 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
// start worker threads
let mut threads = Vec::with_capacity(num_workers);
+
for _ in 0..num_workers {
let rx = inrx.pop().unwrap();
- threads.push(thread::spawn(move || inbound::worker(rx)));
+ threads.push(thread::spawn(move || {
+ log::debug!("inbound router worker started");
+ inbound::worker(rx)
+ }));
}
for _ in 0..num_workers {
let rx = outrx.pop().unwrap();
- threads.push(thread::spawn(move || outbound::worker(rx)));
+ threads.push(thread::spawn(move || {
+ log::debug!("outbound router worker started");
+ outbound::worker(rx)
+ }));
}
+ debug_assert_eq!(threads.len(), num_workers * 2);
+
// return exported device handle
DeviceHandle {
state: Device {
@@ -274,7 +241,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
);
log::trace!(
- "Router, handle transport message: (receiver = {}, counter = {})",
+ "handle transport message: (receiver = {}, counter = {})",
header.f_receiver,
header.f_counter
);
@@ -287,9 +254,9 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
// schedule for decryption and TUN write
if let Some(job) = dec.peer.recv_job(src, dec.clone(), msg) {
+ log::trace!("schedule decryption of transport message");
self.state.inbound_queue.send(job);
}
-
Ok(())
}