From 7e727d120b4c7375b7dd2f1210a883c876531c06 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Tue, 20 Aug 2019 14:33:11 +0200 Subject: Restructure and job stealing work queue --- src/main.rs | 4 +- src/router/device.rs | 394 +++---------------------------------------------- src/router/mod.rs | 6 +- src/router/outbound.rs | 32 ---- src/router/peer.rs | 285 +++++++++++++++++++++++++++++++++++ src/router/workers.rs | 153 +++++++++++++++++++ src/types/endpoint.rs | 6 + src/types/mod.rs | 4 +- src/types/udp.rs | 19 +-- 9 files changed, 487 insertions(+), 416 deletions(-) delete mode 100644 src/router/outbound.rs create mode 100644 src/router/peer.rs create mode 100644 src/router/workers.rs create mode 100644 src/types/endpoint.rs (limited to 'src') diff --git a/src/main.rs b/src/main.rs index 272c5e2..aa73fd2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,7 @@ fn main() { // choose optimal crypto implementations for platform sodiumoxide::init().unwrap(); - let mut rdev = router::Device::new(8); + let mut router = router::Device::new(8); - let pref = rdev.new_peer(); + let peer = router.new_peer(); } diff --git a/src/router/device.rs b/src/router/device.rs index 8f3d485..d92250e 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -16,149 +16,37 @@ use spin; use super::super::constants::*; use super::super::types::KeyPair; use super::anti_replay::AntiReplay; - -use std::u64; - -const MAX_STAGED_PACKETS: usize = 128; - -struct DeviceInner { - stopped: AtomicBool, - injector: Injector<()>, // parallel enc/dec task injector - threads: Vec>, // join handles of worker threads - recv: spin::RwLock>, // receiver id -> decryption state - ipv4: spin::RwLock>>, // ipv4 cryptkey routing - ipv6: spin::RwLock>>, // ipv6 cryptkey routing -} - -struct PeerInner { - stopped: AtomicBool, - device: Arc, - thread_outbound: spin::Mutex>, - thread_inbound: spin::Mutex>, - inorder_outbound: SyncSender<()>, - inorder_inbound: SyncSender<()>, - staged_packets: spin::Mutex; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake - rx_bytes: AtomicU64, // received bytes - tx_bytes: AtomicU64, // transmitted bytes - keys: spin::Mutex, // key-wheel - ekey: spin::Mutex>, // encryption state - endpoint: spin::Mutex>>, -} - -struct EncryptionState { - key: [u8; 32], // encryption key - id: u32, // sender id - nonce: u64, // next available nonce - death: Instant, // time when the key no longer can be used for encryption - // (birth + reject-after-time - keepalive-timeout - rekey-timeout) +use super::peer; +use super::peer::{Peer, PeerInner}; +use super::workers; + +pub struct DeviceInner { + pub stopped: AtomicBool, + pub injector: Injector<()>, // parallel enc/dec task injector + pub threads: Vec>, // join handles of worker threads + pub recv: spin::RwLock>, // receiver id -> decryption state + pub ipv4: spin::RwLock>>, // ipv4 cryptkey routing + pub ipv6: spin::RwLock>>, // ipv6 cryptkey routing } -struct DecryptionState { - key: [u8; 32], - // keypair: Weak, - protector: spin::Mutex, - peer: Weak, - death: Instant, // time when the key can no longer be used for decryption +pub struct EncryptionState { + pub key: [u8; 32], // encryption key + pub id: u32, // sender id + pub nonce: u64, // next available nonce + pub death: Instant, // time when the key no longer can be used for encryption + // (birth + reject-after-time - keepalive-timeout - rekey-timeout) } -struct KeyWheel { - next: Option>, // next key state (unconfirmed) - current: Option>, // current key state (used for encryption) - previous: Option>, // old key state (used for decryption) - retired: Option, // retired id (previous id, after confirming key-pair) +pub struct DecryptionState { + pub key: [u8; 32], + pub keypair: Weak, + pub protector: spin::Mutex, + pub peer: Weak, + pub death: Instant, // time when the key can no longer be used for decryption } -pub struct Peer(Arc); pub struct Device(Arc); -fn treebit_list( - peer: &Arc, - table: &spin::RwLock>>, - callback: Box R>, -) -> Vec -where - A: Address, -{ - let mut res = Vec::new(); - for subnet in table.read().iter() { - let (ip, masklen, p) = subnet; - if let Some(p) = p.upgrade() { - if Arc::ptr_eq(&p, &peer) { - res.push(callback(ip, masklen)) - } - } - } - res -} - -fn treebit_remove(peer: &Peer, table: &spin::RwLock>>) -where - A: Address, -{ - let mut m = table.write(); - - // collect keys for value - let mut subnets = vec![]; - for subnet in m.iter() { - let (ip, masklen, p) = subnet; - if let Some(p) = p.upgrade() { - if Arc::ptr_eq(&p, &peer.0) { - subnets.push((ip, masklen)) - } - } - } - - // remove all key mappings - for subnet in subnets { - let r = m.remove(subnet.0, subnet.1); - debug_assert!(r.is_some()); - } -} - -impl Drop for Peer { - fn drop(&mut self) { - // mark peer as stopped - - let peer = &self.0; - peer.stopped.store(true, Ordering::SeqCst); - - // remove from cryptkey router - - treebit_remove(self, &peer.device.ipv4); - treebit_remove(self, &peer.device.ipv6); - - // unpark threads - - peer.thread_inbound.lock().thread().unpark(); - peer.thread_outbound.lock().thread().unpark(); - - // release ids from the receiver map - - let mut keys = peer.keys.lock(); - let mut release = Vec::with_capacity(3); - - keys.next.as_ref().map(|k| release.push(k.recv.id)); - keys.current.as_ref().map(|k| release.push(k.recv.id)); - keys.previous.as_ref().map(|k| release.push(k.recv.id)); - - if release.len() > 0 { - let mut recv = peer.device.recv.write(); - for id in &release { - recv.remove(id); - } - } - - // null key-material (TODO: extend) - - keys.next = None; - keys.current = None; - keys.previous = None; - - *peer.ekey.lock() = None; - *peer.endpoint.lock() = None; - } -} - impl Drop for Device { fn drop(&mut self) { // mark device as stopped @@ -175,163 +63,6 @@ impl Drop for Device { } } -impl PeerInner { - pub fn keypair_confirm(&self, kp: Weak) { - let mut keys = self.keys.lock(); - - // Attempt to upgrade Weak -> Arc - // (this should ensure that the key is in the key-wheel, - // which holds the only strong reference) - let kp = match kp.upgrade() { - Some(kp) => kp, - None => { - return; - } - }; - - debug_assert!( - keys.retired.is_none(), - "retired spot is not free for previous" - ); - - debug_assert!( - if let Some(key) = &keys.next { - Arc::ptr_eq(&kp, &key) - } else { - false - }, - "if next has been overwritten, before confirmation, the key-pair should have been dropped!" - ); - - // enable use for encryption and set confirmed - *self.ekey.lock() = Some(EncryptionState { - id: kp.send.id, - key: kp.send.key, - nonce: 0, - death: kp.birth + REJECT_AFTER_TIME, - }); - - // rotate the key-wheel - let release = keys.previous.as_ref().map(|k| k.recv.id); - keys.previous = keys.current.as_ref().map(|v| v.clone()); - keys.current = Some(kp.clone()); - keys.retired = release; - } -} - -/// Public interface and handle to the peer -impl Peer { - pub fn set_endpoint(&self, endpoint: SocketAddr) { - *self.0.endpoint.lock() = Some(Arc::new(endpoint)) - } - - /// Add a new keypair - /// - /// # Arguments - /// - /// - new: The new confirmed/unconfirmed key pair - /// - /// # Returns - /// - /// A vector of ids which has been released. - /// These should be released in the handshake module. - pub fn add_keypair(&self, new: KeyPair) -> Vec { - let mut keys = self.0.keys.lock(); - let mut release = Vec::with_capacity(2); - - // collect ids to be released - keys.retired.map(|v| release.push(v)); - keys.previous.as_ref().map(|k| release.push(k.recv.id)); - - // update key-wheel - if new.confirmed { - // start using key for encryption - *self.0.ekey.lock() = Some(EncryptionState { - id: new.send.id, - key: new.send.key, - nonce: 0, - death: new.birth + REJECT_AFTER_TIME, - }); - - // move current into previous - keys.previous = keys.current.as_ref().map(|v| v.clone());; - keys.current = Some(Arc::new(new)); - } else { - // store the key and await confirmation - keys.previous = keys.next.as_ref().map(|v| v.clone());; - keys.next = Some(Arc::new(new)); - }; - - // update incoming packet id map - { - let mut recv = self.0.device.recv.write(); - - // purge recv map of released ids - for id in &release { - recv.remove(&id); - } - - // map new id to keypair - debug_assert!(!recv.contains_key(&new.recv.id)); - - recv.insert( - new.recv.id, - DecryptionState { - key: new.recv.key, - protector: spin::Mutex::new(AntiReplay::new()), - peer: Arc::downgrade(&self.0), - death: new.birth + REJECT_AFTER_TIME, - }, - ); - } - - // return the released id (for handshake state machine) - release - } - - pub fn rx_bytes(&self) -> u64 { - self.0.rx_bytes.load(Ordering::Relaxed) - } - - pub fn tx_bytes(&self) -> u64 { - self.0.tx_bytes.load(Ordering::Relaxed) - } - - pub fn add_subnet(&self, ip: IpAddr, masklen: u32) { - match ip { - IpAddr::V4(v4) => { - self.0 - .device - .ipv4 - .write() - .insert(v4, masklen, Arc::downgrade(&self.0)) - } - IpAddr::V6(v6) => { - self.0 - .device - .ipv6 - .write() - .insert(v6, masklen, Arc::downgrade(&self.0)) - } - }; - } - - pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> { - let mut res = Vec::new(); - res.append(&mut treebit_list( - &self.0, - &self.0.device.ipv4, - Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)), - )); - res.append(&mut treebit_list( - &self.0, - &self.0.device.ipv6, - Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)), - )); - res - } -} - impl Device { pub fn new(workers: usize) -> Device { Device(Arc::new(DeviceInner { @@ -350,34 +81,7 @@ impl Device { /// /// A atomic ref. counted peer (with liftime matching the device) pub fn new_peer(&self) -> Peer { - // spawn inbound thread - let (send_inbound, recv_inbound) = sync_channel(1); - let handle_inbound = thread::spawn(move || {}); - - // spawn outbound thread - let (send_outbound, recv_inbound) = sync_channel(1); - let handle_outbound = thread::spawn(move || {}); - - // allocate peer object - Peer(Arc::new(PeerInner { - stopped: AtomicBool::new(false), - device: self.0.clone(), - ekey: spin::Mutex::new(None), - endpoint: spin::Mutex::new(None), - inorder_inbound: send_inbound, - inorder_outbound: send_outbound, - keys: spin::Mutex::new(KeyWheel { - next: None, - current: None, - previous: None, - retired: None, - }), - rx_bytes: AtomicU64::new(0), - tx_bytes: AtomicU64::new(0), - staged_packets: spin::Mutex::new(ArrayDeque::new()), - thread_inbound: spin::Mutex::new(handle_inbound), - thread_outbound: spin::Mutex::new(handle_outbound), - })) + peer::new_peer(self.0.clone()) } /// Cryptkey routes and sends a plaintext message (IP packet) @@ -396,39 +100,6 @@ impl Device { unimplemented!(); } - /// Sends a message directly to the peer. - /// The router device takes care of discovering/managing the endpoint. - /// This is used for handshake initiation/response messages - /// - /// # Arguments - /// - /// - peer: Reference to the destination peer - /// - msg: Message to transmit - pub fn send_raw(&self, peer: Arc, msg: &mut [u8]) { - unimplemented!(); - } - - /// Flush the queue of buffered messages awaiting transmission - /// - /// # Arguments - /// - /// - peer: Reference for the peer to flush - pub fn flush_queue(&self, peer: Arc) { - unimplemented!(); - } - - /// Attempt to route, encrypt and send all elements buffered in the queue - /// - /// # Arguments - /// - /// # Returns - /// - /// A boolean indicating whether packages where sent. - /// Note: This is used for implicit confirmation of handshakes. - pub fn send_run_queue(&self, peer: Arc) -> bool { - unimplemented!(); - } - /// Receive an encrypted transport message /// /// # Arguments @@ -437,21 +108,4 @@ impl Device { pub fn recv(&self, ct_msg: &mut [u8]) { unimplemented!(); } - - /// Returns the current endpoint known for the peer - /// - /// # Arguments - /// - /// - peer: The peer to retrieve the endpoint for - pub fn get_endpoint(&self, peer: Arc) -> SocketAddr { - unimplemented!(); - } - - pub fn set_endpoint(&self, peer: Arc, endpoint: SocketAddr) { - unimplemented!(); - } - - pub fn new_keypair(&self, peer: Arc, keypair: KeyPair) { - unimplemented!(); - } } diff --git a/src/router/mod.rs b/src/router/mod.rs index 7055875..6a4dd61 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -2,6 +2,8 @@ mod anti_replay; mod buffer; mod device; // mod inbound; -// mod outbound; +mod workers; +mod peer; -pub use device::{Device, Peer}; \ No newline at end of file +pub use peer::Peer; +pub use device::Device; \ No newline at end of file diff --git a/src/router/outbound.rs b/src/router/outbound.rs deleted file mode 100644 index cc6a4bf..0000000 --- a/src/router/outbound.rs +++ /dev/null @@ -1,32 +0,0 @@ -use spin; -use std::thread; -use std::sync::Arc; -use std::sync::mpsc::{Receiver, sync_channel}; - -struct JobInner { - done : bool, // is encryption complete? - msg : Vec, // transport message (id, nonce already set) - key : [u8; 32], // encryption key - handle : thread::JoinHandle -} - -type Job = Arc>; - -fn worker_parallel() - -fn worker_inorder(channel : Receiver) { - for ordered in channel.recv().iter() { - loop { - // check if job is complete - match ordered.try_lock() { - None => (), - Some(guard) => if guard.done { - // write to UDP interface - } - } - - // wait for job to complete - thread::park(); - } - } -} \ No newline at end of file diff --git a/src/router/peer.rs b/src/router/peer.rs new file mode 100644 index 0000000..f7b8bf4 --- /dev/null +++ b/src/router/peer.rs @@ -0,0 +1,285 @@ +use std::sync::atomic::{AtomicU64, AtomicBool, Ordering}; +use std::sync::{Weak, Arc}; +use std::thread; + +use std::net::{IpAddr, SocketAddr}; + +use std::sync::mpsc::{sync_channel, SyncSender}; + +use spin; + +use arraydeque::{ArrayDeque, Wrapping}; + +use treebitmap::IpLookupTable; +use treebitmap::address::Address; + +use super::super::types::KeyPair; +use super::super::constants::*; + +use super::anti_replay::AntiReplay; +use super::device::DeviceInner; +use super::device::EncryptionState; +use super::device::DecryptionState; + +const MAX_STAGED_PACKETS: usize = 128; + +struct KeyWheel { + next: Option>, // next key state (unconfirmed) + current: Option>, // current key state (used for encryption) + previous: Option>, // old key state (used for decryption) + retired: Option, // retired id (previous id, after confirming key-pair) +} + +pub struct PeerInner { + stopped: AtomicBool, + device: Arc, + thread_outbound: spin::Mutex>, + thread_inbound: spin::Mutex>, + inorder_outbound: SyncSender<()>, + inorder_inbound: SyncSender<()>, + staged_packets: spin::Mutex; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake + rx_bytes: AtomicU64, // received bytes + tx_bytes: AtomicU64, // transmitted bytes + keys: spin::Mutex, // key-wheel + ekey: spin::Mutex>, // encryption state + endpoint: spin::Mutex>>, +} + +pub struct Peer(Arc); + +fn treebit_list( + peer: &Arc, + table: &spin::RwLock>>, + callback: Box R>, +) -> Vec +where + A: Address, +{ + let mut res = Vec::new(); + for subnet in table.read().iter() { + let (ip, masklen, p) = subnet; + if let Some(p) = p.upgrade() { + if Arc::ptr_eq(&p, &peer) { + res.push(callback(ip, masklen)) + } + } + } + res +} + +fn treebit_remove(peer: &Peer, table: &spin::RwLock>>) +where + A: Address, +{ + let mut m = table.write(); + + // collect keys for value + let mut subnets = vec![]; + for subnet in m.iter() { + let (ip, masklen, p) = subnet; + if let Some(p) = p.upgrade() { + if Arc::ptr_eq(&p, &peer.0) { + subnets.push((ip, masklen)) + } + } + } + + // remove all key mappings + for subnet in subnets { + let r = m.remove(subnet.0, subnet.1); + debug_assert!(r.is_some()); + } +} + +impl Drop for Peer { + fn drop(&mut self) { + // mark peer as stopped + + let peer = &self.0; + peer.stopped.store(true, Ordering::SeqCst); + + // remove from cryptkey router + + treebit_remove(self, &peer.device.ipv4); + treebit_remove(self, &peer.device.ipv6); + + // unpark threads + + peer.thread_inbound.lock().thread().unpark(); + peer.thread_outbound.lock().thread().unpark(); + + // release ids from the receiver map + + let mut keys = peer.keys.lock(); + let mut release = Vec::with_capacity(3); + + keys.next.as_ref().map(|k| release.push(k.recv.id)); + keys.current.as_ref().map(|k| release.push(k.recv.id)); + keys.previous.as_ref().map(|k| release.push(k.recv.id)); + + if release.len() > 0 { + let mut recv = peer.device.recv.write(); + for id in &release { + recv.remove(id); + } + } + + // null key-material (TODO: extend) + + keys.next = None; + keys.current = None; + keys.previous = None; + + *peer.ekey.lock() = None; + *peer.endpoint.lock() = None; + } +} + +pub fn new_peer(device: Arc) -> Peer { + // spawn inbound thread + let (send_inbound, recv_inbound) = sync_channel(1); + let handle_inbound = thread::spawn(move || {}); + + // spawn outbound thread + let (send_outbound, recv_inbound) = sync_channel(1); + let handle_outbound = thread::spawn(move || {}); + + // allocate peer object + Peer::new(PeerInner { + stopped: AtomicBool::new(false), + device: device, + ekey: spin::Mutex::new(None), + endpoint: spin::Mutex::new(None), + inorder_inbound: send_inbound, + inorder_outbound: send_outbound, + keys: spin::Mutex::new(KeyWheel { + next: None, + current: None, + previous: None, + retired: None, + }), + rx_bytes: AtomicU64::new(0), + tx_bytes: AtomicU64::new(0), + staged_packets: spin::Mutex::new(ArrayDeque::new()), + thread_inbound: spin::Mutex::new(handle_inbound), + thread_outbound: spin::Mutex::new(handle_outbound), + }) +} + +impl Peer { + fn new(inner : PeerInner) -> Peer { + Peer(Arc::new(inner)) + } + + pub fn set_endpoint(&self, endpoint: SocketAddr) { + *self.0.endpoint.lock() = Some(Arc::new(endpoint)) + } + + /// Add a new keypair + /// + /// # Arguments + /// + /// - new: The new confirmed/unconfirmed key pair + /// + /// # Returns + /// + /// A vector of ids which has been released. + /// These should be released in the handshake module. + pub fn add_keypair(&self, new: KeyPair) -> Vec { + let mut keys = self.0.keys.lock(); + let mut release = Vec::with_capacity(2); + let new = Arc::new(new); + + // collect ids to be released + keys.retired.map(|v| release.push(v)); + keys.previous.as_ref().map(|k| release.push(k.recv.id)); + + // update key-wheel + if new.confirmed { + // start using key for encryption + *self.0.ekey.lock() = Some(EncryptionState { + id: new.send.id, + key: new.send.key, + nonce: 0, + death: new.birth + REJECT_AFTER_TIME, + }); + + // move current into previous + keys.previous = keys.current.as_ref().map(|v| v.clone());; + keys.current = Some(new.clone()); + } else { + // store the key and await confirmation + keys.previous = keys.next.as_ref().map(|v| v.clone());; + keys.next = Some(new.clone()); + }; + + // update incoming packet id map + { + let mut recv = self.0.device.recv.write(); + + // purge recv map of released ids + for id in &release { + recv.remove(&id); + } + + // map new id to keypair + debug_assert!(!recv.contains_key(&new.recv.id)); + + recv.insert( + new.recv.id, + DecryptionState { + keypair: Arc::downgrade(&new), + key: new.recv.key, + protector: spin::Mutex::new(AntiReplay::new()), + peer: Arc::downgrade(&self.0), + death: new.birth + REJECT_AFTER_TIME, + }, + ); + } + + // return the released id (for handshake state machine) + release + } + + pub fn rx_bytes(&self) -> u64 { + self.0.rx_bytes.load(Ordering::Relaxed) + } + + pub fn tx_bytes(&self) -> u64 { + self.0.tx_bytes.load(Ordering::Relaxed) + } + + pub fn add_subnet(&self, ip: IpAddr, masklen: u32) { + match ip { + IpAddr::V4(v4) => { + self.0 + .device + .ipv4 + .write() + .insert(v4, masklen, Arc::downgrade(&self.0)) + } + IpAddr::V6(v6) => { + self.0 + .device + .ipv6 + .write() + .insert(v6, masklen, Arc::downgrade(&self.0)) + } + }; + } + + pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> { + let mut res = Vec::new(); + res.append(&mut treebit_list( + &self.0, + &self.0.device.ipv4, + Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)), + )); + res.append(&mut treebit_list( + &self.0, + &self.0.device.ipv6, + Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)), + )); + res + } +} \ No newline at end of file diff --git a/src/router/workers.rs b/src/router/workers.rs new file mode 100644 index 0000000..2117190 --- /dev/null +++ b/src/router/workers.rs @@ -0,0 +1,153 @@ +use super::device::DecryptionState; +use super::device::DeviceInner; +use super::peer::PeerInner; + +use crossbeam_deque::{Injector, Steal, Stealer, Worker}; +use spin; +use std::iter; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{sync_channel, Receiver}; +use std::sync::{Arc, Weak}; +use std::thread; + +#[derive(PartialEq)] +enum Operation { + Encryption, + Decryption, +} + +#[derive(PartialEq)] +enum Status { + Fault, // unsealing failed + Done, // job valid and complete + Waiting, // job awaiting completion +} + +struct JobInner { + msg: Vec, // message buffer (nonce and receiver id set) + key: [u8; 32], // chacha20poly1305 key + status: Status, // state of the job + op: Operation, // should be buffer be encrypted / decrypted? +} + +type JobBuffer = Arc>; +type JobParallel = (Arc>, JobBuffer); +type JobInbound = (Arc, JobBuffer); +type JobOutbound = (Weak, JobBuffer); + +/* Strategy for workers acquiring a new job: + * + * 1. Try the local job queue (owned by the thread) + * 2. Try fetching a batch of jobs from the global injector + * 3. Attempt to steal jobs from other threads. + */ +fn find_task(local: &Worker, global: &Injector, stealers: &[Stealer]) -> Option { + local.pop().or_else(|| { + iter::repeat_with(|| { + global + .steal_batch_and_pop(local) + .or_else(|| stealers.iter().map(|s| s.steal()).collect()) + }) + .find(|s| !s.is_retry()) + .and_then(|s| s.success()) + }) +} + +fn worker_inbound( + device: Arc, // related device + peer: Arc, // related peer + recv: Receiver, // in order queue +) { + // reads from in order channel + for job in recv.recv().iter() { + loop { + let (state, buf) = job; + + // check if job is complete + match buf.try_lock() { + None => (), + Some(buf) => { + if buf.status != Status::Waiting { + // check replay protector + + // check if confirms keypair + + // write to tun device + + // continue to next job (no parking) + break; + } + } + } + + // wait for job to complete + thread::park(); + } + } +} + +fn worker_outbound( + device: Arc, // related device + peer: Arc, // related peer + recv: Receiver, // in order queue +) { + // reads from in order channel + for job in recv.recv().iter() { + loop { + let (peer, buf) = job; + + // check if job is complete + match buf.try_lock() { + None => (), + Some(buf) => { + if buf.status != Status::Waiting { + // send buffer to peer endpoint + break; + } + } + } + + // wait for job to complete + thread::park(); + } + } +} + +fn worker_parallel( + stopped: Arc, // stop workers (device has been dropped) + parked: Arc, // thread has been parked? + local: Worker, // local job queue (local to thread) + global: Injector, // global job injector + stealers: Vec>, // stealers (from other threads) +) { + while !stopped.load(Ordering::SeqCst) { + match find_task(&local, &global, &stealers) { + Some(job) => { + let (handle, buf) = job; + + // take ownership of the job buffer and complete it + { + let mut buf = buf.lock(); + match buf.op { + Operation::Encryption => { + // TODO: encryption + buf.status = Status::Done; + } + Operation::Decryption => { + // TODO: decryption + buf.status = Status::Done; + } + } + } + + // ensure consumer is unparked + handle.thread().unpark(); + } + None => { + // no jobs, park the worker + parked.store(true, Ordering::Release); + thread::park(); + } + } + } +} diff --git a/src/types/endpoint.rs b/src/types/endpoint.rs new file mode 100644 index 0000000..d97905a --- /dev/null +++ b/src/types/endpoint.rs @@ -0,0 +1,6 @@ +use std::net::SocketAddr; + +/* The generic implementation (not supporting "sticky-sockets"), + * is to simply use SocketAddr directly as the endpoint. + */ +pub trait Endpoint: Into {} diff --git a/src/types/mod.rs b/src/types/mod.rs index 868fb71..8da6d45 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,7 +1,9 @@ +mod endpoint; mod keys; mod tun; mod udp; +pub use endpoint::Endpoint; pub use keys::{Key, KeyPair}; pub use tun::Tun; -pub use udp::Bind; \ No newline at end of file +pub use udp::Bind; diff --git a/src/types/udp.rs b/src/types/udp.rs index f45cf85..00e218f 100644 --- a/src/types/udp.rs +++ b/src/types/udp.rs @@ -1,26 +1,27 @@ +use super::Endpoint; use std::error; /* Often times an a file descriptor in an atomic might suffice. */ -pub trait Bind: Send + Sync { - type Error : error::Error; +pub trait Bind: Send + Sync { + type Error: error::Error; + type Endpoint: Endpoint; fn new() -> Self; /// Updates the port of the Bind - /// + /// /// # Arguments - /// + /// /// - port, The new port to bind to. 0 means any available port. - /// + /// /// # Returns - /// + /// /// The unit type or an error, if binding fails fn set_port(&self, port: u16) -> Result<(), Self::Error>; /// Returns the current port of the bind fn get_port(&self) -> u16; - - fn recv(&self, dst: &mut [u8]) -> Endpoint; - fn send(&self, src: &[u8], dst: &Endpoint); + fn recv(&self, dst: &mut [u8]) -> Self::Endpoint; + fn send(&self, src: &[u8], dst: &Self::Endpoint); } -- cgit v1.2.3-59-g8ed1b