aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main.rs30
-rw-r--r--src/router/device.rs56
-rw-r--r--src/router/mod.rs1
-rw-r--r--src/router/peer.rs44
-rw-r--r--src/router/types.rs7
-rw-r--r--src/router/workers.rs18
6 files changed, 97 insertions, 59 deletions
diff --git a/src/main.rs b/src/main.rs
index eab4b61..ae90251 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,18 +5,38 @@ mod handshake;
mod router;
mod types;
+use hjul::*;
+
use std::sync::Arc;
+use std::time::Duration;
use sodiumoxide;
use types::KeyPair;
+#[derive(Debug, Clone)]
+struct PeerTimer {
+ a: Timer,
+ b: Timer,
+}
+
fn main() {
+ let runner = Runner::new(Duration::from_millis(100), 1000, 1024);
+
// choose optimal crypto implementations for platform
sodiumoxide::init().unwrap();
- let mut router = router::Device::new(8);
- {
- let peer = router.new_peer();
- }
- loop {}
+ let router = router::Device::new(
+ 4,
+ |t: &PeerTimer, data: bool| t.a.reset(Duration::from_millis(1000)),
+ |t: &PeerTimer, data: bool| t.b.reset(Duration::from_millis(1000)),
+ );
+
+ let pt = PeerTimer {
+ a: runner.timer(|| println!("timer-a fired for peer")),
+ b: runner.timer(|| println!("timer-b fired for peer")),
+ };
+
+ let peer = router.new_peer(pt.clone());
+
+ println!("{:?}", pt);
}
diff --git a/src/router/device.rs b/src/router/device.rs
index 974d019..d1a7d93 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -1,32 +1,33 @@
-use arraydeque::{ArrayDeque, Wrapping};
-use treebitmap::address::Address;
-use treebitmap::IpLookupTable;
-
-use crossbeam_deque::{Injector, Steal};
use std::collections::HashMap;
-use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
-use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
-use std::sync::mpsc::{sync_channel, SyncSender};
-use std::sync::{Arc, Mutex, Weak};
+use std::net::{Ipv4Addr, Ipv6Addr};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Weak};
use std::thread;
use std::time::Instant;
+use crossbeam_deque::{Injector, Steal};
use spin;
+use treebitmap::IpLookupTable;
-use super::super::constants::*;
use super::super::types::KeyPair;
use super::anti_replay::AntiReplay;
use super::peer;
use super::peer::{Peer, PeerInner};
-use super::workers::worker_parallel;
-pub struct DeviceInner {
+use super::types::{Callback, Opaque};
+
+pub struct DeviceInner<T: Opaque> {
+ // callbacks (used for timers)
+ pub event_recv: Box<dyn Callback<T>>, // authenticated message received
+ pub event_send: Box<dyn Callback<T>>, // authenticated message send
+ pub event_new_handshake: (), // called when a new handshake is required
+
pub stopped: AtomicBool,
pub injector: Injector<()>, // parallel enc/dec task injector
pub threads: Vec<thread::JoinHandle<()>>, // join handles of worker threads
- pub recv: spin::RwLock<HashMap<u32, DecryptionState>>, // receiver id -> decryption state
- pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner>>>, // ipv4 cryptkey routing
- pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner>>>, // ipv6 cryptkey routing
+ pub recv: spin::RwLock<HashMap<u32, DecryptionState<T>>>, // receiver id -> decryption state
+ pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner<T>>>>, // ipv4 cryptkey routing
+ pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner<T>>>>, // ipv6 cryptkey routing
}
pub struct EncryptionState {
@@ -37,17 +38,17 @@ pub struct EncryptionState {
// (birth + reject-after-time - keepalive-timeout - rekey-timeout)
}
-pub struct DecryptionState {
+pub struct DecryptionState<T: Opaque> {
pub key: [u8; 32],
pub keypair: Weak<KeyPair>,
pub protector: spin::Mutex<AntiReplay>,
- pub peer: Weak<PeerInner>,
+ pub peer: Weak<PeerInner<T>>,
pub death: Instant, // time when the key can no longer be used for decryption
}
-pub struct Device(Arc<DeviceInner>);
+pub struct Device<T: Opaque>(Arc<DeviceInner<T>>);
-impl Drop for Device {
+impl<T: Opaque> Drop for Device<T> {
fn drop(&mut self) {
// mark device as stopped
let device = &self.0;
@@ -63,9 +64,16 @@ impl Drop for Device {
}
}
-impl Device {
- pub fn new(workers: usize) -> Device {
+impl<T: Opaque> Device<T> {
+ pub fn new<F1: Callback<T>, F2: Callback<T>>(
+ workers: usize,
+ event_recv: F1,
+ event_send: F2,
+ ) -> Device<T> {
Device(Arc::new(DeviceInner {
+ event_recv: Box::new(event_recv),
+ event_send: Box::new(event_send),
+ event_new_handshake: (),
threads: vec![],
stopped: AtomicBool::new(false),
injector: Injector::new(),
@@ -80,8 +88,8 @@ impl Device {
/// # Returns
///
/// A atomic ref. counted peer (with liftime matching the device)
- pub fn new_peer(&self) -> Peer {
- peer::new_peer(self.0.clone())
+ pub fn new_peer(&self, opaque: T) -> Peer<T> {
+ peer::new_peer(self.0.clone(), opaque)
}
/// Cryptkey routes and sends a plaintext message (IP packet)
@@ -96,7 +104,7 @@ impl Device {
/// This indicates that a handshake should be initated (see the handshake module).
/// If this occurs the packet is copied to an internal buffer
/// and retransmission can be attempted using send_run_queue
- pub fn send(&self, pt_msg: &mut [u8]) -> Arc<Peer> {
+ pub fn send(&self, pt_msg: &mut [u8]) -> Arc<Peer<T>> {
unimplemented!();
}
diff --git a/src/router/mod.rs b/src/router/mod.rs
index 6a4dd61..644dd61 100644
--- a/src/router/mod.rs
+++ b/src/router/mod.rs
@@ -1,6 +1,7 @@
mod anti_replay;
mod buffer;
mod device;
+mod types;
// mod inbound;
mod workers;
mod peer;
diff --git a/src/router/peer.rs b/src/router/peer.rs
index 9c1721a..71f387a 100644
--- a/src/router/peer.rs
+++ b/src/router/peer.rs
@@ -1,9 +1,8 @@
+use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Weak};
use std::thread;
-use std::mem;
-use std::net::{IpAddr, SocketAddr};
-use std::sync::mpsc::{sync_channel, SyncSender};
use spin;
@@ -21,6 +20,8 @@ use super::device::DeviceInner;
use super::device::EncryptionState;
use super::workers::{worker_inbound, worker_outbound, JobInbound, JobOutbound};
+use super::types::Opaque;
+
const MAX_STAGED_PACKETS: usize = 128;
pub struct KeyWheel {
@@ -30,13 +31,14 @@ pub struct KeyWheel {
retired: Option<u32>, // retired id (previous id, after confirming key-pair)
}
-pub struct PeerInner {
+pub struct PeerInner<T: Opaque> {
pub stopped: AtomicBool,
- pub device: Arc<DeviceInner>,
+ pub opaque: T,
+ pub device: Arc<DeviceInner<T>>,
pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
pub queue_outbound: SyncSender<JobOutbound>,
- pub queue_inbound: SyncSender<JobInbound>,
+ pub queue_inbound: SyncSender<JobInbound<T>>,
pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
pub rx_bytes: AtomicU64, // received bytes
pub tx_bytes: AtomicU64, // transmitted bytes
@@ -45,11 +47,11 @@ pub struct PeerInner {
pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
}
-pub struct Peer(Arc<PeerInner>);
+pub struct Peer<T: Opaque>(Arc<PeerInner<T>>);
-fn treebit_list<A, R>(
- peer: &Arc<PeerInner>,
- table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>,
+fn treebit_list<A, R, T: Opaque>(
+ peer: &Arc<PeerInner<T>>,
+ table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T>>>>,
callback: Box<dyn Fn(A, u32) -> R>,
) -> Vec<R>
where
@@ -67,10 +69,10 @@ where
res
}
-fn treebit_remove<A>(peer: &Peer, table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>)
-where
- A: Address,
-{
+fn treebit_remove<A: Address, T: Opaque>(
+ peer: &Peer<T>,
+ table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T>>>>,
+) {
let mut m = table.write();
// collect keys for value
@@ -91,9 +93,8 @@ where
}
}
-impl Drop for Peer {
+impl<T: Opaque> Drop for Peer<T> {
fn drop(&mut self) {
-
// mark peer as stopped
let peer = &self.0;
@@ -144,11 +145,10 @@ impl Drop for Peer {
*peer.ekey.lock() = None;
*peer.endpoint.lock() = None;
-
}
}
-pub fn new_peer(device: Arc<DeviceInner>) -> Peer {
+pub fn new_peer<T: Opaque>(device: Arc<DeviceInner<T>>, opaque: T) -> Peer<T> {
// allocate in-order queues
let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS);
let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS);
@@ -157,6 +157,7 @@ pub fn new_peer(device: Arc<DeviceInner>) -> Peer {
let peer = {
let device = device.clone();
Arc::new(PeerInner {
+ opaque,
stopped: AtomicBool::new(false),
device: device,
ekey: spin::Mutex::new(None),
@@ -198,8 +199,8 @@ pub fn new_peer(device: Arc<DeviceInner>) -> Peer {
Peer(peer)
}
-impl Peer {
- fn new(inner: PeerInner) -> Peer {
+impl<T: Opaque> Peer<T> {
+ fn new(inner: PeerInner<T>) -> Peer<T> {
Peer(Arc::new(inner))
}
@@ -315,6 +316,5 @@ impl Peer {
res
}
- pub fn send(&self, msg : Vec<u8>) {
- }
+ pub fn send(&self, msg: Vec<u8>) {}
}
diff --git a/src/router/types.rs b/src/router/types.rs
new file mode 100644
index 0000000..0b5e162
--- /dev/null
+++ b/src/router/types.rs
@@ -0,0 +1,7 @@
+pub trait Opaque: Send + Sync + 'static {}
+
+impl<T> Opaque for T where T: Send + Sync + 'static {}
+
+pub trait Callback<T>: Fn(&T, bool) -> () + Sync + Send + 'static {}
+
+impl<T, F> Callback<T> for F where F: Fn(&T, bool) -> () + Sync + Send + 'static {}
diff --git a/src/router/workers.rs b/src/router/workers.rs
index 2f7977c..4942491 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -10,6 +10,8 @@ use std::sync::mpsc::{sync_channel, Receiver, TryRecvError};
use std::sync::{Arc, Weak};
use std::thread;
+use super::types::{Opaque, Callback};
+
#[derive(PartialEq)]
enum Operation {
Encryption,
@@ -32,7 +34,7 @@ pub struct JobInner {
pub type JobBuffer = Arc<spin::Mutex<JobInner>>;
pub type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer);
-pub type JobInbound = (Weak<DecryptionState>, JobBuffer);
+pub type JobInbound<T> = (Weak<DecryptionState<T>>, JobBuffer);
pub type JobOutbound = JobBuffer;
/* Strategy for workers acquiring a new job:
@@ -80,10 +82,10 @@ fn wait_recv<T>(stopped: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvEr
return Err(TryRecvError::Disconnected);
}
-pub fn worker_inbound(
- device: Arc<DeviceInner>, // related device
- peer: Arc<PeerInner>, // related peer
- recv: Receiver<JobInbound>, // in order queue
+pub fn worker_inbound<T : Opaque>(
+ device: Arc<DeviceInner<T>>, // related device
+ peer: Arc<PeerInner<T>>, // related peer
+ recv: Receiver<JobInbound<T>>, // in order queue
) {
loop {
match wait_recv(&peer.stopped, &recv) {
@@ -108,9 +110,9 @@ pub fn worker_inbound(
}
}
-pub fn worker_outbound(
- device: Arc<DeviceInner>, // related device
- peer: Arc<PeerInner>, // related peer
+pub fn worker_outbound<T : Opaque>(
+ device: Arc<DeviceInner<T>>, // related device
+ peer: Arc<PeerInner<T>>, // related peer
recv: Receiver<JobOutbound>, // in order queue
) {
loop {