aboutsummaryrefslogtreecommitdiffstats
path: root/src/router/peer.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-02 23:32:07 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-02 23:32:07 +0200
commitf55014ef8ff7308d37be9c92f41b9ccf809c719d (patch)
tree0a8ea562dd872b1a5a0a2271256bd1b6d6d5d731 /src/router/peer.rs
parentReconsider inorder queueing (diff)
downloadwireguard-rs-f55014ef8ff7308d37be9c92f41b9ccf809c719d.tar.xz
wireguard-rs-f55014ef8ff7308d37be9c92f41b9ccf809c719d.zip
Wake workers when submitting work
Diffstat (limited to '')
-rw-r--r--src/router/peer.rs50
1 files changed, 17 insertions, 33 deletions
diff --git a/src/router/peer.rs b/src/router/peer.rs
index c1762ad..a85d87a 100644
--- a/src/router/peer.rs
+++ b/src/router/peer.rs
@@ -7,7 +7,7 @@ use std::thread;
use spin::Mutex;
-use arraydeque::{ArrayDeque, Wrapping, Saturating};
+use arraydeque::{ArrayDeque, Saturating, Wrapping};
use zerocopy::{AsBytes, LayoutVerified};
use treebitmap::address::Address;
@@ -40,19 +40,17 @@ pub struct KeyWheel {
pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
pub stopped: AtomicBool,
pub opaque: C::Opaque,
- pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Wrapping>>,
- pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Wrapping>>,
+ pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Saturating>>,
+ pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Saturating>>,
pub device: Arc<DeviceInner<C, T, B>>,
- pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
- pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
- pub queue_outbound: SyncSender<JobOutbound>,
- pub queue_inbound: SyncSender<JobInbound<C, T, B>>,
- pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
- pub rx_bytes: AtomicU64, // received bytes
- pub tx_bytes: AtomicU64, // transmitted bytes
- pub keys: spin::Mutex<KeyWheel>, // key-wheel
- pub ekey: spin::Mutex<Option<EncryptionState>>, // encryption state
- pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
+ pub thread_outbound: Mutex<Option<thread::JoinHandle<()>>>,
+ pub thread_inbound: Mutex<Option<thread::JoinHandle<()>>>,
+ pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
+ pub rx_bytes: AtomicU64, // received bytes
+ pub tx_bytes: AtomicU64, // transmitted bytes
+ pub keys: Mutex<KeyWheel>, // key-wheel
+ pub ekey: Mutex<Option<EncryptionState>>, // encryption state
+ pub endpoint: Mutex<Option<Arc<SocketAddr>>>,
}
pub struct Peer<C: Callbacks, T: Tun, B: Bind>(Arc<PeerInner<C, T, B>>);
@@ -103,7 +101,6 @@ fn treebit_remove<A: Address, C: Callbacks, T: Tun, B: Bind>(
impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
fn drop(&mut self) {
- println!("drop");
// mark peer as stopped
let peer = &self.0;
@@ -161,10 +158,6 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T, B>>,
opaque: C::Opaque,
) -> Peer<C, T, B> {
- // allocate in-order queues
- let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS);
- let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS);
-
// allocate peer object
let peer = {
let device = device.clone();
@@ -176,8 +169,6 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
device: device,
ekey: spin::Mutex::new(None),
endpoint: spin::Mutex::new(None),
- queue_inbound: send_inbound,
- queue_outbound: send_outbound,
keys: spin::Mutex::new(KeyWheel {
next: None,
current: None,
@@ -192,22 +183,18 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
})
};
- // spawn inbound thread
+ // spawn outbound thread
*peer.thread_inbound.lock() = {
let peer = peer.clone();
let device = device.clone();
- Some(thread::spawn(move || {
- worker_outbound(device, peer, recv_outbound)
- }))
+ Some(thread::spawn(move || worker_outbound(device, peer)))
};
- // spawn outbound thread
+ // spawn inbound thread
*peer.thread_outbound.lock() = {
let peer = peer.clone();
let device = device.clone();
- Some(thread::spawn(move || {
- worker_inbound(device, peer, recv_inbound)
- }))
+ Some(thread::spawn(move || worker_inbound(device, peer)))
};
Peer(peer)
@@ -261,12 +248,9 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
}));
// add job to in-order queue and return to device for inclusion in worker pool
- match self.queue_outbound.try_send(job.clone()) {
+ match self.outbound.lock().push_back(job.clone()) {
Ok(_) => Some(job),
- Err(e) => {
- println!("{:?}", e);
- None
- }
+ Err(_) => None,
}
}
}