From d16521f4c75ebee6fa068461693755a5c1863e9f Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sat, 31 Aug 2019 21:00:10 +0200 Subject: Added Bind trait to router --- src/main.rs | 9 +++++---- src/router/device.rs | 34 ++++++++++++++++++++-------------- src/router/peer.rs | 42 +++++++++++++++++++++--------------------- src/router/types.rs | 19 +++++++++++-------- src/router/workers.rs | 24 ++++++++++++------------ src/types/udp.rs | 2 +- 6 files changed, 70 insertions(+), 60 deletions(-) diff --git a/src/main.rs b/src/main.rs index cfe93eb..6d1d2e1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,14 +52,14 @@ impl Tun for TunTest { } } -struct Test {} +struct BindTest {} -impl Bind for Test { +impl Bind for BindTest { type Error = BindError; type Endpoint = SocketAddr; - fn new() -> Test { - Test {} + fn new() -> BindTest { + BindTest {} } fn set_port(&self, port: u16) -> Result<(), Self::Error> { @@ -111,6 +111,7 @@ fn main() { let router = router::Device::new( 4, TunTest {}, + BindTest {}, |t: &PeerTimer, data: bool, sent: bool| t.a.reset(Duration::from_millis(1000)), |t: &PeerTimer, data: bool, sent: bool| t.b.reset(Duration::from_millis(1000)), |t: &PeerTimer| println!("new key requested"), diff --git a/src/router/device.rs b/src/router/device.rs index 84f25c6..f04cf97 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -15,12 +15,13 @@ use super::anti_replay::AntiReplay; use super::peer; use super::peer::{Peer, PeerInner}; -use super::types::{Callback, Callbacks, CallbacksPhantom, KeyCallback, Opaque}; +use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks}; use super::workers::{worker_parallel, JobParallel}; -pub struct DeviceInner { +pub struct DeviceInner { // IO & timer generics pub tun: T, + pub bind: B, pub call_recv: C::CallbackRecv, pub call_send: C::CallbackSend, pub call_need_key: C::CallbackKey, @@ -31,9 +32,9 @@ pub struct DeviceInner { pub injector: Injector, // parallel enc/dec task injector // routing - pub recv: spin::RwLock>>, // receiver id -> decryption state - pub ipv4: spin::RwLock>>>, // ipv4 cryptkey routing - pub ipv6: spin::RwLock>>>, // ipv6 cryptkey routing + pub recv: spin::RwLock>>, // receiver id -> decryption state + pub ipv4: spin::RwLock>>>, // ipv4 cryptkey routing + pub ipv6: spin::RwLock>>>, // ipv6 cryptkey routing } pub struct EncryptionState { @@ -44,18 +45,21 @@ pub struct EncryptionState { // (birth + reject-after-time - keepalive-timeout - rekey-timeout) } -pub struct DecryptionState { +pub struct DecryptionState { pub key: [u8; 32], pub keypair: Weak, pub confirmed: AtomicBool, pub protector: spin::Mutex, - pub peer: Weak>, + pub peer: Weak>, pub death: Instant, // time when the key can no longer be used for decryption } -pub struct Device(Arc>, Vec>); +pub struct Device( + Arc>, + Vec>, +); -impl Drop for Device { +impl Drop for Device { fn drop(&mut self) { // mark device as stopped let device = &self.0; @@ -73,19 +77,21 @@ impl Drop for Device { } } -impl, S: Callback, K: KeyCallback, T: Tun> - Device, T> +impl, S: Callback, K: KeyCallback, T: Tun, B: Bind> + Device, T, B> { pub fn new( num_workers: usize, tun: T, + bind: B, call_recv: R, call_send: S, call_need_key: K, - ) -> Device, T> { + ) -> Device, T, B> { // allocate shared device state let inner = Arc::new(DeviceInner { tun, + bind, call_recv, call_send, call_need_key, @@ -122,13 +128,13 @@ impl, S: Callback, K: KeyCallback, T: Tun> } } -impl Device { +impl Device { /// Adds a new peer to the device /// /// # Returns /// /// A atomic ref. counted peer (with liftime matching the device) - pub fn new_peer(&self, opaque: C::Opaque) -> Peer { + pub fn new_peer(&self, opaque: C::Opaque) -> Peer { peer::new_peer(self.0.clone(), opaque) } diff --git a/src/router/peer.rs b/src/router/peer.rs index 647d24f..e21e69c 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -12,7 +12,7 @@ use treebitmap::address::Address; use treebitmap::IpLookupTable; use super::super::constants::*; -use super::super::types::{KeyPair, Tun}; +use super::super::types::{KeyPair, Tun, Bind}; use super::anti_replay::AntiReplay; use super::device::DecryptionState; @@ -31,29 +31,29 @@ pub struct KeyWheel { retired: Option, // retired id (previous id, after confirming key-pair) } -pub struct PeerInner { +pub struct PeerInner { pub stopped: AtomicBool, pub opaque: C::Opaque, - pub device: Arc>, + pub device: Arc>, pub thread_outbound: spin::Mutex>>, pub thread_inbound: spin::Mutex>>, pub queue_outbound: SyncSender, - pub queue_inbound: SyncSender>, + pub queue_inbound: SyncSender>, pub staged_packets: spin::Mutex; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake - pub rx_bytes: AtomicU64, // received bytes - pub tx_bytes: AtomicU64, // transmitted bytes + pub rx_bytes: AtomicU64, // received bytes + pub tx_bytes: AtomicU64, // transmitted bytes pub keys: spin::Mutex, // key-wheel pub ekey: spin::Mutex>, // encryption state pub endpoint: spin::Mutex>>, } -pub struct Peer( - Arc>, +pub struct Peer( + Arc>, ); -fn treebit_list( - peer: &Arc>, - table: &spin::RwLock>>>, +fn treebit_list( + peer: &Arc>, + table: &spin::RwLock>>>, callback: Box E>, ) -> Vec where @@ -71,9 +71,9 @@ where res } -fn treebit_remove( - peer: &Peer, - table: &spin::RwLock>>>, +fn treebit_remove( + peer: &Peer, + table: &spin::RwLock>>>, ) { let mut m = table.write(); @@ -95,7 +95,7 @@ fn treebit_remove( } } -impl Drop for Peer { +impl Drop for Peer { fn drop(&mut self) { // mark peer as stopped @@ -150,10 +150,10 @@ impl Drop for Peer { } } -pub fn new_peer( - device: Arc>, +pub fn new_peer( + device: Arc>, opaque: C::Opaque, -) -> Peer { +) -> Peer { // allocate in-order queues let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS); let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS); @@ -204,7 +204,7 @@ pub fn new_peer( Peer(peer) } -impl PeerInner { +impl PeerInner { pub fn confirm_key(&self, kp: Weak) { // upgrade key-pair to strong reference @@ -214,8 +214,8 @@ impl PeerInner { } } -impl Peer { - fn new(inner: PeerInner) -> Peer { +impl Peer { + fn new(inner: PeerInner) -> Peer { Peer(Arc::new(inner)) } diff --git a/src/router/types.rs b/src/router/types.rs index f6a0311..82dcd09 100644 --- a/src/router/types.rs +++ b/src/router/types.rs @@ -20,10 +20,6 @@ pub trait KeyCallback: Fn(&T) -> () + Sync + Send + 'static {} impl KeyCallback for F where F: Fn(&T) -> () + Sync + Send + 'static {} -pub trait TunCallback: Fn(&T, bool, bool) -> () + Sync + Send + 'static {} - -pub trait BindCallback: Fn(&T, bool, bool) -> () + Sync + Send + 'static {} - pub trait Endpoint: Send + Sync {} pub trait Callbacks: Send + Sync + 'static { @@ -33,16 +29,23 @@ pub trait Callbacks: Send + Sync + 'static { type CallbackKey: KeyCallback; } -pub struct CallbacksPhantom, S: Callback, K: KeyCallback> { +/* Concrete implementation of "Callbacks", + * used to hide the constituent type parameters. + * + * This type is never instantiated. + */ +pub struct PhantomCallbacks, S: Callback, K: KeyCallback> { _phantom_opaque: PhantomData, _phantom_recv: PhantomData, _phantom_send: PhantomData, - _phantom_key: PhantomData + _phantom_key: PhantomData, } -impl , S: Callback, K: KeyCallback> Callbacks for CallbacksPhantom { +impl, S: Callback, K: KeyCallback> Callbacks + for PhantomCallbacks +{ type Opaque = O; type CallbackRecv = R; type CallbackSend = S; type CallbackKey = K; -} \ No newline at end of file +} diff --git a/src/router/workers.rs b/src/router/workers.rs index 2b0b9ec..c4a9f18 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -17,7 +17,7 @@ use super::messages::TransportHeader; use super::peer::PeerInner; use super::types::Callbacks; -use super::super::types::Tun; +use super::super::types::{Tun, Bind}; #[derive(PartialEq, Debug)] pub enum Operation { @@ -41,7 +41,7 @@ pub struct JobInner { pub type JobBuffer = Arc>; pub type JobParallel = (Arc>, JobBuffer); -pub type JobInbound = (Weak>, JobBuffer); +pub type JobInbound = (Weak>, JobBuffer); pub type JobOutbound = JobBuffer; /* Strategy for workers acquiring a new job: @@ -89,10 +89,10 @@ fn wait_recv(running: &AtomicBool, recv: &Receiver) -> Result( - device: Arc>, // related device - peer: Arc>, // related peer - recv: Receiver>, // in order queue +pub fn worker_inbound( + device: Arc>, // related device + peer: Arc>, // related peer + recv: Receiver>, // in order queue ) { loop { match wait_recv(&peer.stopped, &recv) { @@ -157,10 +157,10 @@ pub fn worker_inbound( } } -pub fn worker_outbound( - device: Arc>, // related device - peer: Arc>, // related peer - recv: Receiver, // in order queue +pub fn worker_outbound( + device: Arc>, // related device + peer: Arc>, // related peer + recv: Receiver, // in order queue ) { loop { match wait_recv(&peer.stopped, &recv) { @@ -205,8 +205,8 @@ pub fn worker_outbound( } } -pub fn worker_parallel( - device: Arc>, +pub fn worker_parallel( + device: Arc>, local: Worker, // local job queue (local to thread) stealers: Vec>, // stealers (from other threads) ) { diff --git a/src/types/udp.rs b/src/types/udp.rs index 4bf0a9c..71d5a79 100644 --- a/src/types/udp.rs +++ b/src/types/udp.rs @@ -3,7 +3,7 @@ use std::error; /* Often times an a file descriptor in an atomic might suffice. */ -pub trait Bind: Send + Sync { +pub trait Bind: Send + Sync + 'static { type Error: error::Error; type Endpoint: Endpoint; -- cgit v1.2.3-59-g8ed1b