From 62d71a7a67e2f4e32a8fc48d3e483fecea2c352e Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Mon, 2 Sep 2019 20:22:47 +0200 Subject: Reconsider inorder queueing --- src/router/device.rs | 27 +++++++++++++++++++++++---- src/router/messages.rs | 2 ++ src/router/peer.rs | 14 +++++++++++--- src/router/tests.rs | 41 ++++++++++++++++++++++++++++++++++++++--- src/router/types.rs | 10 +++++----- src/router/workers.rs | 8 ++++++-- 6 files changed, 85 insertions(+), 17 deletions(-) diff --git a/src/router/device.rs b/src/router/device.rs index 4fb0334..f1cbb70 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -17,6 +17,7 @@ use super::peer; use super::peer::{Peer, PeerInner}; use super::SIZE_MESSAGE_PREFIX; +use super::messages::TYPE_TRANSPORT; use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks, RouterError}; use super::workers::{worker_parallel, JobParallel}; @@ -66,8 +67,8 @@ pub struct DecryptionState { } pub struct Device( - Arc>, - Vec>, + Arc>, // reference to device state + Vec>, // join handles for workers ); impl Drop for Device { @@ -207,8 +208,17 @@ impl Device { // schedule for encryption and transmission to peer if let Some(job) = peer.send_job(msg) { + println!("made job!"); self.0.injector.push((peer.clone(), job)); } + + // ensure workers running + if self.0.parked.load(Ordering::Acquire) { + for handle in &self.1 { + handle.thread().unpark(); + } + } + Ok(()) } @@ -216,8 +226,17 @@ impl Device { /// /// # Arguments /// - /// - ct_msg: Encrypted transport message - pub fn recv(&self, ct_msg: &mut [u8]) { + /// - msg: Encrypted transport message + pub fn recv(&self, msg: Vec) -> Result<(), RouterError> { + // ensure that the type field access is within bounds + if msg.len() < SIZE_MESSAGE_PREFIX || msg[0] != TYPE_TRANSPORT { + return Err(RouterError::MalformedTransportMessage); + } + + // parse / cast + + // lookup peer based on receiver id + unimplemented!(); } } diff --git a/src/router/messages.rs b/src/router/messages.rs index bec24ac..e7b592b 100644 --- a/src/router/messages.rs +++ b/src/router/messages.rs @@ -2,6 +2,8 @@ use byteorder::LittleEndian; use zerocopy::byteorder::{U32, U64}; use zerocopy::{AsBytes, ByteSlice, FromBytes, LayoutVerified}; +pub const TYPE_TRANSPORT: u8 = 4; + #[repr(packed)] #[derive(Copy, Clone, FromBytes, AsBytes)] pub struct TransportHeader { diff --git a/src/router/peer.rs b/src/router/peer.rs index d755fa5..c1762ad 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -5,9 +5,9 @@ use std::sync::mpsc::{sync_channel, SyncSender}; use std::sync::{Arc, Weak}; use std::thread; -use spin; +use spin::Mutex; -use arraydeque::{ArrayDeque, Wrapping}; +use arraydeque::{ArrayDeque, Wrapping, Saturating}; use zerocopy::{AsBytes, LayoutVerified}; use treebitmap::address::Address; @@ -40,6 +40,8 @@ pub struct KeyWheel { pub struct PeerInner { pub stopped: AtomicBool, pub opaque: C::Opaque, + pub outbound: Mutex>, + pub inbound: Mutex; MAX_STAGED_PACKETS], Wrapping>>, pub device: Arc>, pub thread_outbound: spin::Mutex>>, pub thread_inbound: spin::Mutex>>, @@ -101,6 +103,7 @@ fn treebit_remove( impl Drop for Peer { fn drop(&mut self) { + println!("drop"); // mark peer as stopped let peer = &self.0; @@ -167,6 +170,8 @@ pub fn new_peer( let device = device.clone(); Arc::new(PeerInner { opaque, + inbound: Mutex::new(ArrayDeque::new()), + outbound: Mutex::new(ArrayDeque::new()), stopped: AtomicBool::new(false), device: device, ekey: spin::Mutex::new(None), @@ -258,7 +263,10 @@ impl PeerInner { // add job to in-order queue and return to device for inclusion in worker pool match self.queue_outbound.try_send(job.clone()) { Ok(_) => Some(job), - Err(_) => None, + Err(e) => { + println!("{:?}", e); + None + } } } } diff --git a/src/router/tests.rs b/src/router/tests.rs index 07851a8..8c51ff1 100644 --- a/src/router/tests.rs +++ b/src/router/tests.rs @@ -3,11 +3,13 @@ use std::fmt; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant}; use pnet::packet::ipv4::MutableIpv4Packet; use pnet::packet::ipv6::MutableIpv6Packet; -use super::super::types::{Bind, Tun}; +use super::super::types::{Bind, Key, KeyPair, Tun}; use super::{Device, Peer, SIZE_MESSAGE_PREFIX}; #[derive(Debug)] @@ -93,6 +95,32 @@ impl fmt::Display for BindError { } } +fn dummy_keypair(initiator: bool) -> KeyPair { + let k1 = Key { + key: [0x53u8; 32], + id: 0x646e6573, + }; + let k2 = Key { + key: [0x52u8; 32], + id: 0x76636572, + }; + if initiator { + KeyPair { + birth: Instant::now(), + initiator: true, + send: k1, + recv: k2, + } + } else { + KeyPair { + birth: Instant::now(), + initiator: false, + send: k2, + recv: k1, + } + } +} + #[test] fn test_outbound() { let opaque = Arc::new(AtomicBool::new(false)); @@ -134,6 +162,11 @@ fn test_outbound() { ), ]; + thread::sleep(Duration::from_millis(1000)); + assert!(false); + + peer.add_keypair(dummy_keypair(true)); + for (mask, len, ip, okay) in &tests { opaque.store(false, Ordering::SeqCst); @@ -161,8 +194,8 @@ fn test_outbound() { // cryptkey routing succeeded assert!(res.is_ok()); - // and a key should have been requested - assert!(opaque.load(Ordering::Acquire)); + // and a key should have been requested + // assert!(opaque.load(Ordering::Acquire), "did not request key"); } else { assert!(res.is_err()); } @@ -170,4 +203,6 @@ fn test_outbound() { // clear subnets for next test peer.remove_subnets(); } + + assert!(false); } diff --git a/src/router/types.rs b/src/router/types.rs index 5077686..336f56b 100644 --- a/src/router/types.rs +++ b/src/router/types.rs @@ -1,6 +1,6 @@ +use std::error::Error; use std::fmt; use std::marker::PhantomData; -use std::error::Error; pub trait Opaque: Send + Sync + 'static {} @@ -52,19 +52,19 @@ impl, S: Callback, K: KeyCallback> Callbacks type CallbackKey = K; } - - #[derive(Debug)] pub enum RouterError { NoCryptKeyRoute, MalformedIPHeader, + MalformedTransportMessage, } impl fmt::Display for RouterError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { RouterError::NoCryptKeyRoute => write!(f, "No cryptkey route configured for subnet"), - RouterError::MalformedIPHeader => write!(f, "IP header is malformed") + RouterError::MalformedIPHeader => write!(f, "IP header is malformed"), + RouterError::MalformedTransportMessage => write!(f, "IP header is malformed"), } } } @@ -77,4 +77,4 @@ impl Error for RouterError { fn source(&self) -> Option<&(dyn Error + 'static)> { None } -} \ No newline at end of file +} diff --git a/src/router/workers.rs b/src/router/workers.rs index 1af2cae..241b06f 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -167,7 +167,7 @@ pub fn worker_outbound( Ok(buf) => { while !peer.stopped.load(Ordering::Acquire) { match buf.try_lock() { - None => (), + None => (), // nothing to do Some(buf) => match buf.status { Status::Done => { // parse / cast @@ -198,7 +198,8 @@ pub fn worker_outbound( thread::park(); } } - Err(_) => { + Err(e) => { + println!("park outbound! {:?}", e); break; } } @@ -211,9 +212,11 @@ pub fn worker_parallel( stealers: Vec>>, // stealers (from other threads) ) { while device.running.load(Ordering::SeqCst) { + println!("running"); match find_task(&local, &device.injector, &stealers) { Some(job) => { let (peer, buf) = job; + println!("jobs!"); // take ownership of the job buffer and complete it { @@ -269,6 +272,7 @@ pub fn worker_parallel( .unpark(); } None => { + println!("park"); device.parked.store(true, Ordering::Release); thread::park(); } -- cgit v1.2.3-59-g8ed1b