summaryrefslogtreecommitdiffstats
path: root/src/router/peer.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-04 19:08:13 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-04 19:08:13 +0200
commit6d11da441bde4fa75eef755bef4c97f0d1f6a29b (patch)
tree41575f0851461e0080258af4a11615de5a9d99c3 /src/router/peer.rs
parentWake workers when submitting work (diff)
downloadwireguard-rs-6d11da441bde4fa75eef755bef4c97f0d1f6a29b.tar.xz
wireguard-rs-6d11da441bde4fa75eef755bef4c97f0d1f6a29b.zip
Simply passing of JobBuffer ownership
Diffstat (limited to 'src/router/peer.rs')
-rw-r--r--src/router/peer.rs133
1 files changed, 62 insertions, 71 deletions
diff --git a/src/router/peer.rs b/src/router/peer.rs
index a85d87a..a31dfcf 100644
--- a/src/router/peer.rs
+++ b/src/router/peer.rs
@@ -22,9 +22,12 @@ use super::device::DeviceInner;
use super::device::EncryptionState;
use super::messages::TransportHeader;
+use futures::sync::oneshot;
+use futures::*;
+
+use super::workers::Operation;
use super::workers::{worker_inbound, worker_outbound};
-use super::workers::{JobBuffer, JobInbound, JobInner, JobOutbound};
-use super::workers::{Operation, Status};
+use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel};
use super::types::Callbacks;
@@ -40,11 +43,9 @@ pub struct KeyWheel {
pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
pub stopped: AtomicBool,
pub opaque: C::Opaque,
- pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Saturating>>,
- pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Saturating>>,
+ pub outbound: Mutex<SyncSender<JobOutbound>>,
+ pub inbound: Mutex<SyncSender<JobInbound<C, T, B>>>,
pub device: Arc<DeviceInner<C, T, B>>,
- pub thread_outbound: Mutex<Option<thread::JoinHandle<()>>>,
- pub thread_inbound: Mutex<Option<thread::JoinHandle<()>>>,
pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
pub rx_bytes: AtomicU64, // received bytes
pub tx_bytes: AtomicU64, // transmitted bytes
@@ -53,7 +54,11 @@ pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
pub endpoint: Mutex<Option<Arc<SocketAddr>>>,
}
-pub struct Peer<C: Callbacks, T: Tun, B: Bind>(Arc<PeerInner<C, T, B>>);
+pub struct Peer<C: Callbacks, T: Tun, B: Bind> {
+ state: Arc<PeerInner<C, T, B>>,
+ thread_outbound: thread::JoinHandle<()>,
+ thread_inbound: thread::JoinHandle<()>,
+}
fn treebit_list<A, E, C: Callbacks, T: Tun, B: Bind>(
peer: &Arc<PeerInner<C, T, B>>,
@@ -86,15 +91,15 @@ fn treebit_remove<A: Address, C: Callbacks, T: Tun, B: Bind>(
for subnet in m.iter() {
let (ip, masklen, p) = subnet;
if let Some(p) = p.upgrade() {
- if Arc::ptr_eq(&p, &peer.0) {
+ if Arc::ptr_eq(&p, &peer.state) {
subnets.push((ip, masklen))
}
}
}
// remove all key mappings
- for subnet in subnets {
- let r = m.remove(subnet.0, subnet.1);
+ for (ip, masklen) in subnets {
+ let r = m.remove(ip, masklen);
debug_assert!(r.is_some());
}
}
@@ -103,7 +108,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
fn drop(&mut self) {
// mark peer as stopped
- let peer = &self.0;
+ let peer = &self.state;
peer.stopped.store(true, Ordering::SeqCst);
// remove from cryptkey router
@@ -111,22 +116,6 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
treebit_remove(self, &peer.device.ipv4);
treebit_remove(self, &peer.device.ipv6);
- // unpark threads
-
- peer.thread_inbound
- .lock()
- .as_ref()
- .unwrap()
- .thread()
- .unpark();
-
- peer.thread_outbound
- .lock()
- .as_ref()
- .unwrap()
- .thread()
- .unpark();
-
// release ids from the receiver map
let mut keys = peer.keys.lock();
@@ -158,13 +147,16 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T, B>>,
opaque: C::Opaque,
) -> Peer<C, T, B> {
+ let (out_tx, out_rx) = sync_channel(128);
+ let (in_tx, in_rx) = sync_channel(128);
+
// allocate peer object
let peer = {
let device = device.clone();
Arc::new(PeerInner {
opaque,
- inbound: Mutex::new(ArrayDeque::new()),
- outbound: Mutex::new(ArrayDeque::new()),
+ inbound: Mutex::new(in_tx),
+ outbound: Mutex::new(out_tx),
stopped: AtomicBool::new(false),
device: device,
ekey: spin::Mutex::new(None),
@@ -178,26 +170,28 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
rx_bytes: AtomicU64::new(0),
tx_bytes: AtomicU64::new(0),
staged_packets: spin::Mutex::new(ArrayDeque::new()),
- thread_inbound: spin::Mutex::new(None),
- thread_outbound: spin::Mutex::new(None),
})
};
// spawn outbound thread
- *peer.thread_inbound.lock() = {
+ let thread_inbound = {
let peer = peer.clone();
let device = device.clone();
- Some(thread::spawn(move || worker_outbound(device, peer)))
+ thread::spawn(move || worker_outbound(device, peer, out_rx))
};
// spawn inbound thread
- *peer.thread_outbound.lock() = {
+ let thread_outbound = {
let peer = peer.clone();
let device = device.clone();
- Some(thread::spawn(move || worker_inbound(device, peer)))
+ thread::spawn(move || worker_inbound(device, peer, in_rx))
};
- Peer(peer)
+ Peer {
+ state: peer,
+ thread_inbound,
+ thread_outbound,
+ }
}
impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
@@ -209,7 +203,7 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
// rotate key-wheel
}
- pub fn send_job(&self, mut msg: Vec<u8>) -> Option<JobBuffer> {
+ pub fn send_job(&self, mut msg: Vec<u8>) -> Option<JobParallel> {
debug_assert!(msg.len() >= mem::size_of::<TransportHeader>());
// parse / cast
@@ -239,29 +233,26 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
}
};
- // create job
- let job = Arc::new(spin::Mutex::new(JobInner {
- msg,
- key,
- status: Status::Waiting,
- op: Operation::Encryption,
- }));
-
- // add job to in-order queue and return to device for inclusion in worker pool
- match self.outbound.lock().push_back(job.clone()) {
- Ok(_) => Some(job),
+ // add job to in-order queue and return sendeer to device for inclusion in worker pool
+ let (tx, rx) = oneshot();
+ match self.outbound.lock().try_send(rx) {
+ Ok(_) => Some((
+ tx,
+ JobBuffer {
+ msg,
+ key,
+ okay: false,
+ op: Operation::Encryption,
+ },
+ )),
Err(_) => None,
}
}
}
impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
- fn new(inner: PeerInner<C, T, B>) -> Peer<C, T, B> {
- Peer(Arc::new(inner))
- }
-
pub fn set_endpoint(&self, endpoint: SocketAddr) {
- *self.0.endpoint.lock() = Some(Arc::new(endpoint))
+ *self.state.endpoint.lock() = Some(Arc::new(endpoint))
}
/// Add a new keypair
@@ -275,7 +266,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
/// A vector of ids which has been released.
/// These should be released in the handshake module.
pub fn add_keypair(&self, new: KeyPair) -> Vec<u32> {
- let mut keys = self.0.keys.lock();
+ let mut keys = self.state.keys.lock();
let mut release = Vec::with_capacity(2);
let new = Arc::new(new);
@@ -286,7 +277,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
// update key-wheel
if new.initiator {
// start using key for encryption
- *self.0.ekey.lock() = Some(EncryptionState {
+ *self.state.ekey.lock() = Some(EncryptionState {
id: new.send.id,
key: new.send.key,
nonce: 0,
@@ -294,17 +285,17 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
});
// move current into previous
- keys.previous = keys.current.as_ref().map(|v| v.clone());;
+ keys.previous = keys.current.as_ref().map(|v| v.clone());
keys.current = Some(new.clone());
} else {
// store the key and await confirmation
- keys.previous = keys.next.as_ref().map(|v| v.clone());;
+ keys.previous = keys.next.as_ref().map(|v| v.clone());
keys.next = Some(new.clone());
};
// update incoming packet id map
{
- let mut recv = self.0.device.recv.write();
+ let mut recv = self.state.device.recv.write();
// purge recv map of released ids
for id in &release {
@@ -321,7 +312,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
keypair: Arc::downgrade(&new),
key: new.recv.key,
protector: spin::Mutex::new(AntiReplay::new()),
- peer: Arc::downgrade(&self.0),
+ peer: Arc::downgrade(&self.state),
death: new.birth + REJECT_AFTER_TIME,
},
);
@@ -332,28 +323,28 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
}
pub fn rx_bytes(&self) -> u64 {
- self.0.rx_bytes.load(Ordering::Relaxed)
+ self.state.rx_bytes.load(Ordering::Relaxed)
}
pub fn tx_bytes(&self) -> u64 {
- self.0.tx_bytes.load(Ordering::Relaxed)
+ self.state.tx_bytes.load(Ordering::Relaxed)
}
pub fn add_subnet(&self, ip: IpAddr, masklen: u32) {
match ip {
IpAddr::V4(v4) => {
- self.0
+ self.state
.device
.ipv4
.write()
- .insert(v4, masklen, Arc::downgrade(&self.0))
+ .insert(v4, masklen, Arc::downgrade(&self.state))
}
IpAddr::V6(v6) => {
- self.0
+ self.state
.device
.ipv6
.write()
- .insert(v6, masklen, Arc::downgrade(&self.0))
+ .insert(v6, masklen, Arc::downgrade(&self.state))
}
};
}
@@ -361,21 +352,21 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> {
let mut res = Vec::new();
res.append(&mut treebit_list(
- &self.0,
- &self.0.device.ipv4,
+ &self.state,
+ &self.state.device.ipv4,
Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)),
));
res.append(&mut treebit_list(
- &self.0,
- &self.0.device.ipv6,
+ &self.state,
+ &self.state.device.ipv6,
Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)),
));
res
}
pub fn remove_subnets(&self) {
- treebit_remove(self, &self.0.device.ipv4);
- treebit_remove(self, &self.0.device.ipv6);
+ treebit_remove(self, &self.state.device.ipv4);
+ treebit_remove(self, &self.state.device.ipv6);
}
fn send(&self, msg: Vec<u8>) {}