diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-08-20 21:19:53 +0200 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-08-20 21:19:53 +0200 |
commit | 9cef264581ec8cf859113234b29ad5b58577ed8c (patch) | |
tree | 55320a3a619805b643e46d41068ceefe1628b0df /src/router/peer.rs | |
parent | Removed platform mod (diff) | |
download | wireguard-rs-9cef264581ec8cf859113234b29ad5b58577ed8c.tar.xz wireguard-rs-9cef264581ec8cf859113234b29ad5b58577ed8c.zip |
Ensure peer threads are stopped on drop
Diffstat (limited to 'src/router/peer.rs')
-rw-r--r-- | src/router/peer.rs | 130 |
1 files changed, 81 insertions, 49 deletions
diff --git a/src/router/peer.rs b/src/router/peer.rs index f7b8bf4..1edb635 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -1,29 +1,29 @@ -use std::sync::atomic::{AtomicU64, AtomicBool, Ordering}; -use std::sync::{Weak, Arc}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::{Arc, Weak}; use std::thread; - +use std::mem; use std::net::{IpAddr, SocketAddr}; - use std::sync::mpsc::{sync_channel, SyncSender}; use spin; use arraydeque::{ArrayDeque, Wrapping}; -use treebitmap::IpLookupTable; use treebitmap::address::Address; +use treebitmap::IpLookupTable; -use super::super::types::KeyPair; use super::super::constants::*; +use super::super::types::KeyPair; use super::anti_replay::AntiReplay; +use super::device::DecryptionState; use super::device::DeviceInner; use super::device::EncryptionState; -use super::device::DecryptionState; +use super::workers::{worker_inbound, worker_outbound, JobInbound, JobOutbound}; const MAX_STAGED_PACKETS: usize = 128; -struct KeyWheel { +pub struct KeyWheel { next: Option<Arc<KeyPair>>, // next key state (unconfirmed) current: Option<Arc<KeyPair>>, // current key state (used for encryption) previous: Option<Arc<KeyPair>>, // old key state (used for decryption) @@ -31,18 +31,18 @@ struct KeyWheel { } pub struct PeerInner { - stopped: AtomicBool, - device: Arc<DeviceInner>, - thread_outbound: spin::Mutex<thread::JoinHandle<()>>, - thread_inbound: spin::Mutex<thread::JoinHandle<()>>, - inorder_outbound: SyncSender<()>, - inorder_inbound: SyncSender<()>, - staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake - rx_bytes: AtomicU64, // received bytes - tx_bytes: AtomicU64, // transmitted bytes - keys: spin::Mutex<KeyWheel>, // key-wheel - ekey: spin::Mutex<Option<EncryptionState>>, // encryption state - endpoint: spin::Mutex<Option<Arc<SocketAddr>>>, + pub stopped: AtomicBool, + pub device: Arc<DeviceInner>, + pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>, + pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>, + pub queue_outbound: SyncSender<JobOutbound>, + pub queue_inbound: SyncSender<JobInbound>, + pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake + pub rx_bytes: AtomicU64, // received bytes + pub tx_bytes: AtomicU64, // transmitted bytes + pub keys: spin::Mutex<KeyWheel>, // key-wheel + pub ekey: spin::Mutex<Option<EncryptionState>>, // encryption state + pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>, } pub struct Peer(Arc<PeerInner>); @@ -93,6 +93,7 @@ where impl Drop for Peer { fn drop(&mut self) { + // mark peer as stopped let peer = &self.0; @@ -105,8 +106,19 @@ impl Drop for Peer { // unpark threads - peer.thread_inbound.lock().thread().unpark(); - peer.thread_outbound.lock().thread().unpark(); + peer.thread_inbound + .lock() + .as_ref() + .unwrap() + .thread() + .unpark(); + + peer.thread_outbound + .lock() + .as_ref() + .unwrap() + .thread() + .unpark(); // release ids from the receiver map @@ -132,42 +144,62 @@ impl Drop for Peer { *peer.ekey.lock() = None; *peer.endpoint.lock() = None; + } } pub fn new_peer(device: Arc<DeviceInner>) -> Peer { + // allocate in-order queues + let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS); + let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS); + + // allocate peer object + let peer = { + let device = device.clone(); + Arc::new(PeerInner { + stopped: AtomicBool::new(false), + device: device, + ekey: spin::Mutex::new(None), + endpoint: spin::Mutex::new(None), + queue_inbound: send_inbound, + queue_outbound: send_outbound, + keys: spin::Mutex::new(KeyWheel { + next: None, + current: None, + previous: None, + retired: None, + }), + rx_bytes: AtomicU64::new(0), + tx_bytes: AtomicU64::new(0), + staged_packets: spin::Mutex::new(ArrayDeque::new()), + thread_inbound: spin::Mutex::new(None), + thread_outbound: spin::Mutex::new(None), + }) + }; + // spawn inbound thread - let (send_inbound, recv_inbound) = sync_channel(1); - let handle_inbound = thread::spawn(move || {}); + *peer.thread_inbound.lock() = { + let peer = peer.clone(); + let device = device.clone(); + Some(thread::spawn(move || { + worker_outbound(device, peer, recv_outbound) + })) + }; // spawn outbound thread - let (send_outbound, recv_inbound) = sync_channel(1); - let handle_outbound = thread::spawn(move || {}); - - // allocate peer object - Peer::new(PeerInner { - stopped: AtomicBool::new(false), - device: device, - ekey: spin::Mutex::new(None), - endpoint: spin::Mutex::new(None), - inorder_inbound: send_inbound, - inorder_outbound: send_outbound, - keys: spin::Mutex::new(KeyWheel { - next: None, - current: None, - previous: None, - retired: None, - }), - rx_bytes: AtomicU64::new(0), - tx_bytes: AtomicU64::new(0), - staged_packets: spin::Mutex::new(ArrayDeque::new()), - thread_inbound: spin::Mutex::new(handle_inbound), - thread_outbound: spin::Mutex::new(handle_outbound), - }) + *peer.thread_outbound.lock() = { + let peer = peer.clone(); + let device = device.clone(); + Some(thread::spawn(move || { + worker_inbound(device, peer, recv_inbound) + })) + }; + + Peer(peer) } impl Peer { - fn new(inner : PeerInner) -> Peer { + fn new(inner: PeerInner) -> Peer { Peer(Arc::new(inner)) } @@ -282,4 +314,4 @@ impl Peer { )); res } -}
\ No newline at end of file +} |