summaryrefslogtreecommitdiffstats
path: root/src/router
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-16 22:00:48 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-16 22:00:48 +0200
commit5aeea9b619cbdc3776a23aa5a1c6547d42c7f427 (patch)
tree1af087bf6e9b3641e29069a57aa2d294acf46338 /src/router
parentLayout work on router (diff)
downloadwireguard-rs-5aeea9b619cbdc3776a23aa5a1c6547d42c7f427.tar.xz
wireguard-rs-5aeea9b619cbdc3776a23aa5a1c6547d42c7f427.zip
Begin drafting cross-platform interface
Diffstat (limited to 'src/router')
-rw-r--r--src/router/device.rs89
-rw-r--r--src/router/mod.rs2
2 files changed, 50 insertions, 41 deletions
diff --git a/src/router/device.rs b/src/router/device.rs
index 3b29312..5dfd22c 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -1,11 +1,11 @@
-use arraydeque::{ArrayDeque, Saturating, Wrapping};
+use arraydeque::{ArrayDeque, Wrapping};
use treebitmap::IpLookupTable;
+use crossbeam_deque::{Injector, Steal};
use std::collections::HashMap;
-use std::error::Error;
+use std::mem;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
-use std::ptr;
-use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex, Weak};
use std::thread;
@@ -21,12 +21,29 @@ use std::u64;
const REJECT_AFTER_MESSAGES: u64 = u64::MAX - (1 << 4);
const MAX_STAGED_PACKETS: usize = 128;
-pub struct Device {
+struct DeviceInner {
+ stopped: AtomicBool,
+ injector: Injector<()>, // parallel enc/dec task injector
+ threads: Vec<thread::JoinHandle<()>>,
recv: spin::RwLock<HashMap<u32, DecryptionState>>,
ipv4: IpLookupTable<Ipv4Addr, Weak<PeerInner>>,
ipv6: IpLookupTable<Ipv6Addr, Weak<PeerInner>>,
}
+struct PeerInner {
+ stopped: AtomicBool,
+ thread_outbound: spin::Mutex<thread::JoinHandle<()>>,
+ thread_inbound: spin::Mutex<thread::JoinHandle<()>>,
+ inorder_outbound: SyncSender<()>,
+ inorder_inbound: SyncSender<()>,
+ staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
+ rx_bytes: AtomicU64, // received bytes
+ tx_bytes: AtomicU64, // transmitted bytes
+ keys: spin::Mutex<KeyWheel>, // key-wheel
+ ekey: spin::Mutex<EncryptionState>, // encryption state
+ endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
+}
+
struct EncryptionState {
key: [u8; 32], // encryption key
id: u32, // sender id
@@ -48,33 +65,34 @@ struct KeyWheel {
previous: Option<KeyPair>, // old key state (used for decryption)
}
-struct PeerInner {
- inorder_outbound: SyncSender<()>,
- inorder_inbound: SyncSender<()>,
- staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
- rx_bytes: AtomicU64, // received bytes
- tx_bytes: AtomicU64, // transmitted bytes
- keys: spin::Mutex<KeyWheel>, // key-wheel
- ekey: spin::Mutex<EncryptionState>, // encryption state
- endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
-}
-
pub struct Peer(Arc<PeerInner>);
+pub struct Device(DeviceInner);
impl Drop for Peer {
fn drop(&mut self) {
- // stop threads and remove peer from device
+ // mark peer as stopped
+ let inner = &self.0;
+ inner.stopped.store(true, Ordering::SeqCst);
+
+ // unpark threads to stop
+ inner.thread_inbound.lock().thread().unpark();
+ inner.thread_outbound.lock().thread().unpark();
}
}
impl Drop for Device {
fn drop(&mut self) {
- // stop threads
+ // mark device as stopped
+ let inner = &self.0;
+ inner.stopped.store(true, Ordering::SeqCst);
+
+ // eat all parallel jobs
+ while inner.injector.steal() != Steal::Empty {}
}
}
impl Peer {
- fn set_endpoint(&self, endpoint: SocketAddr) {
+ pub fn set_endpoint(&self, endpoint: SocketAddr) {
*self.0.endpoint.lock() = Some(Arc::new(endpoint))
}
@@ -87,7 +105,7 @@ impl Peer {
};
}
- pub fn keypair_add(&self, new: KeyPair) -> Option<u32> {
+ fn keypair_add(&self, new: KeyPair) -> Option<u32> {
let mut keys = self.0.keys.lock();
let release = keys.previous.map(|k| k.recv.id);
@@ -124,30 +142,21 @@ impl Peer {
}
impl Device {
- pub fn new() -> Device {
- Device {
+ pub fn new(workers: usize) -> Device {
+ Device(DeviceInner {
+ threads: vec![],
+ stopped: AtomicBool::new(false),
+ injector: Injector::new(),
recv: spin::RwLock::new(HashMap::new()),
ipv4: IpLookupTable::new(),
ipv6: IpLookupTable::new(),
- }
- }
-
- pub fn release(&self, id: u32) {
- debug_assert!(
- if let Some(_) = self.recv.read().get(&id) {
- true
- } else {
- false
- },
- true
- );
- self.recv.write().remove(&id);
+ })
}
pub fn add_subnet(&mut self, ip: IpAddr, masklen: u32, peer: Peer) {
match ip {
- IpAddr::V4(v4) => self.ipv4.insert(v4, masklen, Arc::downgrade(&peer.0)),
- IpAddr::V6(v6) => self.ipv6.insert(v6, masklen, Arc::downgrade(&peer.0)),
+ IpAddr::V4(v4) => self.0.ipv4.insert(v4, masklen, Arc::downgrade(&peer.0)),
+ IpAddr::V6(v6) => self.0.ipv6.insert(v6, masklen, Arc::downgrade(&peer.0)),
};
}
@@ -155,7 +164,7 @@ impl Device {
let mut subnets = Vec::new();
// extract ipv4 entries
- for subnet in self.ipv4.iter() {
+ for subnet in self.0.ipv4.iter() {
let (ip, masklen, p) = subnet;
if let Some(p) = p.upgrade() {
if Arc::ptr_eq(&p, &peer.0) {
@@ -165,7 +174,7 @@ impl Device {
}
// extract ipv6 entries
- for subnet in self.ipv6.iter() {
+ for subnet in self.0.ipv6.iter() {
let (ip, masklen, p) = subnet;
if let Some(p) = p.upgrade() {
if Arc::ptr_eq(&p, &peer.0) {
@@ -182,7 +191,7 @@ impl Device {
let release = peer.keypair_add(new);
// update incoming packet id map
- let mut recv = self.recv.write();
+ let mut recv = self.0.recv.write();
// release id of previous keypair
if let Some(id) = release {
diff --git a/src/router/mod.rs b/src/router/mod.rs
index b53b10c..7055875 100644
--- a/src/router/mod.rs
+++ b/src/router/mod.rs
@@ -4,4 +4,4 @@ mod device;
// mod inbound;
// mod outbound;
-pub use device::Device; \ No newline at end of file
+pub use device::{Device, Peer}; \ No newline at end of file