From 5aeea9b619cbdc3776a23aa5a1c6547d42c7f427 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Fri, 16 Aug 2019 22:00:48 +0200 Subject: Begin drafting cross-platform interface --- src/main.rs | 3 +- src/platform/mod.rs | 2 ++ src/platform/tun.rs | 10 ++++++ src/platform/udp.rs | 11 +++++++ src/router/device.rs | 89 +++++++++++++++++++++++++++++----------------------- src/router/mod.rs | 2 +- 6 files changed, 75 insertions(+), 42 deletions(-) create mode 100644 src/platform/mod.rs create mode 100644 src/platform/tun.rs create mode 100644 src/platform/udp.rs (limited to 'src') diff --git a/src/main.rs b/src/main.rs index 90ba496..22e1585 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ #![feature(test)] mod handshake; +mod platform; mod router; mod types; @@ -13,7 +14,7 @@ fn main() { // choose optimal crypto implementations for platform sodiumoxide::init().unwrap(); - let mut rdev = router::Device::new(); + let mut rdev = router::Device::new(8); let pref = rdev.add(); } diff --git a/src/platform/mod.rs b/src/platform/mod.rs new file mode 100644 index 0000000..46f712a --- /dev/null +++ b/src/platform/mod.rs @@ -0,0 +1,2 @@ +mod tun; +mod udp; diff --git a/src/platform/tun.rs b/src/platform/tun.rs new file mode 100644 index 0000000..45fd591 --- /dev/null +++ b/src/platform/tun.rs @@ -0,0 +1,10 @@ +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +pub trait Tun: Send + Sync { + type Error; + + fn new(mtu: Arc) -> Self; + fn read(&self, dst: &mut [u8]) -> Result; + fn write(&self, src: &[u8]) -> Result<(), Self::Error>; +} diff --git a/src/platform/udp.rs b/src/platform/udp.rs new file mode 100644 index 0000000..f21a3d3 --- /dev/null +++ b/src/platform/udp.rs @@ -0,0 +1,11 @@ +/* Often times an a file descriptor in an atomic might suffice. + */ +pub trait Bind: Send + Sync { + type Error; + + fn new() -> Self; + fn set_port(&self, port: u16) -> Result<(), Self::Error>; + fn get_port(&self) -> u16; + fn recv(&self, dst: &mut [u8]) -> Endpoint; + fn send(&self, src: &[u8], dst: &Endpoint); +} diff --git a/src/router/device.rs b/src/router/device.rs index 3b29312..5dfd22c 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -1,11 +1,11 @@ -use arraydeque::{ArrayDeque, Saturating, Wrapping}; +use arraydeque::{ArrayDeque, Wrapping}; use treebitmap::IpLookupTable; +use crossbeam_deque::{Injector, Steal}; use std::collections::HashMap; -use std::error::Error; +use std::mem; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::ptr; -use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::{Arc, Mutex, Weak}; use std::thread; @@ -21,12 +21,29 @@ use std::u64; const REJECT_AFTER_MESSAGES: u64 = u64::MAX - (1 << 4); const MAX_STAGED_PACKETS: usize = 128; -pub struct Device { +struct DeviceInner { + stopped: AtomicBool, + injector: Injector<()>, // parallel enc/dec task injector + threads: Vec>, recv: spin::RwLock>, ipv4: IpLookupTable>, ipv6: IpLookupTable>, } +struct PeerInner { + stopped: AtomicBool, + thread_outbound: spin::Mutex>, + thread_inbound: spin::Mutex>, + inorder_outbound: SyncSender<()>, + inorder_inbound: SyncSender<()>, + staged_packets: Mutex; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake + rx_bytes: AtomicU64, // received bytes + tx_bytes: AtomicU64, // transmitted bytes + keys: spin::Mutex, // key-wheel + ekey: spin::Mutex, // encryption state + endpoint: spin::Mutex>>, +} + struct EncryptionState { key: [u8; 32], // encryption key id: u32, // sender id @@ -48,33 +65,34 @@ struct KeyWheel { previous: Option, // old key state (used for decryption) } -struct PeerInner { - inorder_outbound: SyncSender<()>, - inorder_inbound: SyncSender<()>, - staged_packets: Mutex; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake - rx_bytes: AtomicU64, // received bytes - tx_bytes: AtomicU64, // transmitted bytes - keys: spin::Mutex, // key-wheel - ekey: spin::Mutex, // encryption state - endpoint: spin::Mutex>>, -} - pub struct Peer(Arc); +pub struct Device(DeviceInner); impl Drop for Peer { fn drop(&mut self) { - // stop threads and remove peer from device + // mark peer as stopped + let inner = &self.0; + inner.stopped.store(true, Ordering::SeqCst); + + // unpark threads to stop + inner.thread_inbound.lock().thread().unpark(); + inner.thread_outbound.lock().thread().unpark(); } } impl Drop for Device { fn drop(&mut self) { - // stop threads + // mark device as stopped + let inner = &self.0; + inner.stopped.store(true, Ordering::SeqCst); + + // eat all parallel jobs + while inner.injector.steal() != Steal::Empty {} } } impl Peer { - fn set_endpoint(&self, endpoint: SocketAddr) { + pub fn set_endpoint(&self, endpoint: SocketAddr) { *self.0.endpoint.lock() = Some(Arc::new(endpoint)) } @@ -87,7 +105,7 @@ impl Peer { }; } - pub fn keypair_add(&self, new: KeyPair) -> Option { + fn keypair_add(&self, new: KeyPair) -> Option { let mut keys = self.0.keys.lock(); let release = keys.previous.map(|k| k.recv.id); @@ -124,30 +142,21 @@ impl Peer { } impl Device { - pub fn new() -> Device { - Device { + pub fn new(workers: usize) -> Device { + Device(DeviceInner { + threads: vec![], + stopped: AtomicBool::new(false), + injector: Injector::new(), recv: spin::RwLock::new(HashMap::new()), ipv4: IpLookupTable::new(), ipv6: IpLookupTable::new(), - } - } - - pub fn release(&self, id: u32) { - debug_assert!( - if let Some(_) = self.recv.read().get(&id) { - true - } else { - false - }, - true - ); - self.recv.write().remove(&id); + }) } pub fn add_subnet(&mut self, ip: IpAddr, masklen: u32, peer: Peer) { match ip { - IpAddr::V4(v4) => self.ipv4.insert(v4, masklen, Arc::downgrade(&peer.0)), - IpAddr::V6(v6) => self.ipv6.insert(v6, masklen, Arc::downgrade(&peer.0)), + IpAddr::V4(v4) => self.0.ipv4.insert(v4, masklen, Arc::downgrade(&peer.0)), + IpAddr::V6(v6) => self.0.ipv6.insert(v6, masklen, Arc::downgrade(&peer.0)), }; } @@ -155,7 +164,7 @@ impl Device { let mut subnets = Vec::new(); // extract ipv4 entries - for subnet in self.ipv4.iter() { + for subnet in self.0.ipv4.iter() { let (ip, masklen, p) = subnet; if let Some(p) = p.upgrade() { if Arc::ptr_eq(&p, &peer.0) { @@ -165,7 +174,7 @@ impl Device { } // extract ipv6 entries - for subnet in self.ipv6.iter() { + for subnet in self.0.ipv6.iter() { let (ip, masklen, p) = subnet; if let Some(p) = p.upgrade() { if Arc::ptr_eq(&p, &peer.0) { @@ -182,7 +191,7 @@ impl Device { let release = peer.keypair_add(new); // update incoming packet id map - let mut recv = self.recv.write(); + let mut recv = self.0.recv.write(); // release id of previous keypair if let Some(id) = release { diff --git a/src/router/mod.rs b/src/router/mod.rs index b53b10c..7055875 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -4,4 +4,4 @@ mod device; // mod inbound; // mod outbound; -pub use device::Device; \ No newline at end of file +pub use device::{Device, Peer}; \ No newline at end of file -- cgit v1.2.3-59-g8ed1b