From 5a7f762d6ce6b5bbdbd10f5966adc909597f37d6 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Tue, 3 Dec 2019 21:49:08 +0100 Subject: Moving away from peer threads --- src/wireguard/router/device.rs | 141 +++++++++++++++++++++++++++++++---------- 1 file changed, 108 insertions(+), 33 deletions(-) (limited to 'src/wireguard/router/device.rs') diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs index 621010b..88eeae1 100644 --- a/src/wireguard/router/device.rs +++ b/src/wireguard/router/device.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; +use std::ops::Deref; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::sync_channel; -use std::sync::mpsc::SyncSender; +use std::sync::mpsc::{Receiver, SyncSender}; use std::sync::Arc; use std::thread; use std::time::Instant; @@ -11,18 +12,61 @@ use spin::{Mutex, RwLock}; use zerocopy::LayoutVerified; use super::anti_replay::AntiReplay; -use super::constants::*; +use super::pool::Job; + +use super::inbound; +use super::outbound; use super::messages::{TransportHeader, TYPE_TRANSPORT}; -use super::peer::{new_peer, Peer, PeerInner}; +use super::peer::{new_peer, Peer, PeerHandle}; use super::types::{Callbacks, RouterError}; -use super::workers::{worker_parallel, JobParallel}; use super::SIZE_MESSAGE_PREFIX; use super::route::RoutingTable; use super::super::{tun, udp, Endpoint, KeyPair}; +pub struct ParallelQueue { + next: AtomicUsize, // next round-robin index + queues: Vec>>, // work queues (1 per thread) +} + +impl ParallelQueue { + fn new(queues: usize) -> (Vec>, Self) { + let mut rxs = vec![]; + let mut txs = vec![]; + + for _ in 0..queues { + let (tx, rx) = sync_channel(128); + txs.push(Mutex::new(tx)); + rxs.push(rx); + } + + ( + rxs, + ParallelQueue { + next: AtomicUsize::new(0), + queues: txs, + }, + ) + } + + pub fn send(&self, v: T) { + let len = self.queues.len(); + let idx = self.next.fetch_add(1, Ordering::SeqCst); + let que = self.queues[idx % len].lock(); + que.send(v).unwrap(); + } + + pub fn close(&self) { + for i in 0..self.queues.len() { + let (tx, _) = sync_channel(0); + let queue = &self.queues[i]; + *queue.lock() = tx; + } + } +} + pub struct DeviceInner> { // inbound writer (TUN) pub inbound: T, @@ -32,11 +76,11 @@ pub struct DeviceInner>>>, // receiver id -> decryption state - pub table: RoutingTable>, + pub table: RoutingTable>, // work queues - pub queue_next: AtomicUsize, // next round-robin index - pub queues: Mutex>>, // work queues (1 per thread) + pub outbound_queue: ParallelQueue, outbound::Outbound>>, + pub inbound_queue: ParallelQueue, inbound::Inbound>>, } pub struct EncryptionState { @@ -49,24 +93,53 @@ pub struct DecryptionState, pub confirmed: AtomicBool, pub protector: Mutex, - pub peer: Arc>, + pub peer: Peer, pub death: Instant, // time when the key can no longer be used for decryption } pub struct Device> { - state: Arc>, // reference to device state + inner: Arc>, +} + +impl> Clone for Device { + fn clone(&self) -> Self { + Device { + inner: self.inner.clone(), + } + } +} + +impl> PartialEq + for Device +{ + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.inner, &other.inner) + } +} + +impl> Eq for Device {} + +impl> Deref for Device { + type Target = DeviceInner; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +pub struct DeviceHandle> { + state: Device, // reference to device state handles: Vec>, // join handles for workers } -impl> Drop for Device { +impl> Drop + for DeviceHandle +{ fn drop(&mut self) { debug!("router: dropping device"); - // drop all queues - { - let mut queues = self.state.queues.lock(); - while queues.pop().is_some() {} - } + // close worker queues + self.state.outbound_queue.close(); + self.state.inbound_queue.close(); // join all worker threads while match self.handles.pop() { @@ -82,14 +155,16 @@ impl> Drop for Devi } } -impl> Device { - pub fn new(num_workers: usize, tun: T) -> Device { +impl> DeviceHandle { + pub fn new(num_workers: usize, tun: T) -> DeviceHandle { // allocate shared device state + let (mut outrx, outbound_queue) = ParallelQueue::new(num_workers); + let (mut inrx, inbound_queue) = ParallelQueue::new(num_workers); let inner = DeviceInner { inbound: tun, + inbound_queue, outbound: RwLock::new((true, None)), - queues: Mutex::new(Vec::with_capacity(num_workers)), - queue_next: AtomicUsize::new(0), + outbound_queue, recv: RwLock::new(HashMap::new()), table: RoutingTable::new(), }; @@ -97,14 +172,20 @@ impl> Device> Device Peer { + pub fn new_peer(&self, opaque: C::Opaque) -> PeerHandle { new_peer(self.state.clone(), opaque) } @@ -160,10 +241,7 @@ impl> Device> Device