From 6d11da441bde4fa75eef755bef4c97f0d1f6a29b Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Wed, 4 Sep 2019 19:08:13 +0200 Subject: Simply passing of JobBuffer ownership --- Cargo.lock | 25 +++++ Cargo.toml | 1 + src/router/device.rs | 80 +++++++--------- src/router/peer.rs | 133 ++++++++++++-------------- src/router/tests.rs | 2 +- src/router/workers.rs | 259 +++++++++++++++----------------------------------- src/types/bind.rs | 73 ++++++++++++++ 7 files changed, 274 insertions(+), 299 deletions(-) create mode 100644 src/types/bind.rs diff --git a/Cargo.lock b/Cargo.lock index ecebf22..ae27b0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -236,6 +236,11 @@ dependencies = [ "generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "either" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "failure" version = "0.1.5" @@ -285,6 +290,22 @@ name = "futures" version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "futures-channel" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-core 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-core" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "either 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "generic-array" version = "0.12.3" @@ -1486,6 +1507,7 @@ dependencies = [ "crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-channel 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "hjul 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1578,6 +1600,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum crypto-mac 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4434400df11d95d556bac068ddfedd482915eb18fe8bea89bc80b6e4b1c179e5" "checksum curve25519-dalek 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8b7dcd30ba50cdf88b55b033456138b7c0ac4afdc436d82e1b79f370f24cc66d" "checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" +"checksum either 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5527cfe0d098f36e3f8839852688e63c8fff1c90b2b405aef730615f9a7bcf7b" "checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2" "checksum failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" @@ -1585,6 +1608,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)" = "45dc39533a6cae6da2b56da48edae506bb767ec07370f86f70fc062e9d435869" +"checksum futures-channel 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bbb37ec6418c577b25f5b129c0f4456ad7ce8714ec43c59712aa7e4cd2cb6b85" +"checksum futures-core 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7455c91eb2eae38f33b013f77ebe766c75761af333efd9d550e154045c63e225" "checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec" "checksum getrandom 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "fc344b02d3868feb131e8b5fe2b9b0a1cc42942679af493061fc13b853243872" "checksum glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb" diff --git a/Cargo.toml b/Cargo.toml index 620c97f..f0fe908 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ chacha20poly1305 = "^0.1" aead = "^0.1.1" clear_on_drop = "0.2.3" parking_lot = "^0.9" +futures-channel = "^0.2" [dependencies.x25519-dalek] version = "^0.5" diff --git a/src/router/device.rs b/src/router/device.rs index ec6453d..58ca2f6 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -1,14 +1,13 @@ use std::cmp; use std::collections::HashMap; use std::net::{Ipv4Addr, Ipv6Addr}; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::mpsc::sync_channel; +use std::sync::mpsc::SyncSender; use std::sync::{Arc, Weak}; use std::thread; use std::time::Instant; -use parking_lot::{Condvar, Mutex}; - -use crossbeam_deque::{Injector, Worker}; use spin; use treebitmap::IpLookupTable; @@ -41,11 +40,6 @@ pub struct DeviceInner { pub call_send: C::CallbackSend, pub call_need_key: C::CallbackKey, - // threading and workers - pub waker: (Mutex<()>, Condvar), - pub running: AtomicBool, // workers running? - pub injector: Injector>, // parallel enc/dec task injector - // routing pub recv: spin::RwLock>>, // receiver id -> decryption state pub ipv4: spin::RwLock>>>, // ipv4 cryptkey routing @@ -68,19 +62,20 @@ pub struct DecryptionState { pub death: Instant, // time when the key can no longer be used for decryption } -pub struct Device( - Arc>, // reference to device state - Vec>, // join handles for workers -); +pub struct Device { + pub state: Arc>, // reference to device state + pub handles: Vec>, // join handles for workers + pub queue_next: AtomicUsize, // next round-robin index + pub queues: Vec>>, // work queues (1 per thread) +} impl Drop for Device { fn drop(&mut self) { - // mark device as stopped - let device = &self.0; - device.running.store(false, Ordering::SeqCst); + // drop all queues + while self.queues.pop().is_some() {} // join all worker threads - while match self.1.pop() { + while match self.handles.pop() { Some(handle) => { handle.thread().unpark(); handle.join().unwrap(); @@ -98,8 +93,8 @@ impl, S: Callback, K: KeyCallback, T: Tun, B: Bi num_workers: usize, tun: T, bind: B, - call_recv: R, call_send: S, + call_recv: R, call_need_key: K, ) -> Device, T, B> { // allocate shared device state @@ -109,36 +104,31 @@ impl, S: Callback, K: KeyCallback, T: Tun, B: Bi call_recv, call_send, call_need_key, - waker: (Mutex::new(()), Condvar::new()), - running: AtomicBool::new(true), - injector: Injector::new(), recv: spin::RwLock::new(HashMap::new()), ipv4: spin::RwLock::new(IpLookupTable::new()), ipv6: spin::RwLock::new(IpLookupTable::new()), }); - // allocate work pool resources - let mut workers = Vec::with_capacity(num_workers); - let mut stealers = Vec::with_capacity(num_workers); - for _ in 0..num_workers { - let w = Worker::new_fifo(); - stealers.push(w.stealer()); - workers.push(w); - } - // start worker threads + let mut queues = Vec::with_capacity(num_workers); let mut threads = Vec::with_capacity(num_workers); for _ in 0..num_workers { + // allocate work queue + let (tx, rx) = sync_channel(128); + queues.push(spin::Mutex::new(tx)); + + // start worker thread let device = inner.clone(); - let stealers = stealers.clone(); - let worker = workers.pop().unwrap(); - threads.push(thread::spawn(move || { - worker_parallel(device, worker, stealers) - })); + threads.push(thread::spawn(move || worker_parallel(device, rx))); } // return exported device handle - Device(inner, threads) + Device { + state: inner, + handles: threads, + queue_next: AtomicUsize::new(0), + queues: queues, + } } } @@ -149,7 +139,7 @@ impl Device { /// /// A atomic ref. counted peer (with liftime matching the device) pub fn new_peer(&self, opaque: C::Opaque) -> Peer { - peer::new_peer(self.0.clone(), opaque) + peer::new_peer(self.state.clone(), opaque) } /// Cryptkey routes and sends a plaintext message (IP packet) @@ -177,7 +167,7 @@ impl Device { let dst = Ipv4Addr::from(dst); // lookup peer (project unto and clone "value" field) - self.0 + self.state .ipv4 .read() .longest_match(dst) @@ -195,7 +185,7 @@ impl Device { let dst = Ipv6Addr::from(dst); // lookup peer (project unto and clone "value" field) - self.0 + self.state .ipv6 .read() .longest_match(dst) @@ -210,12 +200,12 @@ impl Device { // schedule for encryption and transmission to peer if let Some(job) = peer.send_job(msg) { - // add job to parallel worker pool - self.0.injector.push((peer.clone(), job)); - - // ensure workers running, TODO: something faster - let &(_, ref cvar) = &self.0.waker; - cvar.notify_all(); + // add job to worker queue + let idx = self.queue_next.fetch_add(1, Ordering::SeqCst); + self.queues[idx % self.queues.len()] + .lock() + .send(job) + .unwrap(); } Ok(()) diff --git a/src/router/peer.rs b/src/router/peer.rs index a85d87a..a31dfcf 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -22,9 +22,12 @@ use super::device::DeviceInner; use super::device::EncryptionState; use super::messages::TransportHeader; +use futures::sync::oneshot; +use futures::*; + +use super::workers::Operation; use super::workers::{worker_inbound, worker_outbound}; -use super::workers::{JobBuffer, JobInbound, JobInner, JobOutbound}; -use super::workers::{Operation, Status}; +use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel}; use super::types::Callbacks; @@ -40,11 +43,9 @@ pub struct KeyWheel { pub struct PeerInner { pub stopped: AtomicBool, pub opaque: C::Opaque, - pub outbound: Mutex>, - pub inbound: Mutex; MAX_STAGED_PACKETS], Saturating>>, + pub outbound: Mutex>, + pub inbound: Mutex>>, pub device: Arc>, - pub thread_outbound: Mutex>>, - pub thread_inbound: Mutex>>, pub staged_packets: Mutex; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake pub rx_bytes: AtomicU64, // received bytes pub tx_bytes: AtomicU64, // transmitted bytes @@ -53,7 +54,11 @@ pub struct PeerInner { pub endpoint: Mutex>>, } -pub struct Peer(Arc>); +pub struct Peer { + state: Arc>, + thread_outbound: thread::JoinHandle<()>, + thread_inbound: thread::JoinHandle<()>, +} fn treebit_list( peer: &Arc>, @@ -86,15 +91,15 @@ fn treebit_remove( for subnet in m.iter() { let (ip, masklen, p) = subnet; if let Some(p) = p.upgrade() { - if Arc::ptr_eq(&p, &peer.0) { + if Arc::ptr_eq(&p, &peer.state) { subnets.push((ip, masklen)) } } } // remove all key mappings - for subnet in subnets { - let r = m.remove(subnet.0, subnet.1); + for (ip, masklen) in subnets { + let r = m.remove(ip, masklen); debug_assert!(r.is_some()); } } @@ -103,7 +108,7 @@ impl Drop for Peer { fn drop(&mut self) { // mark peer as stopped - let peer = &self.0; + let peer = &self.state; peer.stopped.store(true, Ordering::SeqCst); // remove from cryptkey router @@ -111,22 +116,6 @@ impl Drop for Peer { treebit_remove(self, &peer.device.ipv4); treebit_remove(self, &peer.device.ipv6); - // unpark threads - - peer.thread_inbound - .lock() - .as_ref() - .unwrap() - .thread() - .unpark(); - - peer.thread_outbound - .lock() - .as_ref() - .unwrap() - .thread() - .unpark(); - // release ids from the receiver map let mut keys = peer.keys.lock(); @@ -158,13 +147,16 @@ pub fn new_peer( device: Arc>, opaque: C::Opaque, ) -> Peer { + let (out_tx, out_rx) = sync_channel(128); + let (in_tx, in_rx) = sync_channel(128); + // allocate peer object let peer = { let device = device.clone(); Arc::new(PeerInner { opaque, - inbound: Mutex::new(ArrayDeque::new()), - outbound: Mutex::new(ArrayDeque::new()), + inbound: Mutex::new(in_tx), + outbound: Mutex::new(out_tx), stopped: AtomicBool::new(false), device: device, ekey: spin::Mutex::new(None), @@ -178,26 +170,28 @@ pub fn new_peer( rx_bytes: AtomicU64::new(0), tx_bytes: AtomicU64::new(0), staged_packets: spin::Mutex::new(ArrayDeque::new()), - thread_inbound: spin::Mutex::new(None), - thread_outbound: spin::Mutex::new(None), }) }; // spawn outbound thread - *peer.thread_inbound.lock() = { + let thread_inbound = { let peer = peer.clone(); let device = device.clone(); - Some(thread::spawn(move || worker_outbound(device, peer))) + thread::spawn(move || worker_outbound(device, peer, out_rx)) }; // spawn inbound thread - *peer.thread_outbound.lock() = { + let thread_outbound = { let peer = peer.clone(); let device = device.clone(); - Some(thread::spawn(move || worker_inbound(device, peer))) + thread::spawn(move || worker_inbound(device, peer, in_rx)) }; - Peer(peer) + Peer { + state: peer, + thread_inbound, + thread_outbound, + } } impl PeerInner { @@ -209,7 +203,7 @@ impl PeerInner { // rotate key-wheel } - pub fn send_job(&self, mut msg: Vec) -> Option { + pub fn send_job(&self, mut msg: Vec) -> Option { debug_assert!(msg.len() >= mem::size_of::()); // parse / cast @@ -239,29 +233,26 @@ impl PeerInner { } }; - // create job - let job = Arc::new(spin::Mutex::new(JobInner { - msg, - key, - status: Status::Waiting, - op: Operation::Encryption, - })); - - // add job to in-order queue and return to device for inclusion in worker pool - match self.outbound.lock().push_back(job.clone()) { - Ok(_) => Some(job), + // add job to in-order queue and return sendeer to device for inclusion in worker pool + let (tx, rx) = oneshot(); + match self.outbound.lock().try_send(rx) { + Ok(_) => Some(( + tx, + JobBuffer { + msg, + key, + okay: false, + op: Operation::Encryption, + }, + )), Err(_) => None, } } } impl Peer { - fn new(inner: PeerInner) -> Peer { - Peer(Arc::new(inner)) - } - pub fn set_endpoint(&self, endpoint: SocketAddr) { - *self.0.endpoint.lock() = Some(Arc::new(endpoint)) + *self.state.endpoint.lock() = Some(Arc::new(endpoint)) } /// Add a new keypair @@ -275,7 +266,7 @@ impl Peer { /// A vector of ids which has been released. /// These should be released in the handshake module. pub fn add_keypair(&self, new: KeyPair) -> Vec { - let mut keys = self.0.keys.lock(); + let mut keys = self.state.keys.lock(); let mut release = Vec::with_capacity(2); let new = Arc::new(new); @@ -286,7 +277,7 @@ impl Peer { // update key-wheel if new.initiator { // start using key for encryption - *self.0.ekey.lock() = Some(EncryptionState { + *self.state.ekey.lock() = Some(EncryptionState { id: new.send.id, key: new.send.key, nonce: 0, @@ -294,17 +285,17 @@ impl Peer { }); // move current into previous - keys.previous = keys.current.as_ref().map(|v| v.clone());; + keys.previous = keys.current.as_ref().map(|v| v.clone()); keys.current = Some(new.clone()); } else { // store the key and await confirmation - keys.previous = keys.next.as_ref().map(|v| v.clone());; + keys.previous = keys.next.as_ref().map(|v| v.clone()); keys.next = Some(new.clone()); }; // update incoming packet id map { - let mut recv = self.0.device.recv.write(); + let mut recv = self.state.device.recv.write(); // purge recv map of released ids for id in &release { @@ -321,7 +312,7 @@ impl Peer { keypair: Arc::downgrade(&new), key: new.recv.key, protector: spin::Mutex::new(AntiReplay::new()), - peer: Arc::downgrade(&self.0), + peer: Arc::downgrade(&self.state), death: new.birth + REJECT_AFTER_TIME, }, ); @@ -332,28 +323,28 @@ impl Peer { } pub fn rx_bytes(&self) -> u64 { - self.0.rx_bytes.load(Ordering::Relaxed) + self.state.rx_bytes.load(Ordering::Relaxed) } pub fn tx_bytes(&self) -> u64 { - self.0.tx_bytes.load(Ordering::Relaxed) + self.state.tx_bytes.load(Ordering::Relaxed) } pub fn add_subnet(&self, ip: IpAddr, masklen: u32) { match ip { IpAddr::V4(v4) => { - self.0 + self.state .device .ipv4 .write() - .insert(v4, masklen, Arc::downgrade(&self.0)) + .insert(v4, masklen, Arc::downgrade(&self.state)) } IpAddr::V6(v6) => { - self.0 + self.state .device .ipv6 .write() - .insert(v6, masklen, Arc::downgrade(&self.0)) + .insert(v6, masklen, Arc::downgrade(&self.state)) } }; } @@ -361,21 +352,21 @@ impl Peer { pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> { let mut res = Vec::new(); res.append(&mut treebit_list( - &self.0, - &self.0.device.ipv4, + &self.state, + &self.state.device.ipv4, Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)), )); res.append(&mut treebit_list( - &self.0, - &self.0.device.ipv6, + &self.state, + &self.state.device.ipv6, Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)), )); res } pub fn remove_subnets(&self) { - treebit_remove(self, &self.0.device.ipv4); - treebit_remove(self, &self.0.device.ipv6); + treebit_remove(self, &self.state.device.ipv4); + treebit_remove(self, &self.state.device.ipv6); } fn send(&self, msg: Vec) {} diff --git a/src/router/tests.rs b/src/router/tests.rs index d626aeb..1e049c4 100644 --- a/src/router/tests.rs +++ b/src/router/tests.rs @@ -131,7 +131,7 @@ fn test_outbound() { workers, TunTest {}, BindTest {}, - |t: &Arc, data: bool, sent: bool| {}, + |t: &Arc, data: bool, sent: bool| println!("send"), |t: &Arc, data: bool, sent: bool| {}, |t: &Arc| t.store(true, Ordering::SeqCst), ); diff --git a/src/router/workers.rs b/src/router/workers.rs index 0e68954..e79502f 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -5,6 +5,9 @@ use std::sync::mpsc::{sync_channel, Receiver, TryRecvError}; use std::sync::{Arc, Weak}; use std::thread; +use futures::sync::oneshot; +use futures::*; + use spin; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; @@ -25,93 +28,27 @@ pub enum Operation { Decryption, } -#[derive(PartialEq, Debug)] -pub enum Status { - Fault, // unsealing failed - Done, // job valid and complete - Waiting, // job awaiting completion +pub struct JobBuffer { + pub msg: Vec, // message buffer (nonce and receiver id set) + pub key: [u8; 32], // chacha20poly1305 key + pub okay: bool, // state of the job + pub op: Operation, // should be buffer be encrypted / decrypted? } -pub struct JobInner { - pub msg: Vec, // message buffer (nonce and receiver id set) - pub key: [u8; 32], // chacha20poly1305 key - pub status: Status, // state of the job - pub op: Operation, // should be buffer be encrypted / decrypted? -} - -pub type JobBuffer = Arc>; -pub type JobParallel = (Arc>, JobBuffer); -pub type JobInbound = (Weak>, JobBuffer); -pub type JobOutbound = JobBuffer; - -/* Strategy for workers acquiring a new job: - * - * 1. Try the local job queue (owned by the thread) - * 2. Try fetching a batch of jobs from the global injector - * 3. Attempt to steal jobs from other threads. - */ -fn find_task(local: &Worker, global: &Injector, stealers: &[Stealer]) -> Option { - local.pop().or_else(|| { - iter::repeat_with(|| { - global - .steal_batch_and_pop(local) - .or_else(|| stealers.iter().map(|s| s.steal()).collect()) - }) - .find(|s| !s.is_retry()) - .and_then(|s| s.success()) - }) -} - -fn wait_buffer(running: AtomicBool, buf: &JobBuffer) { - while running.load(Ordering::Acquire) { - match buf.try_lock() { - None => (), - Some(buf) => { - if buf.status == Status::Waiting { - return; - } - } - }; - thread::park(); - } -} - -fn wait_recv(running: &AtomicBool, recv: &Receiver) -> Result { - while running.load(Ordering::Acquire) { - match recv.try_recv() { - Err(TryRecvError::Empty) => (), - value => { - return value; - } - }; - thread::park(); - } - return Err(TryRecvError::Disconnected); -} +pub type JobParallel = (oneshot::Sender, JobBuffer); +pub type JobInbound = (Weak>, oneshot::Receiver); +pub type JobOutbound = oneshot::Receiver; pub fn worker_inbound( device: Arc>, // related device peer: Arc>, // related peer + receiver: Receiver>, ) { - while !peer.stopped.load(Ordering::Acquire) { - inner(&device, &peer) - } - + /* fn inner( device: &Arc>, peer: &Arc>, ) { - // wait for job to be submitted - let (state, buf) = loop { - match peer.inbound.lock().pop_front() { - Some(elem) => break elem, - _ => (), - } - - // default is to park - thread::park() - }; - // wait for job to complete loop { match buf.try_lock() { @@ -167,136 +104,94 @@ pub fn worker_inbound( thread::park() } } + */ } pub fn worker_outbound( device: Arc>, // related device peer: Arc>, // related peer + receiver: Receiver, ) { - while !peer.stopped.load(Ordering::Acquire) { - inner(&device, &peer) - } - - fn inner( - device: &Arc>, - peer: &Arc>, - ) { - // wait for job to be submitted - let (state, buf) = loop { - match peer.inbound.lock().pop_front() { - Some(elem) => break elem, - _ => (), + loop { + // fetch job + let rx = match receiver.recv() { + Ok(v) => v, + _ => { + return; } - - // default is to park - thread::park() }; // wait for job to complete - loop { - match buf.try_lock() { - None => (), - Some(buf) => match buf.status { - Status::Fault => break (), - Status::Done => { - // parse / cast - let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { - Some(v) => v, - None => continue, - }; - let header: LayoutVerified<&[u8], TransportHeader> = header; - - // write to UDP device, TODO - let xmit = false; - - // trigger callback - (device.call_send)( - &peer.opaque, - buf.msg.len() - > CHACHA20_POLY1305.nonce_len() + mem::size_of::(), - xmit, - ); - break; - } - _ => (), - }, - }; - - // default is to park - thread::park() - } + let _ = rx + .map(|buf| { + if buf.okay { + // write to UDP device, TODO + let xmit = false; + + // trigger callback + (device.call_send)( + &peer.opaque, + buf.msg.len() + > CHACHA20_POLY1305.nonce_len() + mem::size_of::(), + xmit, + ); + } + }) + .wait(); } } pub fn worker_parallel( device: Arc>, - local: Worker>, // local job queue (local to thread) - stealers: Vec>>, // stealers (from other threads) + receiver: Receiver, ) { - while device.running.load(Ordering::SeqCst) { - match find_task(&local, &device.injector, &stealers) { - Some(job) => { - let (peer, buf) = job; - - // take ownership of the job buffer and complete it - { - let mut buf = buf.lock(); - - // cast and check size of packet - let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { - Some(v) => v, - None => continue, - }; + loop { + // fetch next job + let (tx, mut buf) = match receiver.recv() { + Err(_) => { + return; + } + Ok(val) => val, + }; - if packet.len() < CHACHA20_POLY1305.nonce_len() { - continue; - } + // cast and check size of packet + let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { + Some(v) => v, + None => continue, + }; - let header: LayoutVerified<&[u8], TransportHeader> = header; + if packet.len() < CHACHA20_POLY1305.nonce_len() { + continue; + } - // do the weird ring AEAD dance - let key = LessSafeKey::new( - UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap(), - ); + let header: LayoutVerified<&[u8], TransportHeader> = header; - // create a nonce object - let mut nonce = [0u8; 12]; - debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); // why the this is not a constant, god knows... - nonce[4..].copy_from_slice(header.f_counter.as_bytes()); - let nonce = Nonce::assume_unique_for_key(nonce); + // do the weird ring AEAD dance + let key = LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap()); - match buf.op { - Operation::Encryption => { - // note: extends the vector to accommodate the tag - key.seal_in_place_append_tag(nonce, Aad::empty(), &mut buf.msg) - .unwrap(); - buf.status = Status::Done; - } - Operation::Decryption => { - // opening failure is signaled by fault state - buf.status = match key.open_in_place(nonce, Aad::empty(), &mut buf.msg) - { - Ok(_) => Status::Done, - Err(_) => Status::Fault, - }; - } - } - } + // create a nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); - // ensure consumer is unparked (TODO: better looking + wrap in atomic?) - peer.thread_outbound - .lock() - .as_ref() - .unwrap() - .thread() - .unpark(); + match buf.op { + Operation::Encryption => { + // note: extends the vector to accommodate the tag + key.seal_in_place_append_tag(nonce, Aad::empty(), &mut buf.msg) + .unwrap(); + buf.okay = true; } - None => { - // wait for notification from device - let &(ref lock, ref cvar) = &device.waker; - let mut guard = lock.lock(); - cvar.wait(&mut guard); + Operation::Decryption => { + // opening failure is signaled by fault state + buf.okay = match key.open_in_place(nonce, Aad::empty(), &mut buf.msg) { + Ok(_) => true, + Err(_) => false, + }; } } + + // pass ownership to consumer + let _ = tx.send(buf); } } diff --git a/src/types/bind.rs b/src/types/bind.rs new file mode 100644 index 0000000..62adbbb --- /dev/null +++ b/src/types/bind.rs @@ -0,0 +1,73 @@ +use super::Endpoint; +use std::error; + +/// Traits representing the "internet facing" end of the VPN. +/// +/// In practice this is a UDP socket (but the router interface is agnostic). +/// Often these traits will be implemented on the same type. + +/// Bind interface provided to the router code +pub trait RouterBind: Send + Sync { + type Error: error::Error; + type Endpoint: Endpoint; + + /// Receive a buffer on the bind + /// + /// # Arguments + /// + /// - `buf`, buffer for storing the packet. If the buffer is too short, the packet should just be truncated. + /// + /// # Note + /// + /// The size of the buffer is derieved from the MTU of the Tun device. + fn recv(&self, buf: &mut [u8]) -> Result<(usize, Self::Endpoint), Self::Error>; + + /// Send a buffer to the endpoint + /// + /// # Arguments + /// + /// - `buf`, packet src buffer (in practice the body of a UDP datagram) + /// - `dst`, destination endpoint (in practice, src: (ip, port) + dst: (ip, port) for sticky sockets) + /// + /// # Returns + /// + /// The unit type or an error if transmission failed + fn send(&self, buf: &[u8], dst: &Self::Endpoint) -> Result<(), Self::Error>; +} + +/// Bind interface provided for configuration (setting / getting the port) +pub trait ConfigBind { + type Error: error::Error; + + /// Return a new (unbound) instance of a configuration bind + fn new() -> Self; + + /// Updates the port of the bind + /// + /// # Arguments + /// + /// - `port`, the new port to bind to. 0 means any available port. + /// + /// # Returns + /// + /// The unit type or an error, if binding fails + fn set_port(&self, port: u16) -> Result<(), Self::Error>; + + /// Returns the current port of the bind + fn get_port(&self) -> Option; + + /// Set the mark (e.g. on Linus this is the fwmark) on the bind + /// + /// # Arguments + /// + /// - `mark`, the mark to set + /// + /// # Note + /// + /// The mark should be retained accross calls to `set_port`. + /// + /// # Returns + /// + /// The unit type or an error, if the operation fails due to permission errors + fn set_mark(&self, mark: u16) -> Result<(), Self::Error>; +} -- cgit v1.2.3-59-g8ed1b