aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-16 22:00:48 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-16 22:00:48 +0200
commit5aeea9b619cbdc3776a23aa5a1c6547d42c7f427 (patch)
tree1af087bf6e9b3641e29069a57aa2d294acf46338
parentLayout work on router (diff)
downloadwireguard-rs-5aeea9b619cbdc3776a23aa5a1c6547d42c7f427.tar.xz
wireguard-rs-5aeea9b619cbdc3776a23aa5a1c6547d42c7f427.zip
Begin drafting cross-platform interface
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.toml1
-rw-r--r--src/main.rs3
-rw-r--r--src/platform/mod.rs2
-rw-r--r--src/platform/tun.rs10
-rw-r--r--src/platform/udp.rs11
-rw-r--r--src/router/device.rs89
-rw-r--r--src/router/mod.rs2
8 files changed, 77 insertions, 42 deletions
diff --git a/Cargo.lock b/Cargo.lock
index d25d5fe..9aad567 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1026,6 +1026,7 @@ dependencies = [
"arraydeque 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"blake2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)",
"generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/Cargo.toml b/Cargo.toml
index 5b81617..82d31ec 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ tokio = "0.1.22"
futures = "0.1.28"
arraydeque = "^0.4"
treebitmap = "^0.4"
+crossbeam-deque = "0.7"
[dependencies.x25519-dalek]
version = "^0.5"
diff --git a/src/main.rs b/src/main.rs
index 90ba496..22e1585 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,7 @@
#![feature(test)]
mod handshake;
+mod platform;
mod router;
mod types;
@@ -13,7 +14,7 @@ fn main() {
// choose optimal crypto implementations for platform
sodiumoxide::init().unwrap();
- let mut rdev = router::Device::new();
+ let mut rdev = router::Device::new(8);
let pref = rdev.add();
}
diff --git a/src/platform/mod.rs b/src/platform/mod.rs
new file mode 100644
index 0000000..46f712a
--- /dev/null
+++ b/src/platform/mod.rs
@@ -0,0 +1,2 @@
+mod tun;
+mod udp;
diff --git a/src/platform/tun.rs b/src/platform/tun.rs
new file mode 100644
index 0000000..45fd591
--- /dev/null
+++ b/src/platform/tun.rs
@@ -0,0 +1,10 @@
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+pub trait Tun: Send + Sync {
+ type Error;
+
+ fn new(mtu: Arc<AtomicUsize>) -> Self;
+ fn read(&self, dst: &mut [u8]) -> Result<usize, Self::Error>;
+ fn write(&self, src: &[u8]) -> Result<(), Self::Error>;
+}
diff --git a/src/platform/udp.rs b/src/platform/udp.rs
new file mode 100644
index 0000000..f21a3d3
--- /dev/null
+++ b/src/platform/udp.rs
@@ -0,0 +1,11 @@
+/* Often times an a file descriptor in an atomic might suffice.
+ */
+pub trait Bind<Endpoint>: Send + Sync {
+ type Error;
+
+ fn new() -> Self;
+ fn set_port(&self, port: u16) -> Result<(), Self::Error>;
+ fn get_port(&self) -> u16;
+ fn recv(&self, dst: &mut [u8]) -> Endpoint;
+ fn send(&self, src: &[u8], dst: &Endpoint);
+}
diff --git a/src/router/device.rs b/src/router/device.rs
index 3b29312..5dfd22c 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -1,11 +1,11 @@
-use arraydeque::{ArrayDeque, Saturating, Wrapping};
+use arraydeque::{ArrayDeque, Wrapping};
use treebitmap::IpLookupTable;
+use crossbeam_deque::{Injector, Steal};
use std::collections::HashMap;
-use std::error::Error;
+use std::mem;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
-use std::ptr;
-use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex, Weak};
use std::thread;
@@ -21,12 +21,29 @@ use std::u64;
const REJECT_AFTER_MESSAGES: u64 = u64::MAX - (1 << 4);
const MAX_STAGED_PACKETS: usize = 128;
-pub struct Device {
+struct DeviceInner {
+ stopped: AtomicBool,
+ injector: Injector<()>, // parallel enc/dec task injector
+ threads: Vec<thread::JoinHandle<()>>,
recv: spin::RwLock<HashMap<u32, DecryptionState>>,
ipv4: IpLookupTable<Ipv4Addr, Weak<PeerInner>>,
ipv6: IpLookupTable<Ipv6Addr, Weak<PeerInner>>,
}
+struct PeerInner {
+ stopped: AtomicBool,
+ thread_outbound: spin::Mutex<thread::JoinHandle<()>>,
+ thread_inbound: spin::Mutex<thread::JoinHandle<()>>,
+ inorder_outbound: SyncSender<()>,
+ inorder_inbound: SyncSender<()>,
+ staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
+ rx_bytes: AtomicU64, // received bytes
+ tx_bytes: AtomicU64, // transmitted bytes
+ keys: spin::Mutex<KeyWheel>, // key-wheel
+ ekey: spin::Mutex<EncryptionState>, // encryption state
+ endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
+}
+
struct EncryptionState {
key: [u8; 32], // encryption key
id: u32, // sender id
@@ -48,33 +65,34 @@ struct KeyWheel {
previous: Option<KeyPair>, // old key state (used for decryption)
}
-struct PeerInner {
- inorder_outbound: SyncSender<()>,
- inorder_inbound: SyncSender<()>,
- staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
- rx_bytes: AtomicU64, // received bytes
- tx_bytes: AtomicU64, // transmitted bytes
- keys: spin::Mutex<KeyWheel>, // key-wheel
- ekey: spin::Mutex<EncryptionState>, // encryption state
- endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
-}
-
pub struct Peer(Arc<PeerInner>);
+pub struct Device(DeviceInner);
impl Drop for Peer {
fn drop(&mut self) {
- // stop threads and remove peer from device
+ // mark peer as stopped
+ let inner = &self.0;
+ inner.stopped.store(true, Ordering::SeqCst);
+
+ // unpark threads to stop
+ inner.thread_inbound.lock().thread().unpark();
+ inner.thread_outbound.lock().thread().unpark();
}
}
impl Drop for Device {
fn drop(&mut self) {
- // stop threads
+ // mark device as stopped
+ let inner = &self.0;
+ inner.stopped.store(true, Ordering::SeqCst);
+
+ // eat all parallel jobs
+ while inner.injector.steal() != Steal::Empty {}
}
}
impl Peer {
- fn set_endpoint(&self, endpoint: SocketAddr) {
+ pub fn set_endpoint(&self, endpoint: SocketAddr) {
*self.0.endpoint.lock() = Some(Arc::new(endpoint))
}
@@ -87,7 +105,7 @@ impl Peer {
};
}
- pub fn keypair_add(&self, new: KeyPair) -> Option<u32> {
+ fn keypair_add(&self, new: KeyPair) -> Option<u32> {
let mut keys = self.0.keys.lock();
let release = keys.previous.map(|k| k.recv.id);
@@ -124,30 +142,21 @@ impl Peer {
}
impl Device {
- pub fn new() -> Device {
- Device {
+ pub fn new(workers: usize) -> Device {
+ Device(DeviceInner {
+ threads: vec![],
+ stopped: AtomicBool::new(false),
+ injector: Injector::new(),
recv: spin::RwLock::new(HashMap::new()),
ipv4: IpLookupTable::new(),
ipv6: IpLookupTable::new(),
- }
- }
-
- pub fn release(&self, id: u32) {
- debug_assert!(
- if let Some(_) = self.recv.read().get(&id) {
- true
- } else {
- false
- },
- true
- );
- self.recv.write().remove(&id);
+ })
}
pub fn add_subnet(&mut self, ip: IpAddr, masklen: u32, peer: Peer) {
match ip {
- IpAddr::V4(v4) => self.ipv4.insert(v4, masklen, Arc::downgrade(&peer.0)),
- IpAddr::V6(v6) => self.ipv6.insert(v6, masklen, Arc::downgrade(&peer.0)),
+ IpAddr::V4(v4) => self.0.ipv4.insert(v4, masklen, Arc::downgrade(&peer.0)),
+ IpAddr::V6(v6) => self.0.ipv6.insert(v6, masklen, Arc::downgrade(&peer.0)),
};
}
@@ -155,7 +164,7 @@ impl Device {
let mut subnets = Vec::new();
// extract ipv4 entries
- for subnet in self.ipv4.iter() {
+ for subnet in self.0.ipv4.iter() {
let (ip, masklen, p) = subnet;
if let Some(p) = p.upgrade() {
if Arc::ptr_eq(&p, &peer.0) {
@@ -165,7 +174,7 @@ impl Device {
}
// extract ipv6 entries
- for subnet in self.ipv6.iter() {
+ for subnet in self.0.ipv6.iter() {
let (ip, masklen, p) = subnet;
if let Some(p) = p.upgrade() {
if Arc::ptr_eq(&p, &peer.0) {
@@ -182,7 +191,7 @@ impl Device {
let release = peer.keypair_add(new);
// update incoming packet id map
- let mut recv = self.recv.write();
+ let mut recv = self.0.recv.write();
// release id of previous keypair
if let Some(id) = release {
diff --git a/src/router/mod.rs b/src/router/mod.rs
index b53b10c..7055875 100644
--- a/src/router/mod.rs
+++ b/src/router/mod.rs
@@ -4,4 +4,4 @@ mod device;
// mod inbound;
// mod outbound;
-pub use device::Device; \ No newline at end of file
+pub use device::{Device, Peer}; \ No newline at end of file