aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-20 14:33:11 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-20 14:33:11 +0200
commit7e727d120b4c7375b7dd2f1210a883c876531c06 (patch)
tree43b30640038535c9f3d6640659707c5fd36b595b
parentImplemented keypair_confirm (diff)
downloadwireguard-rs-7e727d120b4c7375b7dd2f1210a883c876531c06.tar.xz
wireguard-rs-7e727d120b4c7375b7dd2f1210a883c876531c06.zip
Restructure and job stealing work queue
-rw-r--r--src/main.rs4
-rw-r--r--src/router/device.rs394
-rw-r--r--src/router/mod.rs6
-rw-r--r--src/router/outbound.rs32
-rw-r--r--src/router/peer.rs285
-rw-r--r--src/router/workers.rs153
-rw-r--r--src/types/endpoint.rs6
-rw-r--r--src/types/mod.rs4
-rw-r--r--src/types/udp.rs19
9 files changed, 487 insertions, 416 deletions
diff --git a/src/main.rs b/src/main.rs
index 272c5e2..aa73fd2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -14,7 +14,7 @@ fn main() {
// choose optimal crypto implementations for platform
sodiumoxide::init().unwrap();
- let mut rdev = router::Device::new(8);
+ let mut router = router::Device::new(8);
- let pref = rdev.new_peer();
+ let peer = router.new_peer();
}
diff --git a/src/router/device.rs b/src/router/device.rs
index 8f3d485..d92250e 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -16,149 +16,37 @@ use spin;
use super::super::constants::*;
use super::super::types::KeyPair;
use super::anti_replay::AntiReplay;
-
-use std::u64;
-
-const MAX_STAGED_PACKETS: usize = 128;
-
-struct DeviceInner {
- stopped: AtomicBool,
- injector: Injector<()>, // parallel enc/dec task injector
- threads: Vec<thread::JoinHandle<()>>, // join handles of worker threads
- recv: spin::RwLock<HashMap<u32, DecryptionState>>, // receiver id -> decryption state
- ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner>>>, // ipv4 cryptkey routing
- ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner>>>, // ipv6 cryptkey routing
-}
-
-struct PeerInner {
- stopped: AtomicBool,
- device: Arc<DeviceInner>,
- thread_outbound: spin::Mutex<thread::JoinHandle<()>>,
- thread_inbound: spin::Mutex<thread::JoinHandle<()>>,
- inorder_outbound: SyncSender<()>,
- inorder_inbound: SyncSender<()>,
- staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
- rx_bytes: AtomicU64, // received bytes
- tx_bytes: AtomicU64, // transmitted bytes
- keys: spin::Mutex<KeyWheel>, // key-wheel
- ekey: spin::Mutex<Option<EncryptionState>>, // encryption state
- endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
-}
-
-struct EncryptionState {
- key: [u8; 32], // encryption key
- id: u32, // sender id
- nonce: u64, // next available nonce
- death: Instant, // time when the key no longer can be used for encryption
- // (birth + reject-after-time - keepalive-timeout - rekey-timeout)
+use super::peer;
+use super::peer::{Peer, PeerInner};
+use super::workers;
+
+pub struct DeviceInner {
+ pub stopped: AtomicBool,
+ pub injector: Injector<()>, // parallel enc/dec task injector
+ pub threads: Vec<thread::JoinHandle<()>>, // join handles of worker threads
+ pub recv: spin::RwLock<HashMap<u32, DecryptionState>>, // receiver id -> decryption state
+ pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner>>>, // ipv4 cryptkey routing
+ pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner>>>, // ipv6 cryptkey routing
}
-struct DecryptionState {
- key: [u8; 32],
- // keypair: Weak<KeyPair>,
- protector: spin::Mutex<AntiReplay>,
- peer: Weak<PeerInner>,
- death: Instant, // time when the key can no longer be used for decryption
+pub struct EncryptionState {
+ pub key: [u8; 32], // encryption key
+ pub id: u32, // sender id
+ pub nonce: u64, // next available nonce
+ pub death: Instant, // time when the key no longer can be used for encryption
+ // (birth + reject-after-time - keepalive-timeout - rekey-timeout)
}
-struct KeyWheel {
- next: Option<Arc<KeyPair>>, // next key state (unconfirmed)
- current: Option<Arc<KeyPair>>, // current key state (used for encryption)
- previous: Option<Arc<KeyPair>>, // old key state (used for decryption)
- retired: Option<u32>, // retired id (previous id, after confirming key-pair)
+pub struct DecryptionState {
+ pub key: [u8; 32],
+ pub keypair: Weak<KeyPair>,
+ pub protector: spin::Mutex<AntiReplay>,
+ pub peer: Weak<PeerInner>,
+ pub death: Instant, // time when the key can no longer be used for decryption
}
-pub struct Peer(Arc<PeerInner>);
pub struct Device(Arc<DeviceInner>);
-fn treebit_list<A, R>(
- peer: &Arc<PeerInner>,
- table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>,
- callback: Box<dyn Fn(A, u32) -> R>,
-) -> Vec<R>
-where
- A: Address,
-{
- let mut res = Vec::new();
- for subnet in table.read().iter() {
- let (ip, masklen, p) = subnet;
- if let Some(p) = p.upgrade() {
- if Arc::ptr_eq(&p, &peer) {
- res.push(callback(ip, masklen))
- }
- }
- }
- res
-}
-
-fn treebit_remove<A>(peer: &Peer, table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>)
-where
- A: Address,
-{
- let mut m = table.write();
-
- // collect keys for value
- let mut subnets = vec![];
- for subnet in m.iter() {
- let (ip, masklen, p) = subnet;
- if let Some(p) = p.upgrade() {
- if Arc::ptr_eq(&p, &peer.0) {
- subnets.push((ip, masklen))
- }
- }
- }
-
- // remove all key mappings
- for subnet in subnets {
- let r = m.remove(subnet.0, subnet.1);
- debug_assert!(r.is_some());
- }
-}
-
-impl Drop for Peer {
- fn drop(&mut self) {
- // mark peer as stopped
-
- let peer = &self.0;
- peer.stopped.store(true, Ordering::SeqCst);
-
- // remove from cryptkey router
-
- treebit_remove(self, &peer.device.ipv4);
- treebit_remove(self, &peer.device.ipv6);
-
- // unpark threads
-
- peer.thread_inbound.lock().thread().unpark();
- peer.thread_outbound.lock().thread().unpark();
-
- // release ids from the receiver map
-
- let mut keys = peer.keys.lock();
- let mut release = Vec::with_capacity(3);
-
- keys.next.as_ref().map(|k| release.push(k.recv.id));
- keys.current.as_ref().map(|k| release.push(k.recv.id));
- keys.previous.as_ref().map(|k| release.push(k.recv.id));
-
- if release.len() > 0 {
- let mut recv = peer.device.recv.write();
- for id in &release {
- recv.remove(id);
- }
- }
-
- // null key-material (TODO: extend)
-
- keys.next = None;
- keys.current = None;
- keys.previous = None;
-
- *peer.ekey.lock() = None;
- *peer.endpoint.lock() = None;
- }
-}
-
impl Drop for Device {
fn drop(&mut self) {
// mark device as stopped
@@ -175,163 +63,6 @@ impl Drop for Device {
}
}
-impl PeerInner {
- pub fn keypair_confirm(&self, kp: Weak<KeyPair>) {
- let mut keys = self.keys.lock();
-
- // Attempt to upgrade Weak -> Arc
- // (this should ensure that the key is in the key-wheel,
- // which holds the only strong reference)
- let kp = match kp.upgrade() {
- Some(kp) => kp,
- None => {
- return;
- }
- };
-
- debug_assert!(
- keys.retired.is_none(),
- "retired spot is not free for previous"
- );
-
- debug_assert!(
- if let Some(key) = &keys.next {
- Arc::ptr_eq(&kp, &key)
- } else {
- false
- },
- "if next has been overwritten, before confirmation, the key-pair should have been dropped!"
- );
-
- // enable use for encryption and set confirmed
- *self.ekey.lock() = Some(EncryptionState {
- id: kp.send.id,
- key: kp.send.key,
- nonce: 0,
- death: kp.birth + REJECT_AFTER_TIME,
- });
-
- // rotate the key-wheel
- let release = keys.previous.as_ref().map(|k| k.recv.id);
- keys.previous = keys.current.as_ref().map(|v| v.clone());
- keys.current = Some(kp.clone());
- keys.retired = release;
- }
-}
-
-/// Public interface and handle to the peer
-impl Peer {
- pub fn set_endpoint(&self, endpoint: SocketAddr) {
- *self.0.endpoint.lock() = Some(Arc::new(endpoint))
- }
-
- /// Add a new keypair
- ///
- /// # Arguments
- ///
- /// - new: The new confirmed/unconfirmed key pair
- ///
- /// # Returns
- ///
- /// A vector of ids which has been released.
- /// These should be released in the handshake module.
- pub fn add_keypair(&self, new: KeyPair) -> Vec<u32> {
- let mut keys = self.0.keys.lock();
- let mut release = Vec::with_capacity(2);
-
- // collect ids to be released
- keys.retired.map(|v| release.push(v));
- keys.previous.as_ref().map(|k| release.push(k.recv.id));
-
- // update key-wheel
- if new.confirmed {
- // start using key for encryption
- *self.0.ekey.lock() = Some(EncryptionState {
- id: new.send.id,
- key: new.send.key,
- nonce: 0,
- death: new.birth + REJECT_AFTER_TIME,
- });
-
- // move current into previous
- keys.previous = keys.current.as_ref().map(|v| v.clone());;
- keys.current = Some(Arc::new(new));
- } else {
- // store the key and await confirmation
- keys.previous = keys.next.as_ref().map(|v| v.clone());;
- keys.next = Some(Arc::new(new));
- };
-
- // update incoming packet id map
- {
- let mut recv = self.0.device.recv.write();
-
- // purge recv map of released ids
- for id in &release {
- recv.remove(&id);
- }
-
- // map new id to keypair
- debug_assert!(!recv.contains_key(&new.recv.id));
-
- recv.insert(
- new.recv.id,
- DecryptionState {
- key: new.recv.key,
- protector: spin::Mutex::new(AntiReplay::new()),
- peer: Arc::downgrade(&self.0),
- death: new.birth + REJECT_AFTER_TIME,
- },
- );
- }
-
- // return the released id (for handshake state machine)
- release
- }
-
- pub fn rx_bytes(&self) -> u64 {
- self.0.rx_bytes.load(Ordering::Relaxed)
- }
-
- pub fn tx_bytes(&self) -> u64 {
- self.0.tx_bytes.load(Ordering::Relaxed)
- }
-
- pub fn add_subnet(&self, ip: IpAddr, masklen: u32) {
- match ip {
- IpAddr::V4(v4) => {
- self.0
- .device
- .ipv4
- .write()
- .insert(v4, masklen, Arc::downgrade(&self.0))
- }
- IpAddr::V6(v6) => {
- self.0
- .device
- .ipv6
- .write()
- .insert(v6, masklen, Arc::downgrade(&self.0))
- }
- };
- }
-
- pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> {
- let mut res = Vec::new();
- res.append(&mut treebit_list(
- &self.0,
- &self.0.device.ipv4,
- Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)),
- ));
- res.append(&mut treebit_list(
- &self.0,
- &self.0.device.ipv6,
- Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)),
- ));
- res
- }
-}
-
impl Device {
pub fn new(workers: usize) -> Device {
Device(Arc::new(DeviceInner {
@@ -350,34 +81,7 @@ impl Device {
///
/// A atomic ref. counted peer (with liftime matching the device)
pub fn new_peer(&self) -> Peer {
- // spawn inbound thread
- let (send_inbound, recv_inbound) = sync_channel(1);
- let handle_inbound = thread::spawn(move || {});
-
- // spawn outbound thread
- let (send_outbound, recv_inbound) = sync_channel(1);
- let handle_outbound = thread::spawn(move || {});
-
- // allocate peer object
- Peer(Arc::new(PeerInner {
- stopped: AtomicBool::new(false),
- device: self.0.clone(),
- ekey: spin::Mutex::new(None),
- endpoint: spin::Mutex::new(None),
- inorder_inbound: send_inbound,
- inorder_outbound: send_outbound,
- keys: spin::Mutex::new(KeyWheel {
- next: None,
- current: None,
- previous: None,
- retired: None,
- }),
- rx_bytes: AtomicU64::new(0),
- tx_bytes: AtomicU64::new(0),
- staged_packets: spin::Mutex::new(ArrayDeque::new()),
- thread_inbound: spin::Mutex::new(handle_inbound),
- thread_outbound: spin::Mutex::new(handle_outbound),
- }))
+ peer::new_peer(self.0.clone())
}
/// Cryptkey routes and sends a plaintext message (IP packet)
@@ -396,39 +100,6 @@ impl Device {
unimplemented!();
}
- /// Sends a message directly to the peer.
- /// The router device takes care of discovering/managing the endpoint.
- /// This is used for handshake initiation/response messages
- ///
- /// # Arguments
- ///
- /// - peer: Reference to the destination peer
- /// - msg: Message to transmit
- pub fn send_raw(&self, peer: Arc<Peer>, msg: &mut [u8]) {
- unimplemented!();
- }
-
- /// Flush the queue of buffered messages awaiting transmission
- ///
- /// # Arguments
- ///
- /// - peer: Reference for the peer to flush
- pub fn flush_queue(&self, peer: Arc<Peer>) {
- unimplemented!();
- }
-
- /// Attempt to route, encrypt and send all elements buffered in the queue
- ///
- /// # Arguments
- ///
- /// # Returns
- ///
- /// A boolean indicating whether packages where sent.
- /// Note: This is used for implicit confirmation of handshakes.
- pub fn send_run_queue(&self, peer: Arc<Peer>) -> bool {
- unimplemented!();
- }
-
/// Receive an encrypted transport message
///
/// # Arguments
@@ -437,21 +108,4 @@ impl Device {
pub fn recv(&self, ct_msg: &mut [u8]) {
unimplemented!();
}
-
- /// Returns the current endpoint known for the peer
- ///
- /// # Arguments
- ///
- /// - peer: The peer to retrieve the endpoint for
- pub fn get_endpoint(&self, peer: Arc<Peer>) -> SocketAddr {
- unimplemented!();
- }
-
- pub fn set_endpoint(&self, peer: Arc<Peer>, endpoint: SocketAddr) {
- unimplemented!();
- }
-
- pub fn new_keypair(&self, peer: Arc<Peer>, keypair: KeyPair) {
- unimplemented!();
- }
}
diff --git a/src/router/mod.rs b/src/router/mod.rs
index 7055875..6a4dd61 100644
--- a/src/router/mod.rs
+++ b/src/router/mod.rs
@@ -2,6 +2,8 @@ mod anti_replay;
mod buffer;
mod device;
// mod inbound;
-// mod outbound;
+mod workers;
+mod peer;
-pub use device::{Device, Peer}; \ No newline at end of file
+pub use peer::Peer;
+pub use device::Device; \ No newline at end of file
diff --git a/src/router/outbound.rs b/src/router/outbound.rs
deleted file mode 100644
index cc6a4bf..0000000
--- a/src/router/outbound.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-use spin;
-use std::thread;
-use std::sync::Arc;
-use std::sync::mpsc::{Receiver, sync_channel};
-
-struct JobInner {
- done : bool, // is encryption complete?
- msg : Vec<u8>, // transport message (id, nonce already set)
- key : [u8; 32], // encryption key
- handle : thread::JoinHandle
-}
-
-type Job = Arc<spin::Mutex<JobInner>>;
-
-fn worker_parallel()
-
-fn worker_inorder(channel : Receiver<Job>) {
- for ordered in channel.recv().iter() {
- loop {
- // check if job is complete
- match ordered.try_lock() {
- None => (),
- Some(guard) => if guard.done {
- // write to UDP interface
- }
- }
-
- // wait for job to complete
- thread::park();
- }
- }
-} \ No newline at end of file
diff --git a/src/router/peer.rs b/src/router/peer.rs
new file mode 100644
index 0000000..f7b8bf4
--- /dev/null
+++ b/src/router/peer.rs
@@ -0,0 +1,285 @@
+use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
+use std::sync::{Weak, Arc};
+use std::thread;
+
+use std::net::{IpAddr, SocketAddr};
+
+use std::sync::mpsc::{sync_channel, SyncSender};
+
+use spin;
+
+use arraydeque::{ArrayDeque, Wrapping};
+
+use treebitmap::IpLookupTable;
+use treebitmap::address::Address;
+
+use super::super::types::KeyPair;
+use super::super::constants::*;
+
+use super::anti_replay::AntiReplay;
+use super::device::DeviceInner;
+use super::device::EncryptionState;
+use super::device::DecryptionState;
+
+const MAX_STAGED_PACKETS: usize = 128;
+
+struct KeyWheel {
+ next: Option<Arc<KeyPair>>, // next key state (unconfirmed)
+ current: Option<Arc<KeyPair>>, // current key state (used for encryption)
+ previous: Option<Arc<KeyPair>>, // old key state (used for decryption)
+ retired: Option<u32>, // retired id (previous id, after confirming key-pair)
+}
+
+pub struct PeerInner {
+ stopped: AtomicBool,
+ device: Arc<DeviceInner>,
+ thread_outbound: spin::Mutex<thread::JoinHandle<()>>,
+ thread_inbound: spin::Mutex<thread::JoinHandle<()>>,
+ inorder_outbound: SyncSender<()>,
+ inorder_inbound: SyncSender<()>,
+ staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
+ rx_bytes: AtomicU64, // received bytes
+ tx_bytes: AtomicU64, // transmitted bytes
+ keys: spin::Mutex<KeyWheel>, // key-wheel
+ ekey: spin::Mutex<Option<EncryptionState>>, // encryption state
+ endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
+}
+
+pub struct Peer(Arc<PeerInner>);
+
+fn treebit_list<A, R>(
+ peer: &Arc<PeerInner>,
+ table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>,
+ callback: Box<dyn Fn(A, u32) -> R>,
+) -> Vec<R>
+where
+ A: Address,
+{
+ let mut res = Vec::new();
+ for subnet in table.read().iter() {
+ let (ip, masklen, p) = subnet;
+ if let Some(p) = p.upgrade() {
+ if Arc::ptr_eq(&p, &peer) {
+ res.push(callback(ip, masklen))
+ }
+ }
+ }
+ res
+}
+
+fn treebit_remove<A>(peer: &Peer, table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>)
+where
+ A: Address,
+{
+ let mut m = table.write();
+
+ // collect keys for value
+ let mut subnets = vec![];
+ for subnet in m.iter() {
+ let (ip, masklen, p) = subnet;
+ if let Some(p) = p.upgrade() {
+ if Arc::ptr_eq(&p, &peer.0) {
+ subnets.push((ip, masklen))
+ }
+ }
+ }
+
+ // remove all key mappings
+ for subnet in subnets {
+ let r = m.remove(subnet.0, subnet.1);
+ debug_assert!(r.is_some());
+ }
+}
+
+impl Drop for Peer {
+ fn drop(&mut self) {
+ // mark peer as stopped
+
+ let peer = &self.0;
+ peer.stopped.store(true, Ordering::SeqCst);
+
+ // remove from cryptkey router
+
+ treebit_remove(self, &peer.device.ipv4);
+ treebit_remove(self, &peer.device.ipv6);
+
+ // unpark threads
+
+ peer.thread_inbound.lock().thread().unpark();
+ peer.thread_outbound.lock().thread().unpark();
+
+ // release ids from the receiver map
+
+ let mut keys = peer.keys.lock();
+ let mut release = Vec::with_capacity(3);
+
+ keys.next.as_ref().map(|k| release.push(k.recv.id));
+ keys.current.as_ref().map(|k| release.push(k.recv.id));
+ keys.previous.as_ref().map(|k| release.push(k.recv.id));
+
+ if release.len() > 0 {
+ let mut recv = peer.device.recv.write();
+ for id in &release {
+ recv.remove(id);
+ }
+ }
+
+ // null key-material (TODO: extend)
+
+ keys.next = None;
+ keys.current = None;
+ keys.previous = None;
+
+ *peer.ekey.lock() = None;
+ *peer.endpoint.lock() = None;
+ }
+}
+
+pub fn new_peer(device: Arc<DeviceInner>) -> Peer {
+ // spawn inbound thread
+ let (send_inbound, recv_inbound) = sync_channel(1);
+ let handle_inbound = thread::spawn(move || {});
+
+ // spawn outbound thread
+ let (send_outbound, recv_inbound) = sync_channel(1);
+ let handle_outbound = thread::spawn(move || {});
+
+ // allocate peer object
+ Peer::new(PeerInner {
+ stopped: AtomicBool::new(false),
+ device: device,
+ ekey: spin::Mutex::new(None),
+ endpoint: spin::Mutex::new(None),
+ inorder_inbound: send_inbound,
+ inorder_outbound: send_outbound,
+ keys: spin::Mutex::new(KeyWheel {
+ next: None,
+ current: None,
+ previous: None,
+ retired: None,
+ }),
+ rx_bytes: AtomicU64::new(0),
+ tx_bytes: AtomicU64::new(0),
+ staged_packets: spin::Mutex::new(ArrayDeque::new()),
+ thread_inbound: spin::Mutex::new(handle_inbound),
+ thread_outbound: spin::Mutex::new(handle_outbound),
+ })
+}
+
+impl Peer {
+ fn new(inner : PeerInner) -> Peer {
+ Peer(Arc::new(inner))
+ }
+
+ pub fn set_endpoint(&self, endpoint: SocketAddr) {
+ *self.0.endpoint.lock() = Some(Arc::new(endpoint))
+ }
+
+ /// Add a new keypair
+ ///
+ /// # Arguments
+ ///
+ /// - new: The new confirmed/unconfirmed key pair
+ ///
+ /// # Returns
+ ///
+ /// A vector of ids which has been released.
+ /// These should be released in the handshake module.
+ pub fn add_keypair(&self, new: KeyPair) -> Vec<u32> {
+ let mut keys = self.0.keys.lock();
+ let mut release = Vec::with_capacity(2);
+ let new = Arc::new(new);
+
+ // collect ids to be released
+ keys.retired.map(|v| release.push(v));
+ keys.previous.as_ref().map(|k| release.push(k.recv.id));
+
+ // update key-wheel
+ if new.confirmed {
+ // start using key for encryption
+ *self.0.ekey.lock() = Some(EncryptionState {
+ id: new.send.id,
+ key: new.send.key,
+ nonce: 0,
+ death: new.birth + REJECT_AFTER_TIME,
+ });
+
+ // move current into previous
+ keys.previous = keys.current.as_ref().map(|v| v.clone());;
+ keys.current = Some(new.clone());
+ } else {
+ // store the key and await confirmation
+ keys.previous = keys.next.as_ref().map(|v| v.clone());;
+ keys.next = Some(new.clone());
+ };
+
+ // update incoming packet id map
+ {
+ let mut recv = self.0.device.recv.write();
+
+ // purge recv map of released ids
+ for id in &release {
+ recv.remove(&id);
+ }
+
+ // map new id to keypair
+ debug_assert!(!recv.contains_key(&new.recv.id));
+
+ recv.insert(
+ new.recv.id,
+ DecryptionState {
+ keypair: Arc::downgrade(&new),
+ key: new.recv.key,
+ protector: spin::Mutex::new(AntiReplay::new()),
+ peer: Arc::downgrade(&self.0),
+ death: new.birth + REJECT_AFTER_TIME,
+ },
+ );
+ }
+
+ // return the released id (for handshake state machine)
+ release
+ }
+
+ pub fn rx_bytes(&self) -> u64 {
+ self.0.rx_bytes.load(Ordering::Relaxed)
+ }
+
+ pub fn tx_bytes(&self) -> u64 {
+ self.0.tx_bytes.load(Ordering::Relaxed)
+ }
+
+ pub fn add_subnet(&self, ip: IpAddr, masklen: u32) {
+ match ip {
+ IpAddr::V4(v4) => {
+ self.0
+ .device
+ .ipv4
+ .write()
+ .insert(v4, masklen, Arc::downgrade(&self.0))
+ }
+ IpAddr::V6(v6) => {
+ self.0
+ .device
+ .ipv6
+ .write()
+ .insert(v6, masklen, Arc::downgrade(&self.0))
+ }
+ };
+ }
+
+ pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> {
+ let mut res = Vec::new();
+ res.append(&mut treebit_list(
+ &self.0,
+ &self.0.device.ipv4,
+ Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)),
+ ));
+ res.append(&mut treebit_list(
+ &self.0,
+ &self.0.device.ipv6,
+ Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)),
+ ));
+ res
+ }
+} \ No newline at end of file
diff --git a/src/router/workers.rs b/src/router/workers.rs
new file mode 100644
index 0000000..2117190
--- /dev/null
+++ b/src/router/workers.rs
@@ -0,0 +1,153 @@
+use super::device::DecryptionState;
+use super::device::DeviceInner;
+use super::peer::PeerInner;
+
+use crossbeam_deque::{Injector, Steal, Stealer, Worker};
+use spin;
+use std::iter;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::{sync_channel, Receiver};
+use std::sync::{Arc, Weak};
+use std::thread;
+
+#[derive(PartialEq)]
+enum Operation {
+ Encryption,
+ Decryption,
+}
+
+#[derive(PartialEq)]
+enum Status {
+ Fault, // unsealing failed
+ Done, // job valid and complete
+ Waiting, // job awaiting completion
+}
+
+struct JobInner {
+ msg: Vec<u8>, // message buffer (nonce and receiver id set)
+ key: [u8; 32], // chacha20poly1305 key
+ status: Status, // state of the job
+ op: Operation, // should be buffer be encrypted / decrypted?
+}
+
+type JobBuffer = Arc<spin::Mutex<JobInner>>;
+type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer);
+type JobInbound = (Arc<DecryptionState>, JobBuffer);
+type JobOutbound = (Weak<PeerInner>, JobBuffer);
+
+/* Strategy for workers acquiring a new job:
+ *
+ * 1. Try the local job queue (owned by the thread)
+ * 2. Try fetching a batch of jobs from the global injector
+ * 3. Attempt to steal jobs from other threads.
+ */
+fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
+ local.pop().or_else(|| {
+ iter::repeat_with(|| {
+ global
+ .steal_batch_and_pop(local)
+ .or_else(|| stealers.iter().map(|s| s.steal()).collect())
+ })
+ .find(|s| !s.is_retry())
+ .and_then(|s| s.success())
+ })
+}
+
+fn worker_inbound(
+ device: Arc<DeviceInner>, // related device
+ peer: Arc<PeerInner>, // related peer
+ recv: Receiver<JobInbound>, // in order queue
+) {
+ // reads from in order channel
+ for job in recv.recv().iter() {
+ loop {
+ let (state, buf) = job;
+
+ // check if job is complete
+ match buf.try_lock() {
+ None => (),
+ Some(buf) => {
+ if buf.status != Status::Waiting {
+ // check replay protector
+
+ // check if confirms keypair
+
+ // write to tun device
+
+ // continue to next job (no parking)
+ break;
+ }
+ }
+ }
+
+ // wait for job to complete
+ thread::park();
+ }
+ }
+}
+
+fn worker_outbound(
+ device: Arc<DeviceInner>, // related device
+ peer: Arc<PeerInner>, // related peer
+ recv: Receiver<JobInbound>, // in order queue
+) {
+ // reads from in order channel
+ for job in recv.recv().iter() {
+ loop {
+ let (peer, buf) = job;
+
+ // check if job is complete
+ match buf.try_lock() {
+ None => (),
+ Some(buf) => {
+ if buf.status != Status::Waiting {
+ // send buffer to peer endpoint
+ break;
+ }
+ }
+ }
+
+ // wait for job to complete
+ thread::park();
+ }
+ }
+}
+
+fn worker_parallel(
+ stopped: Arc<AtomicBool>, // stop workers (device has been dropped)
+ parked: Arc<AtomicBool>, // thread has been parked?
+ local: Worker<JobParallel>, // local job queue (local to thread)
+ global: Injector<JobParallel>, // global job injector
+ stealers: Vec<Stealer<JobParallel>>, // stealers (from other threads)
+) {
+ while !stopped.load(Ordering::SeqCst) {
+ match find_task(&local, &global, &stealers) {
+ Some(job) => {
+ let (handle, buf) = job;
+
+ // take ownership of the job buffer and complete it
+ {
+ let mut buf = buf.lock();
+ match buf.op {
+ Operation::Encryption => {
+ // TODO: encryption
+ buf.status = Status::Done;
+ }
+ Operation::Decryption => {
+ // TODO: decryption
+ buf.status = Status::Done;
+ }
+ }
+ }
+
+ // ensure consumer is unparked
+ handle.thread().unpark();
+ }
+ None => {
+ // no jobs, park the worker
+ parked.store(true, Ordering::Release);
+ thread::park();
+ }
+ }
+ }
+}
diff --git a/src/types/endpoint.rs b/src/types/endpoint.rs
new file mode 100644
index 0000000..d97905a
--- /dev/null
+++ b/src/types/endpoint.rs
@@ -0,0 +1,6 @@
+use std::net::SocketAddr;
+
+/* The generic implementation (not supporting "sticky-sockets"),
+ * is to simply use SocketAddr directly as the endpoint.
+ */
+pub trait Endpoint: Into<SocketAddr> {}
diff --git a/src/types/mod.rs b/src/types/mod.rs
index 868fb71..8da6d45 100644
--- a/src/types/mod.rs
+++ b/src/types/mod.rs
@@ -1,7 +1,9 @@
+mod endpoint;
mod keys;
mod tun;
mod udp;
+pub use endpoint::Endpoint;
pub use keys::{Key, KeyPair};
pub use tun::Tun;
-pub use udp::Bind; \ No newline at end of file
+pub use udp::Bind;
diff --git a/src/types/udp.rs b/src/types/udp.rs
index f45cf85..00e218f 100644
--- a/src/types/udp.rs
+++ b/src/types/udp.rs
@@ -1,26 +1,27 @@
+use super::Endpoint;
use std::error;
/* Often times an a file descriptor in an atomic might suffice.
*/
-pub trait Bind<Endpoint>: Send + Sync {
- type Error : error::Error;
+pub trait Bind: Send + Sync {
+ type Error: error::Error;
+ type Endpoint: Endpoint;
fn new() -> Self;
/// Updates the port of the Bind
- ///
+ ///
/// # Arguments
- ///
+ ///
/// - port, The new port to bind to. 0 means any available port.
- ///
+ ///
/// # Returns
- ///
+ ///
/// The unit type or an error, if binding fails
fn set_port(&self, port: u16) -> Result<(), Self::Error>;
/// Returns the current port of the bind
fn get_port(&self) -> u16;
-
- fn recv(&self, dst: &mut [u8]) -> Endpoint;
- fn send(&self, src: &[u8], dst: &Endpoint);
+ fn recv(&self, dst: &mut [u8]) -> Self::Endpoint;
+ fn send(&self, src: &[u8], dst: &Self::Endpoint);
}