diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-02 23:32:07 +0200 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-02 23:32:07 +0200 |
commit | f55014ef8ff7308d37be9c92f41b9ccf809c719d (patch) | |
tree | 0a8ea562dd872b1a5a0a2271256bd1b6d6d5d731 /src/router/peer.rs | |
parent | Reconsider inorder queueing (diff) | |
download | wireguard-rs-f55014ef8ff7308d37be9c92f41b9ccf809c719d.tar.xz wireguard-rs-f55014ef8ff7308d37be9c92f41b9ccf809c719d.zip |
Wake workers when submitting work
Diffstat (limited to '')
-rw-r--r-- | src/router/peer.rs | 50 |
1 files changed, 17 insertions, 33 deletions
diff --git a/src/router/peer.rs b/src/router/peer.rs index c1762ad..a85d87a 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -7,7 +7,7 @@ use std::thread; use spin::Mutex; -use arraydeque::{ArrayDeque, Wrapping, Saturating}; +use arraydeque::{ArrayDeque, Saturating, Wrapping}; use zerocopy::{AsBytes, LayoutVerified}; use treebitmap::address::Address; @@ -40,19 +40,17 @@ pub struct KeyWheel { pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> { pub stopped: AtomicBool, pub opaque: C::Opaque, - pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Wrapping>>, - pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Wrapping>>, + pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Saturating>>, + pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Saturating>>, pub device: Arc<DeviceInner<C, T, B>>, - pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>, - pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>, - pub queue_outbound: SyncSender<JobOutbound>, - pub queue_inbound: SyncSender<JobInbound<C, T, B>>, - pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake - pub rx_bytes: AtomicU64, // received bytes - pub tx_bytes: AtomicU64, // transmitted bytes - pub keys: spin::Mutex<KeyWheel>, // key-wheel - pub ekey: spin::Mutex<Option<EncryptionState>>, // encryption state - pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>, + pub thread_outbound: Mutex<Option<thread::JoinHandle<()>>>, + pub thread_inbound: Mutex<Option<thread::JoinHandle<()>>>, + pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake + pub rx_bytes: AtomicU64, // received bytes + pub tx_bytes: AtomicU64, // transmitted bytes + pub keys: Mutex<KeyWheel>, // key-wheel + pub ekey: Mutex<Option<EncryptionState>>, // encryption state + pub endpoint: Mutex<Option<Arc<SocketAddr>>>, } pub struct Peer<C: Callbacks, T: Tun, B: Bind>(Arc<PeerInner<C, T, B>>); @@ -103,7 +101,6 @@ fn treebit_remove<A: Address, C: Callbacks, T: Tun, B: Bind>( impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> { fn drop(&mut self) { - println!("drop"); // mark peer as stopped let peer = &self.0; @@ -161,10 +158,6 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>( device: Arc<DeviceInner<C, T, B>>, opaque: C::Opaque, ) -> Peer<C, T, B> { - // allocate in-order queues - let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS); - let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS); - // allocate peer object let peer = { let device = device.clone(); @@ -176,8 +169,6 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>( device: device, ekey: spin::Mutex::new(None), endpoint: spin::Mutex::new(None), - queue_inbound: send_inbound, - queue_outbound: send_outbound, keys: spin::Mutex::new(KeyWheel { next: None, current: None, @@ -192,22 +183,18 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>( }) }; - // spawn inbound thread + // spawn outbound thread *peer.thread_inbound.lock() = { let peer = peer.clone(); let device = device.clone(); - Some(thread::spawn(move || { - worker_outbound(device, peer, recv_outbound) - })) + Some(thread::spawn(move || worker_outbound(device, peer))) }; - // spawn outbound thread + // spawn inbound thread *peer.thread_outbound.lock() = { let peer = peer.clone(); let device = device.clone(); - Some(thread::spawn(move || { - worker_inbound(device, peer, recv_inbound) - })) + Some(thread::spawn(move || worker_inbound(device, peer))) }; Peer(peer) @@ -261,12 +248,9 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> { })); // add job to in-order queue and return to device for inclusion in worker pool - match self.queue_outbound.try_send(job.clone()) { + match self.outbound.lock().push_back(job.clone()) { Ok(_) => Some(job), - Err(e) => { - println!("{:?}", e); - None - } + Err(_) => None, } } } |