diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-04 19:08:13 +0200 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-09-04 19:08:13 +0200 |
commit | 6d11da441bde4fa75eef755bef4c97f0d1f6a29b (patch) | |
tree | 41575f0851461e0080258af4a11615de5a9d99c3 /src/router/peer.rs | |
parent | Wake workers when submitting work (diff) | |
download | wireguard-rs-6d11da441bde4fa75eef755bef4c97f0d1f6a29b.tar.xz wireguard-rs-6d11da441bde4fa75eef755bef4c97f0d1f6a29b.zip |
Simply passing of JobBuffer ownership
Diffstat (limited to 'src/router/peer.rs')
-rw-r--r-- | src/router/peer.rs | 133 |
1 files changed, 62 insertions, 71 deletions
diff --git a/src/router/peer.rs b/src/router/peer.rs index a85d87a..a31dfcf 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -22,9 +22,12 @@ use super::device::DeviceInner; use super::device::EncryptionState; use super::messages::TransportHeader; +use futures::sync::oneshot; +use futures::*; + +use super::workers::Operation; use super::workers::{worker_inbound, worker_outbound}; -use super::workers::{JobBuffer, JobInbound, JobInner, JobOutbound}; -use super::workers::{Operation, Status}; +use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel}; use super::types::Callbacks; @@ -40,11 +43,9 @@ pub struct KeyWheel { pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> { pub stopped: AtomicBool, pub opaque: C::Opaque, - pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Saturating>>, - pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Saturating>>, + pub outbound: Mutex<SyncSender<JobOutbound>>, + pub inbound: Mutex<SyncSender<JobInbound<C, T, B>>>, pub device: Arc<DeviceInner<C, T, B>>, - pub thread_outbound: Mutex<Option<thread::JoinHandle<()>>>, - pub thread_inbound: Mutex<Option<thread::JoinHandle<()>>>, pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake pub rx_bytes: AtomicU64, // received bytes pub tx_bytes: AtomicU64, // transmitted bytes @@ -53,7 +54,11 @@ pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> { pub endpoint: Mutex<Option<Arc<SocketAddr>>>, } -pub struct Peer<C: Callbacks, T: Tun, B: Bind>(Arc<PeerInner<C, T, B>>); +pub struct Peer<C: Callbacks, T: Tun, B: Bind> { + state: Arc<PeerInner<C, T, B>>, + thread_outbound: thread::JoinHandle<()>, + thread_inbound: thread::JoinHandle<()>, +} fn treebit_list<A, E, C: Callbacks, T: Tun, B: Bind>( peer: &Arc<PeerInner<C, T, B>>, @@ -86,15 +91,15 @@ fn treebit_remove<A: Address, C: Callbacks, T: Tun, B: Bind>( for subnet in m.iter() { let (ip, masklen, p) = subnet; if let Some(p) = p.upgrade() { - if Arc::ptr_eq(&p, &peer.0) { + if Arc::ptr_eq(&p, &peer.state) { subnets.push((ip, masklen)) } } } // remove all key mappings - for subnet in subnets { - let r = m.remove(subnet.0, subnet.1); + for (ip, masklen) in subnets { + let r = m.remove(ip, masklen); debug_assert!(r.is_some()); } } @@ -103,7 +108,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> { fn drop(&mut self) { // mark peer as stopped - let peer = &self.0; + let peer = &self.state; peer.stopped.store(true, Ordering::SeqCst); // remove from cryptkey router @@ -111,22 +116,6 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> { treebit_remove(self, &peer.device.ipv4); treebit_remove(self, &peer.device.ipv6); - // unpark threads - - peer.thread_inbound - .lock() - .as_ref() - .unwrap() - .thread() - .unpark(); - - peer.thread_outbound - .lock() - .as_ref() - .unwrap() - .thread() - .unpark(); - // release ids from the receiver map let mut keys = peer.keys.lock(); @@ -158,13 +147,16 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>( device: Arc<DeviceInner<C, T, B>>, opaque: C::Opaque, ) -> Peer<C, T, B> { + let (out_tx, out_rx) = sync_channel(128); + let (in_tx, in_rx) = sync_channel(128); + // allocate peer object let peer = { let device = device.clone(); Arc::new(PeerInner { opaque, - inbound: Mutex::new(ArrayDeque::new()), - outbound: Mutex::new(ArrayDeque::new()), + inbound: Mutex::new(in_tx), + outbound: Mutex::new(out_tx), stopped: AtomicBool::new(false), device: device, ekey: spin::Mutex::new(None), @@ -178,26 +170,28 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>( rx_bytes: AtomicU64::new(0), tx_bytes: AtomicU64::new(0), staged_packets: spin::Mutex::new(ArrayDeque::new()), - thread_inbound: spin::Mutex::new(None), - thread_outbound: spin::Mutex::new(None), }) }; // spawn outbound thread - *peer.thread_inbound.lock() = { + let thread_inbound = { let peer = peer.clone(); let device = device.clone(); - Some(thread::spawn(move || worker_outbound(device, peer))) + thread::spawn(move || worker_outbound(device, peer, out_rx)) }; // spawn inbound thread - *peer.thread_outbound.lock() = { + let thread_outbound = { let peer = peer.clone(); let device = device.clone(); - Some(thread::spawn(move || worker_inbound(device, peer))) + thread::spawn(move || worker_inbound(device, peer, in_rx)) }; - Peer(peer) + Peer { + state: peer, + thread_inbound, + thread_outbound, + } } impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> { @@ -209,7 +203,7 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> { // rotate key-wheel } - pub fn send_job(&self, mut msg: Vec<u8>) -> Option<JobBuffer> { + pub fn send_job(&self, mut msg: Vec<u8>) -> Option<JobParallel> { debug_assert!(msg.len() >= mem::size_of::<TransportHeader>()); // parse / cast @@ -239,29 +233,26 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> { } }; - // create job - let job = Arc::new(spin::Mutex::new(JobInner { - msg, - key, - status: Status::Waiting, - op: Operation::Encryption, - })); - - // add job to in-order queue and return to device for inclusion in worker pool - match self.outbound.lock().push_back(job.clone()) { - Ok(_) => Some(job), + // add job to in-order queue and return sendeer to device for inclusion in worker pool + let (tx, rx) = oneshot(); + match self.outbound.lock().try_send(rx) { + Ok(_) => Some(( + tx, + JobBuffer { + msg, + key, + okay: false, + op: Operation::Encryption, + }, + )), Err(_) => None, } } } impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> { - fn new(inner: PeerInner<C, T, B>) -> Peer<C, T, B> { - Peer(Arc::new(inner)) - } - pub fn set_endpoint(&self, endpoint: SocketAddr) { - *self.0.endpoint.lock() = Some(Arc::new(endpoint)) + *self.state.endpoint.lock() = Some(Arc::new(endpoint)) } /// Add a new keypair @@ -275,7 +266,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> { /// A vector of ids which has been released. /// These should be released in the handshake module. pub fn add_keypair(&self, new: KeyPair) -> Vec<u32> { - let mut keys = self.0.keys.lock(); + let mut keys = self.state.keys.lock(); let mut release = Vec::with_capacity(2); let new = Arc::new(new); @@ -286,7 +277,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> { // update key-wheel if new.initiator { // start using key for encryption - *self.0.ekey.lock() = Some(EncryptionState { + *self.state.ekey.lock() = Some(EncryptionState { id: new.send.id, key: new.send.key, nonce: 0, @@ -294,17 +285,17 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> { }); // move current into previous - keys.previous = keys.current.as_ref().map(|v| v.clone());; + keys.previous = keys.current.as_ref().map(|v| v.clone()); keys.current = Some(new.clone()); } else { // store the key and await confirmation - keys.previous = keys.next.as_ref().map(|v| v.clone());; + keys.previous = keys.next.as_ref().map(|v| v.clone()); keys.next = Some(new.clone()); }; // update incoming packet id map { - let mut recv = self.0.device.recv.write(); + let mut recv = self.state.device.recv.write(); // purge recv map of released ids for id in &release { @@ -321,7 +312,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> { keypair: Arc::downgrade(&new), key: new.recv.key, protector: spin::Mutex::new(AntiReplay::new()), - peer: Arc::downgrade(&self.0), + peer: Arc::downgrade(&self.state), death: new.birth + REJECT_AFTER_TIME, }, ); @@ -332,28 +323,28 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> { } pub fn rx_bytes(&self) -> u64 { - self.0.rx_bytes.load(Ordering::Relaxed) + self.state.rx_bytes.load(Ordering::Relaxed) } pub fn tx_bytes(&self) -> u64 { - self.0.tx_bytes.load(Ordering::Relaxed) + self.state.tx_bytes.load(Ordering::Relaxed) } pub fn add_subnet(&self, ip: IpAddr, masklen: u32) { match ip { IpAddr::V4(v4) => { - self.0 + self.state .device .ipv4 .write() - .insert(v4, masklen, Arc::downgrade(&self.0)) + .insert(v4, masklen, Arc::downgrade(&self.state)) } IpAddr::V6(v6) => { - self.0 + self.state .device .ipv6 .write() - .insert(v6, masklen, Arc::downgrade(&self.0)) + .insert(v6, masklen, Arc::downgrade(&self.state)) } }; } @@ -361,21 +352,21 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> { pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> { let mut res = Vec::new(); res.append(&mut treebit_list( - &self.0, - &self.0.device.ipv4, + &self.state, + &self.state.device.ipv4, Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)), )); res.append(&mut treebit_list( - &self.0, - &self.0.device.ipv6, + &self.state, + &self.state.device.ipv6, Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)), )); res } pub fn remove_subnets(&self) { - treebit_remove(self, &self.0.device.ipv4); - treebit_remove(self, &self.0.device.ipv6); + treebit_remove(self, &self.state.device.ipv4); + treebit_remove(self, &self.state.device.ipv6); } fn send(&self, msg: Vec<u8>) {} |