diff options
-rw-r--r-- | src/main.rs | 30 | ||||
-rw-r--r-- | src/router/device.rs | 56 | ||||
-rw-r--r-- | src/router/mod.rs | 1 | ||||
-rw-r--r-- | src/router/peer.rs | 44 | ||||
-rw-r--r-- | src/router/types.rs | 7 | ||||
-rw-r--r-- | src/router/workers.rs | 18 |
6 files changed, 97 insertions, 59 deletions
diff --git a/src/main.rs b/src/main.rs index eab4b61..ae90251 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,18 +5,38 @@ mod handshake; mod router; mod types; +use hjul::*; + use std::sync::Arc; +use std::time::Duration; use sodiumoxide; use types::KeyPair; +#[derive(Debug, Clone)] +struct PeerTimer { + a: Timer, + b: Timer, +} + fn main() { + let runner = Runner::new(Duration::from_millis(100), 1000, 1024); + // choose optimal crypto implementations for platform sodiumoxide::init().unwrap(); - let mut router = router::Device::new(8); - { - let peer = router.new_peer(); - } - loop {} + let router = router::Device::new( + 4, + |t: &PeerTimer, data: bool| t.a.reset(Duration::from_millis(1000)), + |t: &PeerTimer, data: bool| t.b.reset(Duration::from_millis(1000)), + ); + + let pt = PeerTimer { + a: runner.timer(|| println!("timer-a fired for peer")), + b: runner.timer(|| println!("timer-b fired for peer")), + }; + + let peer = router.new_peer(pt.clone()); + + println!("{:?}", pt); } diff --git a/src/router/device.rs b/src/router/device.rs index 974d019..d1a7d93 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -1,32 +1,33 @@ -use arraydeque::{ArrayDeque, Wrapping}; -use treebitmap::address::Address; -use treebitmap::IpLookupTable; - -use crossbeam_deque::{Injector, Steal}; use std::collections::HashMap; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::sync::mpsc::{sync_channel, SyncSender}; -use std::sync::{Arc, Mutex, Weak}; +use std::net::{Ipv4Addr, Ipv6Addr}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Weak}; use std::thread; use std::time::Instant; +use crossbeam_deque::{Injector, Steal}; use spin; +use treebitmap::IpLookupTable; -use super::super::constants::*; use super::super::types::KeyPair; use super::anti_replay::AntiReplay; use super::peer; use super::peer::{Peer, PeerInner}; -use super::workers::worker_parallel; -pub struct DeviceInner { +use super::types::{Callback, Opaque}; + +pub struct DeviceInner<T: Opaque> { + // callbacks (used for timers) + pub event_recv: Box<dyn Callback<T>>, // authenticated message received + pub event_send: Box<dyn Callback<T>>, // authenticated message send + pub event_new_handshake: (), // called when a new handshake is required + pub stopped: AtomicBool, pub injector: Injector<()>, // parallel enc/dec task injector pub threads: Vec<thread::JoinHandle<()>>, // join handles of worker threads - pub recv: spin::RwLock<HashMap<u32, DecryptionState>>, // receiver id -> decryption state - pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner>>>, // ipv4 cryptkey routing - pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner>>>, // ipv6 cryptkey routing + pub recv: spin::RwLock<HashMap<u32, DecryptionState<T>>>, // receiver id -> decryption state + pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner<T>>>>, // ipv4 cryptkey routing + pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner<T>>>>, // ipv6 cryptkey routing } pub struct EncryptionState { @@ -37,17 +38,17 @@ pub struct EncryptionState { // (birth + reject-after-time - keepalive-timeout - rekey-timeout) } -pub struct DecryptionState { +pub struct DecryptionState<T: Opaque> { pub key: [u8; 32], pub keypair: Weak<KeyPair>, pub protector: spin::Mutex<AntiReplay>, - pub peer: Weak<PeerInner>, + pub peer: Weak<PeerInner<T>>, pub death: Instant, // time when the key can no longer be used for decryption } -pub struct Device(Arc<DeviceInner>); +pub struct Device<T: Opaque>(Arc<DeviceInner<T>>); -impl Drop for Device { +impl<T: Opaque> Drop for Device<T> { fn drop(&mut self) { // mark device as stopped let device = &self.0; @@ -63,9 +64,16 @@ impl Drop for Device { } } -impl Device { - pub fn new(workers: usize) -> Device { +impl<T: Opaque> Device<T> { + pub fn new<F1: Callback<T>, F2: Callback<T>>( + workers: usize, + event_recv: F1, + event_send: F2, + ) -> Device<T> { Device(Arc::new(DeviceInner { + event_recv: Box::new(event_recv), + event_send: Box::new(event_send), + event_new_handshake: (), threads: vec![], stopped: AtomicBool::new(false), injector: Injector::new(), @@ -80,8 +88,8 @@ impl Device { /// # Returns /// /// A atomic ref. counted peer (with liftime matching the device) - pub fn new_peer(&self) -> Peer { - peer::new_peer(self.0.clone()) + pub fn new_peer(&self, opaque: T) -> Peer<T> { + peer::new_peer(self.0.clone(), opaque) } /// Cryptkey routes and sends a plaintext message (IP packet) @@ -96,7 +104,7 @@ impl Device { /// This indicates that a handshake should be initated (see the handshake module). /// If this occurs the packet is copied to an internal buffer /// and retransmission can be attempted using send_run_queue - pub fn send(&self, pt_msg: &mut [u8]) -> Arc<Peer> { + pub fn send(&self, pt_msg: &mut [u8]) -> Arc<Peer<T>> { unimplemented!(); } diff --git a/src/router/mod.rs b/src/router/mod.rs index 6a4dd61..644dd61 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -1,6 +1,7 @@ mod anti_replay; mod buffer; mod device; +mod types; // mod inbound; mod workers; mod peer; diff --git a/src/router/peer.rs b/src/router/peer.rs index 9c1721a..71f387a 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -1,9 +1,8 @@ +use std::net::{IpAddr, SocketAddr}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::mpsc::{sync_channel, SyncSender}; use std::sync::{Arc, Weak}; use std::thread; -use std::mem; -use std::net::{IpAddr, SocketAddr}; -use std::sync::mpsc::{sync_channel, SyncSender}; use spin; @@ -21,6 +20,8 @@ use super::device::DeviceInner; use super::device::EncryptionState; use super::workers::{worker_inbound, worker_outbound, JobInbound, JobOutbound}; +use super::types::Opaque; + const MAX_STAGED_PACKETS: usize = 128; pub struct KeyWheel { @@ -30,13 +31,14 @@ pub struct KeyWheel { retired: Option<u32>, // retired id (previous id, after confirming key-pair) } -pub struct PeerInner { +pub struct PeerInner<T: Opaque> { pub stopped: AtomicBool, - pub device: Arc<DeviceInner>, + pub opaque: T, + pub device: Arc<DeviceInner<T>>, pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>, pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>, pub queue_outbound: SyncSender<JobOutbound>, - pub queue_inbound: SyncSender<JobInbound>, + pub queue_inbound: SyncSender<JobInbound<T>>, pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake pub rx_bytes: AtomicU64, // received bytes pub tx_bytes: AtomicU64, // transmitted bytes @@ -45,11 +47,11 @@ pub struct PeerInner { pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>, } -pub struct Peer(Arc<PeerInner>); +pub struct Peer<T: Opaque>(Arc<PeerInner<T>>); -fn treebit_list<A, R>( - peer: &Arc<PeerInner>, - table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>, +fn treebit_list<A, R, T: Opaque>( + peer: &Arc<PeerInner<T>>, + table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T>>>>, callback: Box<dyn Fn(A, u32) -> R>, ) -> Vec<R> where @@ -67,10 +69,10 @@ where res } -fn treebit_remove<A>(peer: &Peer, table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>) -where - A: Address, -{ +fn treebit_remove<A: Address, T: Opaque>( + peer: &Peer<T>, + table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T>>>>, +) { let mut m = table.write(); // collect keys for value @@ -91,9 +93,8 @@ where } } -impl Drop for Peer { +impl<T: Opaque> Drop for Peer<T> { fn drop(&mut self) { - // mark peer as stopped let peer = &self.0; @@ -144,11 +145,10 @@ impl Drop for Peer { *peer.ekey.lock() = None; *peer.endpoint.lock() = None; - } } -pub fn new_peer(device: Arc<DeviceInner>) -> Peer { +pub fn new_peer<T: Opaque>(device: Arc<DeviceInner<T>>, opaque: T) -> Peer<T> { // allocate in-order queues let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS); let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS); @@ -157,6 +157,7 @@ pub fn new_peer(device: Arc<DeviceInner>) -> Peer { let peer = { let device = device.clone(); Arc::new(PeerInner { + opaque, stopped: AtomicBool::new(false), device: device, ekey: spin::Mutex::new(None), @@ -198,8 +199,8 @@ pub fn new_peer(device: Arc<DeviceInner>) -> Peer { Peer(peer) } -impl Peer { - fn new(inner: PeerInner) -> Peer { +impl<T: Opaque> Peer<T> { + fn new(inner: PeerInner<T>) -> Peer<T> { Peer(Arc::new(inner)) } @@ -315,6 +316,5 @@ impl Peer { res } - pub fn send(&self, msg : Vec<u8>) { - } + pub fn send(&self, msg: Vec<u8>) {} } diff --git a/src/router/types.rs b/src/router/types.rs new file mode 100644 index 0000000..0b5e162 --- /dev/null +++ b/src/router/types.rs @@ -0,0 +1,7 @@ +pub trait Opaque: Send + Sync + 'static {} + +impl<T> Opaque for T where T: Send + Sync + 'static {} + +pub trait Callback<T>: Fn(&T, bool) -> () + Sync + Send + 'static {} + +impl<T, F> Callback<T> for F where F: Fn(&T, bool) -> () + Sync + Send + 'static {} diff --git a/src/router/workers.rs b/src/router/workers.rs index 2f7977c..4942491 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -10,6 +10,8 @@ use std::sync::mpsc::{sync_channel, Receiver, TryRecvError}; use std::sync::{Arc, Weak}; use std::thread; +use super::types::{Opaque, Callback}; + #[derive(PartialEq)] enum Operation { Encryption, @@ -32,7 +34,7 @@ pub struct JobInner { pub type JobBuffer = Arc<spin::Mutex<JobInner>>; pub type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer); -pub type JobInbound = (Weak<DecryptionState>, JobBuffer); +pub type JobInbound<T> = (Weak<DecryptionState<T>>, JobBuffer); pub type JobOutbound = JobBuffer; /* Strategy for workers acquiring a new job: @@ -80,10 +82,10 @@ fn wait_recv<T>(stopped: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvEr return Err(TryRecvError::Disconnected); } -pub fn worker_inbound( - device: Arc<DeviceInner>, // related device - peer: Arc<PeerInner>, // related peer - recv: Receiver<JobInbound>, // in order queue +pub fn worker_inbound<T : Opaque>( + device: Arc<DeviceInner<T>>, // related device + peer: Arc<PeerInner<T>>, // related peer + recv: Receiver<JobInbound<T>>, // in order queue ) { loop { match wait_recv(&peer.stopped, &recv) { @@ -108,9 +110,9 @@ pub fn worker_inbound( } } -pub fn worker_outbound( - device: Arc<DeviceInner>, // related device - peer: Arc<PeerInner>, // related peer +pub fn worker_outbound<T : Opaque>( + device: Arc<DeviceInner<T>>, // related device + peer: Arc<PeerInner<T>>, // related peer recv: Receiver<JobOutbound>, // in order queue ) { loop { |