aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/wireguard.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-12-10 18:17:48 +0100
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-12-10 18:17:48 +0100
commit656679638750f84d1c8b75d8c44974ed45d36092 (patch)
tree9118055a5333afc6b1c3df3e77d315e55e5f8023 /src/wireguard/wireguard.rs
parentFormatting (diff)
downloadwireguard-rs-656679638750f84d1c8b75d8c44974ed45d36092.tar.xz
wireguard-rs-656679638750f84d1c8b75d8c44974ed45d36092.zip
Remove crossbeam dependency
Diffstat (limited to 'src/wireguard/wireguard.rs')
-rw-r--r--src/wireguard/wireguard.rs30
1 files changed, 15 insertions, 15 deletions
diff --git a/src/wireguard/wireguard.rs b/src/wireguard/wireguard.rs
index 2b0e779..d0c0e53 100644
--- a/src/wireguard/wireguard.rs
+++ b/src/wireguard/wireguard.rs
@@ -4,6 +4,7 @@ use super::router;
use super::timers::{Events, Timers};
use super::{Peer, PeerInner};
+use super::queue::ParallelQueue;
use super::tun;
use super::tun::Reader as TunReader;
@@ -35,7 +36,6 @@ use rand::Rng;
use spin::{Mutex, RwLock};
use byteorder::{ByteOrder, LittleEndian};
-use crossbeam_channel::{bounded, Sender};
use x25519_dalek::{PublicKey, StaticSecret};
const SIZE_HANDSHAKE_QUEUE: usize = 128;
@@ -99,7 +99,7 @@ pub struct WireguardInner<T: tun::Tun, B: udp::UDP> {
handshake: RwLock<handshake::Device>,
under_load: AtomicBool,
pending: AtomicUsize, // num of pending handshake packets in queue
- queue: Mutex<Sender<HandshakeJob<B::Endpoint>>>,
+ queue: ParallelQueue<HandshakeJob<B::Endpoint>>,
}
impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
@@ -123,7 +123,7 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
if !self.handshake_queued.swap(true, Ordering::SeqCst) {
self.wg.pending.fetch_add(1, Ordering::SeqCst);
- self.queue.lock().send(HandshakeJob::New(self.pk)).unwrap();
+ self.wg.queue.send(HandshakeJob::New(self.pk));
}
}
}
@@ -288,7 +288,6 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
walltime_last_handshake: Mutex::new(None),
last_handshake_sent: Mutex::new(Instant::now() - TIME_HORIZON),
handshake_queued: AtomicBool::new(false),
- queue: Mutex::new(self.state.queue.lock().clone()),
rx_bytes: AtomicU64::new(0),
tx_bytes: AtomicU64::new(0),
timers: RwLock::new(Timers::dummy(&self.runner)),
@@ -386,10 +385,7 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
}
// add to handshake queue
- wg.queue
- .lock()
- .send(HandshakeJob::Message(msg, src))
- .unwrap();
+ wg.queue.send(HandshakeJob::Message(msg, src));
}
router::TYPE_TRANSPORT => {
debug!("{} : reader, received transport message", wg);
@@ -477,12 +473,14 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
}
pub fn new(writer: T::Writer) -> Wireguard<T, B> {
+ // workers equal to number of physical cores
+ let cpus = num_cpus::get();
+
// create device state
let mut rng = OsRng::new().unwrap();
// handshake queue
- let (tx, rx): (Sender<HandshakeJob<B::Endpoint>>, _) = bounded(SIZE_HANDSHAKE_QUEUE);
-
+ let (tx, mut rxs) = ParallelQueue::new(cpus, 128);
let wg = Arc::new(WireguardInner {
enabled: RwLock::new(false),
tun_readers: WaitHandle::new(),
@@ -494,13 +492,12 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
pending: AtomicUsize::new(0),
handshake: RwLock::new(handshake::Device::new()),
under_load: AtomicBool::new(false),
- queue: Mutex::new(tx),
+ queue: tx,
});
// start handshake workers
- for _ in 0..num_cpus::get() {
+ while let Some(rx) = rxs.pop() {
let wg = wg.clone();
- let rx = rx.clone();
thread::spawn(move || {
debug!("{} : handshake worker, started", wg);
@@ -509,16 +506,18 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
// process elements from the handshake queue
for job in rx {
- // decrement pending
+ // decrement pending pakcets (under_load)
+ let job: HandshakeJob<B::Endpoint> = job;
wg.pending.fetch_sub(1, Ordering::SeqCst);
- let device = wg.handshake.read();
+ // demultiplex staged handshake jobs and handshake messages
match job {
HandshakeJob::Message(msg, src) => {
// feed message to handshake device
let src_validate = (&src).into_address(); // TODO avoid
// process message
+ let device = wg.handshake.read();
match device.process(
&mut rng,
&msg[..],
@@ -599,6 +598,7 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
"{} : handshake worker, new handshake requested for {}",
wg, peer
);
+ let device = wg.handshake.read();
let _ = device.begin(&mut rng, &peer.pk).map(|msg| {
let _ = peer.router.send(&msg[..]).map_err(|e| {
debug!("{} : handshake worker, failed to send handshake initiation, error = {}", wg, e)