diff options
Diffstat (limited to 'src/router')
-rw-r--r-- | src/router/device.rs | 27 | ||||
-rw-r--r-- | src/router/messages.rs | 2 | ||||
-rw-r--r-- | src/router/peer.rs | 14 | ||||
-rw-r--r-- | src/router/tests.rs | 41 | ||||
-rw-r--r-- | src/router/types.rs | 10 | ||||
-rw-r--r-- | src/router/workers.rs | 8 |
6 files changed, 85 insertions, 17 deletions
diff --git a/src/router/device.rs b/src/router/device.rs index 4fb0334..f1cbb70 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -17,6 +17,7 @@ use super::peer; use super::peer::{Peer, PeerInner}; use super::SIZE_MESSAGE_PREFIX; +use super::messages::TYPE_TRANSPORT; use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks, RouterError}; use super::workers::{worker_parallel, JobParallel}; @@ -66,8 +67,8 @@ pub struct DecryptionState<C: Callbacks, T: Tun, B: Bind> { } pub struct Device<C: Callbacks, T: Tun, B: Bind>( - Arc<DeviceInner<C, T, B>>, - Vec<thread::JoinHandle<()>>, + Arc<DeviceInner<C, T, B>>, // reference to device state + Vec<thread::JoinHandle<()>>, // join handles for workers ); impl<C: Callbacks, T: Tun, B: Bind> Drop for Device<C, T, B> { @@ -207,8 +208,17 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> { // schedule for encryption and transmission to peer if let Some(job) = peer.send_job(msg) { + println!("made job!"); self.0.injector.push((peer.clone(), job)); } + + // ensure workers running + if self.0.parked.load(Ordering::Acquire) { + for handle in &self.1 { + handle.thread().unpark(); + } + } + Ok(()) } @@ -216,8 +226,17 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> { /// /// # Arguments /// - /// - ct_msg: Encrypted transport message - pub fn recv(&self, ct_msg: &mut [u8]) { + /// - msg: Encrypted transport message + pub fn recv(&self, msg: Vec<u8>) -> Result<(), RouterError> { + // ensure that the type field access is within bounds + if msg.len() < SIZE_MESSAGE_PREFIX || msg[0] != TYPE_TRANSPORT { + return Err(RouterError::MalformedTransportMessage); + } + + // parse / cast + + // lookup peer based on receiver id + unimplemented!(); } } diff --git a/src/router/messages.rs b/src/router/messages.rs index bec24ac..e7b592b 100644 --- a/src/router/messages.rs +++ b/src/router/messages.rs @@ -2,6 +2,8 @@ use byteorder::LittleEndian; use zerocopy::byteorder::{U32, U64}; use zerocopy::{AsBytes, ByteSlice, FromBytes, LayoutVerified}; +pub const TYPE_TRANSPORT: u8 = 4; + #[repr(packed)] #[derive(Copy, Clone, FromBytes, AsBytes)] pub struct TransportHeader { diff --git a/src/router/peer.rs b/src/router/peer.rs index d755fa5..c1762ad 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -5,9 +5,9 @@ use std::sync::mpsc::{sync_channel, SyncSender}; use std::sync::{Arc, Weak}; use std::thread; -use spin; +use spin::Mutex; -use arraydeque::{ArrayDeque, Wrapping}; +use arraydeque::{ArrayDeque, Wrapping, Saturating}; use zerocopy::{AsBytes, LayoutVerified}; use treebitmap::address::Address; @@ -40,6 +40,8 @@ pub struct KeyWheel { pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> { pub stopped: AtomicBool, pub opaque: C::Opaque, + pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Wrapping>>, + pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Wrapping>>, pub device: Arc<DeviceInner<C, T, B>>, pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>, pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>, @@ -101,6 +103,7 @@ fn treebit_remove<A: Address, C: Callbacks, T: Tun, B: Bind>( impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> { fn drop(&mut self) { + println!("drop"); // mark peer as stopped let peer = &self.0; @@ -167,6 +170,8 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>( let device = device.clone(); Arc::new(PeerInner { opaque, + inbound: Mutex::new(ArrayDeque::new()), + outbound: Mutex::new(ArrayDeque::new()), stopped: AtomicBool::new(false), device: device, ekey: spin::Mutex::new(None), @@ -258,7 +263,10 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> { // add job to in-order queue and return to device for inclusion in worker pool match self.queue_outbound.try_send(job.clone()) { Ok(_) => Some(job), - Err(_) => None, + Err(e) => { + println!("{:?}", e); + None + } } } } diff --git a/src/router/tests.rs b/src/router/tests.rs index 07851a8..8c51ff1 100644 --- a/src/router/tests.rs +++ b/src/router/tests.rs @@ -3,11 +3,13 @@ use std::fmt; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant}; use pnet::packet::ipv4::MutableIpv4Packet; use pnet::packet::ipv6::MutableIpv6Packet; -use super::super::types::{Bind, Tun}; +use super::super::types::{Bind, Key, KeyPair, Tun}; use super::{Device, Peer, SIZE_MESSAGE_PREFIX}; #[derive(Debug)] @@ -93,6 +95,32 @@ impl fmt::Display for BindError { } } +fn dummy_keypair(initiator: bool) -> KeyPair { + let k1 = Key { + key: [0x53u8; 32], + id: 0x646e6573, + }; + let k2 = Key { + key: [0x52u8; 32], + id: 0x76636572, + }; + if initiator { + KeyPair { + birth: Instant::now(), + initiator: true, + send: k1, + recv: k2, + } + } else { + KeyPair { + birth: Instant::now(), + initiator: false, + send: k2, + recv: k1, + } + } +} + #[test] fn test_outbound() { let opaque = Arc::new(AtomicBool::new(false)); @@ -134,6 +162,11 @@ fn test_outbound() { ), ]; + thread::sleep(Duration::from_millis(1000)); + assert!(false); + + peer.add_keypair(dummy_keypair(true)); + for (mask, len, ip, okay) in &tests { opaque.store(false, Ordering::SeqCst); @@ -161,8 +194,8 @@ fn test_outbound() { // cryptkey routing succeeded assert!(res.is_ok()); - // and a key should have been requested - assert!(opaque.load(Ordering::Acquire)); + // and a key should have been requested + // assert!(opaque.load(Ordering::Acquire), "did not request key"); } else { assert!(res.is_err()); } @@ -170,4 +203,6 @@ fn test_outbound() { // clear subnets for next test peer.remove_subnets(); } + + assert!(false); } diff --git a/src/router/types.rs b/src/router/types.rs index 5077686..336f56b 100644 --- a/src/router/types.rs +++ b/src/router/types.rs @@ -1,6 +1,6 @@ +use std::error::Error; use std::fmt; use std::marker::PhantomData; -use std::error::Error; pub trait Opaque: Send + Sync + 'static {} @@ -52,19 +52,19 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>> Callbacks type CallbackKey = K; } - - #[derive(Debug)] pub enum RouterError { NoCryptKeyRoute, MalformedIPHeader, + MalformedTransportMessage, } impl fmt::Display for RouterError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { RouterError::NoCryptKeyRoute => write!(f, "No cryptkey route configured for subnet"), - RouterError::MalformedIPHeader => write!(f, "IP header is malformed") + RouterError::MalformedIPHeader => write!(f, "IP header is malformed"), + RouterError::MalformedTransportMessage => write!(f, "IP header is malformed"), } } } @@ -77,4 +77,4 @@ impl Error for RouterError { fn source(&self) -> Option<&(dyn Error + 'static)> { None } -}
\ No newline at end of file +} diff --git a/src/router/workers.rs b/src/router/workers.rs index 1af2cae..241b06f 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -167,7 +167,7 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>( Ok(buf) => { while !peer.stopped.load(Ordering::Acquire) { match buf.try_lock() { - None => (), + None => (), // nothing to do Some(buf) => match buf.status { Status::Done => { // parse / cast @@ -198,7 +198,8 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>( thread::park(); } } - Err(_) => { + Err(e) => { + println!("park outbound! {:?}", e); break; } } @@ -211,9 +212,11 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>( stealers: Vec<Stealer<JobParallel<C, T, B>>>, // stealers (from other threads) ) { while device.running.load(Ordering::SeqCst) { + println!("running"); match find_task(&local, &device.injector, &stealers) { Some(job) => { let (peer, buf) = job; + println!("jobs!"); // take ownership of the job buffer and complete it { @@ -269,6 +272,7 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>( .unpark(); } None => { + println!("park"); device.parked.store(true, Ordering::Release); thread::park(); } |