diff options
-rw-r--r-- | Cargo.lock | 36 | ||||
-rw-r--r-- | Cargo.toml | 3 | ||||
-rw-r--r-- | src/router/device.rs | 17 | ||||
-rw-r--r-- | src/router/peer.rs | 50 | ||||
-rw-r--r-- | src/router/tests.rs | 6 | ||||
-rw-r--r-- | src/router/workers.rs | 233 |
6 files changed, 190 insertions, 155 deletions
@@ -399,6 +399,14 @@ dependencies = [ ] [[package]] +name = "lock_api" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "scopeguard 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "log" version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -539,6 +547,16 @@ dependencies = [ ] [[package]] +name = "parking_lot" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "lock_api 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot_core 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "parking_lot_core" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -551,6 +569,20 @@ dependencies = [ ] [[package]] +name = "parking_lot_core" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "pnet" version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1459,6 +1491,7 @@ dependencies = [ "hjul 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "pnet 0.22.0 (registry+https://github.com/rust-lang/crates.io-index)", "proptest 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1567,6 +1600,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum lazycell 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b294d6fa9ee409a054354afc4352b0b9ef7ca222c69b8812cbea9e7d2bf3783f" "checksum libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)" = "34fcd2c08d2f832f376f4173a231990fa5aef4e99fb569867318a227ef4c06ba" "checksum lock_api 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "62ebf1391f6acad60e5c8b43706dde4582df75c06698ab44511d15016bc2442c" +"checksum lock_api 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f8912e782533a93a167888781b836336a6ca5da6175c05944c86cf28c31104dc" "checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" "checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" "checksum memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "88579771288728879b57485cc7d6b07d648c9f0141eb955f8ab7f9d45394468e" @@ -1583,7 +1617,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum opaque-debug 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" "checksum owning_ref 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "49a4b8ea2179e6a2e27411d3bca09ca6dd630821cf6894c6c7c8467a8ee7ef13" "checksum parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab41b4aed082705d1056416ae4468b6ea99d52599ecf3169b00088d43113e337" +"checksum parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252" "checksum parking_lot_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94c8c7923936b28d546dfd14d4472eaf34c99b14e1c973a32b3e6d4eb04298c9" +"checksum parking_lot_core 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b876b1b9e7ac6e1a74a6da34d25c42e17e8862aa409cbbbdcfc8d86c6f3bc62b" "checksum pnet 0.22.0 (registry+https://github.com/rust-lang/crates.io-index)" = "63d693c84430248366146e3181ff9d330243464fa9e6146c372b2f3eb2e2d8e7" "checksum pnet_base 0.22.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4df28acf2fcc77436dd2b91a9a0c2bb617f9ca5f2acefee1a4135058b9f9801f" "checksum pnet_datalink 0.22.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b34f8ca857599d05b6b082e9baff8d27c54cb9c26568cf3c0993a5755816966" @@ -18,7 +18,7 @@ digest = "0.8.0" lazy_static = "^1.3" tokio = "0.1.22" futures = "0.1.28" -arraydeque = "^0.4" +arraydeque = "0.4.5" treebitmap = "^0.4" crossbeam-deque = "0.7" hjul = "0.1.2" @@ -26,6 +26,7 @@ ring = "0.16.7" chacha20poly1305 = "^0.1" aead = "^0.1.1" clear_on_drop = "0.2.3" +parking_lot = "^0.9" [dependencies.x25519-dalek] version = "^0.5" diff --git a/src/router/device.rs b/src/router/device.rs index f1cbb70..ec6453d 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -6,6 +6,8 @@ use std::sync::{Arc, Weak}; use std::thread; use std::time::Instant; +use parking_lot::{Condvar, Mutex}; + use crossbeam_deque::{Injector, Worker}; use spin; use treebitmap::IpLookupTable; @@ -40,8 +42,8 @@ pub struct DeviceInner<C: Callbacks, T: Tun, B: Bind> { pub call_need_key: C::CallbackKey, // threading and workers + pub waker: (Mutex<()>, Condvar), pub running: AtomicBool, // workers running? - pub parked: AtomicBool, // any workers parked? pub injector: Injector<JobParallel<C, T, B>>, // parallel enc/dec task injector // routing @@ -107,7 +109,7 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun, B: Bi call_recv, call_send, call_need_key, - parked: AtomicBool::new(false), + waker: (Mutex::new(()), Condvar::new()), running: AtomicBool::new(true), injector: Injector::new(), recv: spin::RwLock::new(HashMap::new()), @@ -208,15 +210,12 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> { // schedule for encryption and transmission to peer if let Some(job) = peer.send_job(msg) { - println!("made job!"); + // add job to parallel worker pool self.0.injector.push((peer.clone(), job)); - } - // ensure workers running - if self.0.parked.load(Ordering::Acquire) { - for handle in &self.1 { - handle.thread().unpark(); - } + // ensure workers running, TODO: something faster + let &(_, ref cvar) = &self.0.waker; + cvar.notify_all(); } Ok(()) diff --git a/src/router/peer.rs b/src/router/peer.rs index c1762ad..a85d87a 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -7,7 +7,7 @@ use std::thread; use spin::Mutex; -use arraydeque::{ArrayDeque, Wrapping, Saturating}; +use arraydeque::{ArrayDeque, Saturating, Wrapping}; use zerocopy::{AsBytes, LayoutVerified}; use treebitmap::address::Address; @@ -40,19 +40,17 @@ pub struct KeyWheel { pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> { pub stopped: AtomicBool, pub opaque: C::Opaque, - pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Wrapping>>, - pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Wrapping>>, + pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Saturating>>, + pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Saturating>>, pub device: Arc<DeviceInner<C, T, B>>, - pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>, - pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>, - pub queue_outbound: SyncSender<JobOutbound>, - pub queue_inbound: SyncSender<JobInbound<C, T, B>>, - pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake - pub rx_bytes: AtomicU64, // received bytes - pub tx_bytes: AtomicU64, // transmitted bytes - pub keys: spin::Mutex<KeyWheel>, // key-wheel - pub ekey: spin::Mutex<Option<EncryptionState>>, // encryption state - pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>, + pub thread_outbound: Mutex<Option<thread::JoinHandle<()>>>, + pub thread_inbound: Mutex<Option<thread::JoinHandle<()>>>, + pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake + pub rx_bytes: AtomicU64, // received bytes + pub tx_bytes: AtomicU64, // transmitted bytes + pub keys: Mutex<KeyWheel>, // key-wheel + pub ekey: Mutex<Option<EncryptionState>>, // encryption state + pub endpoint: Mutex<Option<Arc<SocketAddr>>>, } pub struct Peer<C: Callbacks, T: Tun, B: Bind>(Arc<PeerInner<C, T, B>>); @@ -103,7 +101,6 @@ fn treebit_remove<A: Address, C: Callbacks, T: Tun, B: Bind>( impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> { fn drop(&mut self) { - println!("drop"); // mark peer as stopped let peer = &self.0; @@ -161,10 +158,6 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>( device: Arc<DeviceInner<C, T, B>>, opaque: C::Opaque, ) -> Peer<C, T, B> { - // allocate in-order queues - let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS); - let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS); - // allocate peer object let peer = { let device = device.clone(); @@ -176,8 +169,6 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>( device: device, ekey: spin::Mutex::new(None), endpoint: spin::Mutex::new(None), - queue_inbound: send_inbound, - queue_outbound: send_outbound, keys: spin::Mutex::new(KeyWheel { next: None, current: None, @@ -192,22 +183,18 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>( }) }; - // spawn inbound thread + // spawn outbound thread *peer.thread_inbound.lock() = { let peer = peer.clone(); let device = device.clone(); - Some(thread::spawn(move || { - worker_outbound(device, peer, recv_outbound) - })) + Some(thread::spawn(move || worker_outbound(device, peer))) }; - // spawn outbound thread + // spawn inbound thread *peer.thread_outbound.lock() = { let peer = peer.clone(); let device = device.clone(); - Some(thread::spawn(move || { - worker_inbound(device, peer, recv_inbound) - })) + Some(thread::spawn(move || worker_inbound(device, peer))) }; Peer(peer) @@ -261,12 +248,9 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> { })); // add job to in-order queue and return to device for inclusion in worker pool - match self.queue_outbound.try_send(job.clone()) { + match self.outbound.lock().push_back(job.clone()) { Ok(_) => Some(job), - Err(e) => { - println!("{:?}", e); - None - } + Err(_) => None, } } } diff --git a/src/router/tests.rs b/src/router/tests.rs index 8c51ff1..d626aeb 100644 --- a/src/router/tests.rs +++ b/src/router/tests.rs @@ -162,9 +162,6 @@ fn test_outbound() { ), ]; - thread::sleep(Duration::from_millis(1000)); - assert!(false); - peer.add_keypair(dummy_keypair(true)); for (mask, len, ip, okay) in &tests { @@ -199,10 +196,7 @@ fn test_outbound() { } else { assert!(res.is_err()); } - // clear subnets for next test peer.remove_subnets(); } - - assert!(false); } diff --git a/src/router/workers.rs b/src/router/workers.rs index 241b06f..0e68954 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -90,69 +90,81 @@ fn wait_recv<T>(running: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvEr } pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>( - device: Arc<DeviceInner<C, T, B>>, // related device - peer: Arc<PeerInner<C, T, B>>, // related peer - recv: Receiver<JobInbound<C, T, B>>, // in order queue + device: Arc<DeviceInner<C, T, B>>, // related device + peer: Arc<PeerInner<C, T, B>>, // related peer ) { - loop { - match wait_recv(&peer.stopped, &recv) { - Ok((state, buf)) => { - while !peer.stopped.load(Ordering::Acquire) { - match buf.try_lock() { - None => (), - Some(buf) => match buf.status { - Status::Done => { - // parse / cast - let (header, packet) = - match LayoutVerified::new_from_prefix(&buf.msg[..]) { - Some(v) => v, - None => continue, - }; - let header: LayoutVerified<&[u8], TransportHeader> = header; - - // obtain strong reference to decryption state - let state = if let Some(state) = state.upgrade() { - state - } else { - break; - }; - - // check for replay - if !state.protector.lock().update(header.f_counter.get()) { - break; - } - - // check for confirms key - if !state.confirmed.swap(true, Ordering::SeqCst) { - peer.confirm_key(state.keypair.clone()); - } - - // update enpoint, TODO - - // write packet to TUN device, TODO - - // trigger callback - debug_assert!( - packet.len() >= CHACHA20_POLY1305.nonce_len(), - "this should be checked earlier in the pipeline" - ); - (device.call_recv)( - &peer.opaque, - packet.len() > CHACHA20_POLY1305.nonce_len(), - true, - ); - break; - } - Status::Fault => break, - _ => (), - }, - }; - thread::park(); - } - } - Err(_) => { - break; + while !peer.stopped.load(Ordering::Acquire) { + inner(&device, &peer) + } + + fn inner<C: Callbacks, T: Tun, B: Bind>( + device: &Arc<DeviceInner<C, T, B>>, + peer: &Arc<PeerInner<C, T, B>>, + ) { + // wait for job to be submitted + let (state, buf) = loop { + match peer.inbound.lock().pop_front() { + Some(elem) => break elem, + _ => (), } + + // default is to park + thread::park() + }; + + // wait for job to complete + loop { + match buf.try_lock() { + None => (), + Some(buf) => match buf.status { + Status::Fault => break (), + Status::Done => { + // parse / cast + let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { + Some(v) => v, + None => continue, + }; + let header: LayoutVerified<&[u8], TransportHeader> = header; + + // obtain strong reference to decryption state + let state = if let Some(state) = state.upgrade() { + state + } else { + break; + }; + + // check for replay + if !state.protector.lock().update(header.f_counter.get()) { + break; + } + + // check for confirms key + if !state.confirmed.swap(true, Ordering::SeqCst) { + peer.confirm_key(state.keypair.clone()); + } + + // update endpoint, TODO + + // write packet to TUN device, TODO + + // trigger callback + debug_assert!( + packet.len() >= CHACHA20_POLY1305.nonce_len(), + "this should be checked earlier in the pipeline" + ); + (device.call_recv)( + &peer.opaque, + packet.len() > CHACHA20_POLY1305.nonce_len(), + true, + ); + break; + } + _ => (), + }, + }; + + // default is to park + thread::park() } } } @@ -160,48 +172,58 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>( pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>( device: Arc<DeviceInner<C, T, B>>, // related device peer: Arc<PeerInner<C, T, B>>, // related peer - recv: Receiver<JobOutbound>, // in order queue ) { - loop { - match wait_recv(&peer.stopped, &recv) { - Ok(buf) => { - while !peer.stopped.load(Ordering::Acquire) { - match buf.try_lock() { - None => (), // nothing to do - Some(buf) => match buf.status { - Status::Done => { - // parse / cast - let (header, packet) = - match LayoutVerified::new_from_prefix(&buf.msg[..]) { - Some(v) => v, - None => continue, - }; - let header: LayoutVerified<&[u8], TransportHeader> = header; - - // write to UDP device, TODO - let xmit = false; - - // trigger callback - (device.call_send)( - &peer.opaque, - buf.msg.len() - > CHACHA20_POLY1305.nonce_len() - + mem::size_of::<TransportHeader>(), - xmit, - ); - break; - } - Status::Fault => break, - _ => (), - }, - }; - thread::park(); - } - } - Err(e) => { - println!("park outbound! {:?}", e); - break; + while !peer.stopped.load(Ordering::Acquire) { + inner(&device, &peer) + } + + fn inner<C: Callbacks, T: Tun, B: Bind>( + device: &Arc<DeviceInner<C, T, B>>, + peer: &Arc<PeerInner<C, T, B>>, + ) { + // wait for job to be submitted + let (state, buf) = loop { + match peer.inbound.lock().pop_front() { + Some(elem) => break elem, + _ => (), } + + // default is to park + thread::park() + }; + + // wait for job to complete + loop { + match buf.try_lock() { + None => (), + Some(buf) => match buf.status { + Status::Fault => break (), + Status::Done => { + // parse / cast + let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { + Some(v) => v, + None => continue, + }; + let header: LayoutVerified<&[u8], TransportHeader> = header; + + // write to UDP device, TODO + let xmit = false; + + // trigger callback + (device.call_send)( + &peer.opaque, + buf.msg.len() + > CHACHA20_POLY1305.nonce_len() + mem::size_of::<TransportHeader>(), + xmit, + ); + break; + } + _ => (), + }, + }; + + // default is to park + thread::park() } } } @@ -212,11 +234,9 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>( stealers: Vec<Stealer<JobParallel<C, T, B>>>, // stealers (from other threads) ) { while device.running.load(Ordering::SeqCst) { - println!("running"); match find_task(&local, &device.injector, &stealers) { Some(job) => { let (peer, buf) = job; - println!("jobs!"); // take ownership of the job buffer and complete it { @@ -272,9 +292,10 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>( .unpark(); } None => { - println!("park"); - device.parked.store(true, Ordering::Release); - thread::park(); + // wait for notification from device + let &(ref lock, ref cvar) = &device.waker; + let mut guard = lock.lock(); + cvar.wait(&mut guard); } } } |