diff options
-rw-r--r-- | src/main.rs | 5 | ||||
-rw-r--r-- | src/router/device.rs | 60 | ||||
-rw-r--r-- | src/router/peer.rs | 38 | ||||
-rw-r--r-- | src/router/types.rs | 16 | ||||
-rw-r--r-- | src/router/workers.rs | 18 |
5 files changed, 77 insertions, 60 deletions
diff --git a/src/main.rs b/src/main.rs index ae90251..5c58b24 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,8 +27,9 @@ fn main() { let router = router::Device::new( 4, - |t: &PeerTimer, data: bool| t.a.reset(Duration::from_millis(1000)), - |t: &PeerTimer, data: bool| t.b.reset(Duration::from_millis(1000)), + |t: &PeerTimer, data: bool, sent: bool| t.a.reset(Duration::from_millis(1000)), + |t: &PeerTimer, data: bool, sent: bool| t.b.reset(Duration::from_millis(1000)), + |t: &PeerTimer| println!("new key requested"), ); let pt = PeerTimer { diff --git a/src/router/device.rs b/src/router/device.rs index d1a7d93..3670cb5 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -14,20 +14,23 @@ use super::anti_replay::AntiReplay; use super::peer; use super::peer::{Peer, PeerInner}; -use super::types::{Callback, Opaque}; - -pub struct DeviceInner<T: Opaque> { - // callbacks (used for timers) - pub event_recv: Box<dyn Callback<T>>, // authenticated message received - pub event_send: Box<dyn Callback<T>>, // authenticated message send - pub event_new_handshake: (), // called when a new handshake is required +use super::types::{Callback, KeyCallback, Opaque}; +pub struct DeviceInner<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> { + // threading and workers pub stopped: AtomicBool, pub injector: Injector<()>, // parallel enc/dec task injector pub threads: Vec<thread::JoinHandle<()>>, // join handles of worker threads - pub recv: spin::RwLock<HashMap<u32, DecryptionState<T>>>, // receiver id -> decryption state - pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner<T>>>>, // ipv4 cryptkey routing - pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner<T>>>>, // ipv6 cryptkey routing + + // unboxed callbacks (used for timers and handshake requests) + pub event_send: S, // called when authenticated message send + pub event_recv: R, // called when authenticated message received + pub event_need_key: K, // called when new key material is required + + // routing + pub recv: spin::RwLock<HashMap<u32, DecryptionState<T, S, R, K>>>, // receiver id -> decryption state + pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner<T, S, R, K>>>>, // ipv4 cryptkey routing + pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner<T, S, R, K>>>>, // ipv6 cryptkey routing } pub struct EncryptionState { @@ -38,17 +41,19 @@ pub struct EncryptionState { // (birth + reject-after-time - keepalive-timeout - rekey-timeout) } -pub struct DecryptionState<T: Opaque> { +pub struct DecryptionState<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> { pub key: [u8; 32], pub keypair: Weak<KeyPair>, pub protector: spin::Mutex<AntiReplay>, - pub peer: Weak<PeerInner<T>>, + pub peer: Weak<PeerInner<T, S, R, K>>, pub death: Instant, // time when the key can no longer be used for decryption } -pub struct Device<T: Opaque>(Arc<DeviceInner<T>>); +pub struct Device<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>( + Arc<DeviceInner<T, S, R, K>>, +); -impl<T: Opaque> Drop for Device<T> { +impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Drop for Device<T, S, R, K> { fn drop(&mut self) { // mark device as stopped let device = &self.0; @@ -64,16 +69,17 @@ impl<T: Opaque> Drop for Device<T> { } } -impl<T: Opaque> Device<T> { - pub fn new<F1: Callback<T>, F2: Callback<T>>( +impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Device<T, S, R, K> { + pub fn new( workers: usize, - event_recv: F1, - event_send: F2, - ) -> Device<T> { + event_recv: R, + event_send: S, + event_need_key: K, + ) -> Device<T, S, R, K> { Device(Arc::new(DeviceInner { - event_recv: Box::new(event_recv), - event_send: Box::new(event_send), - event_new_handshake: (), + event_recv, + event_send, + event_need_key, threads: vec![], stopped: AtomicBool::new(false), injector: Injector::new(), @@ -88,7 +94,7 @@ impl<T: Opaque> Device<T> { /// # Returns /// /// A atomic ref. counted peer (with liftime matching the device) - pub fn new_peer(&self, opaque: T) -> Peer<T> { + pub fn new_peer(&self, opaque: T) -> Peer<T, S, R, K> { peer::new_peer(self.0.clone(), opaque) } @@ -98,13 +104,7 @@ impl<T: Opaque> Device<T> { /// /// - pt_msg: IP packet to cryptkey route /// - /// # Returns - /// - /// A peer reference for the peer if no key-pair is currently valid for the destination. - /// This indicates that a handshake should be initated (see the handshake module). - /// If this occurs the packet is copied to an internal buffer - /// and retransmission can be attempted using send_run_queue - pub fn send(&self, pt_msg: &mut [u8]) -> Arc<Peer<T>> { + pub fn send(&self, pt_msg: &mut [u8]) { unimplemented!(); } diff --git a/src/router/peer.rs b/src/router/peer.rs index 71f387a..0598b3a 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -20,7 +20,7 @@ use super::device::DeviceInner; use super::device::EncryptionState; use super::workers::{worker_inbound, worker_outbound, JobInbound, JobOutbound}; -use super::types::Opaque; +use super::types::{Opaque, Callback, KeyCallback}; const MAX_STAGED_PACKETS: usize = 128; @@ -31,14 +31,14 @@ pub struct KeyWheel { retired: Option<u32>, // retired id (previous id, after confirming key-pair) } -pub struct PeerInner<T: Opaque> { +pub struct PeerInner<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> { pub stopped: AtomicBool, pub opaque: T, - pub device: Arc<DeviceInner<T>>, + pub device: Arc<DeviceInner<T, S, R, K>>, pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>, pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>, pub queue_outbound: SyncSender<JobOutbound>, - pub queue_inbound: SyncSender<JobInbound<T>>, + pub queue_inbound: SyncSender<JobInbound<T, S, R, K>>, pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake pub rx_bytes: AtomicU64, // received bytes pub tx_bytes: AtomicU64, // transmitted bytes @@ -47,13 +47,13 @@ pub struct PeerInner<T: Opaque> { pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>, } -pub struct Peer<T: Opaque>(Arc<PeerInner<T>>); +pub struct Peer<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>(Arc<PeerInner<T, S, R, K>>); -fn treebit_list<A, R, T: Opaque>( - peer: &Arc<PeerInner<T>>, - table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T>>>>, - callback: Box<dyn Fn(A, u32) -> R>, -) -> Vec<R> +fn treebit_list<A, O, T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>( + peer: &Arc<PeerInner<T, S, R, K>>, + table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T, S, R, K>>>>, + callback: Box<dyn Fn(A, u32) -> O>, +) -> Vec<O> where A: Address, { @@ -69,9 +69,9 @@ where res } -fn treebit_remove<A: Address, T: Opaque>( - peer: &Peer<T>, - table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T>>>>, +fn treebit_remove<A: Address, T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>( + peer: &Peer<T, S, R, K>, + table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T, S, R, K>>>>, ) { let mut m = table.write(); @@ -93,7 +93,7 @@ fn treebit_remove<A: Address, T: Opaque>( } } -impl<T: Opaque> Drop for Peer<T> { +impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Drop for Peer<T, S, R, K> { fn drop(&mut self) { // mark peer as stopped @@ -148,7 +148,11 @@ impl<T: Opaque> Drop for Peer<T> { } } -pub fn new_peer<T: Opaque>(device: Arc<DeviceInner<T>>, opaque: T) -> Peer<T> { +pub fn new_peer<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>( + device: Arc<DeviceInner<T, S, R, K>>, + opaque: T +) -> Peer<T, S, R, K> { + // allocate in-order queues let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS); let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS); @@ -199,8 +203,8 @@ pub fn new_peer<T: Opaque>(device: Arc<DeviceInner<T>>, opaque: T) -> Peer<T> { Peer(peer) } -impl<T: Opaque> Peer<T> { - fn new(inner: PeerInner<T>) -> Peer<T> { +impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Peer<T, S, R, K> { + fn new(inner: PeerInner<T, S, R, K>) -> Peer<T, S, R, K> { Peer(Arc::new(inner)) } diff --git a/src/router/types.rs b/src/router/types.rs index 0b5e162..2ed011b 100644 --- a/src/router/types.rs +++ b/src/router/types.rs @@ -2,6 +2,18 @@ pub trait Opaque: Send + Sync + 'static {} impl<T> Opaque for T where T: Send + Sync + 'static {} -pub trait Callback<T>: Fn(&T, bool) -> () + Sync + Send + 'static {} +/// A send/recv callback takes 3 arguments: +/// +/// * `0`, a reference to the opaque value assigned to the peer +/// * `1`, a bool indicating whether the message contained data (not just keepalive) +/// * `2`, a bool indicating whether the message was transmitted (i.e. did the peer have an associated endpoint?) +pub trait Callback<T>: Fn(&T, bool, bool) -> () + Sync + Send + 'static {} -impl<T, F> Callback<T> for F where F: Fn(&T, bool) -> () + Sync + Send + 'static {} +impl<T, F> Callback<T> for F where F: Fn(&T, bool, bool) -> () + Sync + Send + 'static {} + +/// A key callback takes 1 argument +/// +/// * `0`, a reference to the opaque value assigned to the peer +pub trait KeyCallback<T>: Fn(&T) -> () + Sync + Send + 'static {} + +impl<T, F> KeyCallback<T> for F where F: Fn(&T) -> () + Sync + Send + 'static {}
\ No newline at end of file diff --git a/src/router/workers.rs b/src/router/workers.rs index 4942491..4f39fb2 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -10,7 +10,7 @@ use std::sync::mpsc::{sync_channel, Receiver, TryRecvError}; use std::sync::{Arc, Weak}; use std::thread; -use super::types::{Opaque, Callback}; +use super::types::{Opaque, Callback, KeyCallback}; #[derive(PartialEq)] enum Operation { @@ -34,7 +34,7 @@ pub struct JobInner { pub type JobBuffer = Arc<spin::Mutex<JobInner>>; pub type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer); -pub type JobInbound<T> = (Weak<DecryptionState<T>>, JobBuffer); +pub type JobInbound<T, S, R, K> = (Weak<DecryptionState<T, S, R, K>>, JobBuffer); pub type JobOutbound = JobBuffer; /* Strategy for workers acquiring a new job: @@ -82,10 +82,10 @@ fn wait_recv<T>(stopped: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvEr return Err(TryRecvError::Disconnected); } -pub fn worker_inbound<T : Opaque>( - device: Arc<DeviceInner<T>>, // related device - peer: Arc<PeerInner<T>>, // related peer - recv: Receiver<JobInbound<T>>, // in order queue +pub fn worker_inbound<T : Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>( + device: Arc<DeviceInner<T, S, R, K>>, // related device + peer: Arc<PeerInner<T, S, R, K>>, // related peer + recv: Receiver<JobInbound<T, S, R, K>>, // in order queue ) { loop { match wait_recv(&peer.stopped, &recv) { @@ -110,9 +110,9 @@ pub fn worker_inbound<T : Opaque>( } } -pub fn worker_outbound<T : Opaque>( - device: Arc<DeviceInner<T>>, // related device - peer: Arc<PeerInner<T>>, // related peer +pub fn worker_outbound<T : Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>( + device: Arc<DeviceInner<T, S, R, K>>, // related device + peer: Arc<PeerInner<T, S, R, K>>, // related peer recv: Receiver<JobOutbound>, // in order queue ) { loop { |