summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-27 11:28:20 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-27 11:28:20 +0200
commita80e64014c21e092e35080baf29b2611d18c486a (patch)
tree6d9c044a54aa5a8c48a121397a1c4a89b37be1dc
parentWork on callback structure for cryptkey router (diff)
downloadwireguard-rs-a80e64014c21e092e35080baf29b2611d18c486a.tar.xz
wireguard-rs-a80e64014c21e092e35080baf29b2611d18c486a.zip
Unbox callback closures
Accepted the more verbose type signatures and added a callback to request new key-material.
-rw-r--r--src/main.rs5
-rw-r--r--src/router/device.rs60
-rw-r--r--src/router/peer.rs38
-rw-r--r--src/router/types.rs16
-rw-r--r--src/router/workers.rs18
5 files changed, 77 insertions, 60 deletions
diff --git a/src/main.rs b/src/main.rs
index ae90251..5c58b24 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -27,8 +27,9 @@ fn main() {
let router = router::Device::new(
4,
- |t: &PeerTimer, data: bool| t.a.reset(Duration::from_millis(1000)),
- |t: &PeerTimer, data: bool| t.b.reset(Duration::from_millis(1000)),
+ |t: &PeerTimer, data: bool, sent: bool| t.a.reset(Duration::from_millis(1000)),
+ |t: &PeerTimer, data: bool, sent: bool| t.b.reset(Duration::from_millis(1000)),
+ |t: &PeerTimer| println!("new key requested"),
);
let pt = PeerTimer {
diff --git a/src/router/device.rs b/src/router/device.rs
index d1a7d93..3670cb5 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -14,20 +14,23 @@ use super::anti_replay::AntiReplay;
use super::peer;
use super::peer::{Peer, PeerInner};
-use super::types::{Callback, Opaque};
-
-pub struct DeviceInner<T: Opaque> {
- // callbacks (used for timers)
- pub event_recv: Box<dyn Callback<T>>, // authenticated message received
- pub event_send: Box<dyn Callback<T>>, // authenticated message send
- pub event_new_handshake: (), // called when a new handshake is required
+use super::types::{Callback, KeyCallback, Opaque};
+pub struct DeviceInner<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> {
+ // threading and workers
pub stopped: AtomicBool,
pub injector: Injector<()>, // parallel enc/dec task injector
pub threads: Vec<thread::JoinHandle<()>>, // join handles of worker threads
- pub recv: spin::RwLock<HashMap<u32, DecryptionState<T>>>, // receiver id -> decryption state
- pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner<T>>>>, // ipv4 cryptkey routing
- pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner<T>>>>, // ipv6 cryptkey routing
+
+ // unboxed callbacks (used for timers and handshake requests)
+ pub event_send: S, // called when authenticated message send
+ pub event_recv: R, // called when authenticated message received
+ pub event_need_key: K, // called when new key material is required
+
+ // routing
+ pub recv: spin::RwLock<HashMap<u32, DecryptionState<T, S, R, K>>>, // receiver id -> decryption state
+ pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner<T, S, R, K>>>>, // ipv4 cryptkey routing
+ pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner<T, S, R, K>>>>, // ipv6 cryptkey routing
}
pub struct EncryptionState {
@@ -38,17 +41,19 @@ pub struct EncryptionState {
// (birth + reject-after-time - keepalive-timeout - rekey-timeout)
}
-pub struct DecryptionState<T: Opaque> {
+pub struct DecryptionState<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> {
pub key: [u8; 32],
pub keypair: Weak<KeyPair>,
pub protector: spin::Mutex<AntiReplay>,
- pub peer: Weak<PeerInner<T>>,
+ pub peer: Weak<PeerInner<T, S, R, K>>,
pub death: Instant, // time when the key can no longer be used for decryption
}
-pub struct Device<T: Opaque>(Arc<DeviceInner<T>>);
+pub struct Device<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>(
+ Arc<DeviceInner<T, S, R, K>>,
+);
-impl<T: Opaque> Drop for Device<T> {
+impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Drop for Device<T, S, R, K> {
fn drop(&mut self) {
// mark device as stopped
let device = &self.0;
@@ -64,16 +69,17 @@ impl<T: Opaque> Drop for Device<T> {
}
}
-impl<T: Opaque> Device<T> {
- pub fn new<F1: Callback<T>, F2: Callback<T>>(
+impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Device<T, S, R, K> {
+ pub fn new(
workers: usize,
- event_recv: F1,
- event_send: F2,
- ) -> Device<T> {
+ event_recv: R,
+ event_send: S,
+ event_need_key: K,
+ ) -> Device<T, S, R, K> {
Device(Arc::new(DeviceInner {
- event_recv: Box::new(event_recv),
- event_send: Box::new(event_send),
- event_new_handshake: (),
+ event_recv,
+ event_send,
+ event_need_key,
threads: vec![],
stopped: AtomicBool::new(false),
injector: Injector::new(),
@@ -88,7 +94,7 @@ impl<T: Opaque> Device<T> {
/// # Returns
///
/// A atomic ref. counted peer (with liftime matching the device)
- pub fn new_peer(&self, opaque: T) -> Peer<T> {
+ pub fn new_peer(&self, opaque: T) -> Peer<T, S, R, K> {
peer::new_peer(self.0.clone(), opaque)
}
@@ -98,13 +104,7 @@ impl<T: Opaque> Device<T> {
///
/// - pt_msg: IP packet to cryptkey route
///
- /// # Returns
- ///
- /// A peer reference for the peer if no key-pair is currently valid for the destination.
- /// This indicates that a handshake should be initated (see the handshake module).
- /// If this occurs the packet is copied to an internal buffer
- /// and retransmission can be attempted using send_run_queue
- pub fn send(&self, pt_msg: &mut [u8]) -> Arc<Peer<T>> {
+ pub fn send(&self, pt_msg: &mut [u8]) {
unimplemented!();
}
diff --git a/src/router/peer.rs b/src/router/peer.rs
index 71f387a..0598b3a 100644
--- a/src/router/peer.rs
+++ b/src/router/peer.rs
@@ -20,7 +20,7 @@ use super::device::DeviceInner;
use super::device::EncryptionState;
use super::workers::{worker_inbound, worker_outbound, JobInbound, JobOutbound};
-use super::types::Opaque;
+use super::types::{Opaque, Callback, KeyCallback};
const MAX_STAGED_PACKETS: usize = 128;
@@ -31,14 +31,14 @@ pub struct KeyWheel {
retired: Option<u32>, // retired id (previous id, after confirming key-pair)
}
-pub struct PeerInner<T: Opaque> {
+pub struct PeerInner<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> {
pub stopped: AtomicBool,
pub opaque: T,
- pub device: Arc<DeviceInner<T>>,
+ pub device: Arc<DeviceInner<T, S, R, K>>,
pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
pub queue_outbound: SyncSender<JobOutbound>,
- pub queue_inbound: SyncSender<JobInbound<T>>,
+ pub queue_inbound: SyncSender<JobInbound<T, S, R, K>>,
pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
pub rx_bytes: AtomicU64, // received bytes
pub tx_bytes: AtomicU64, // transmitted bytes
@@ -47,13 +47,13 @@ pub struct PeerInner<T: Opaque> {
pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
}
-pub struct Peer<T: Opaque>(Arc<PeerInner<T>>);
+pub struct Peer<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>(Arc<PeerInner<T, S, R, K>>);
-fn treebit_list<A, R, T: Opaque>(
- peer: &Arc<PeerInner<T>>,
- table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T>>>>,
- callback: Box<dyn Fn(A, u32) -> R>,
-) -> Vec<R>
+fn treebit_list<A, O, T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>(
+ peer: &Arc<PeerInner<T, S, R, K>>,
+ table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T, S, R, K>>>>,
+ callback: Box<dyn Fn(A, u32) -> O>,
+) -> Vec<O>
where
A: Address,
{
@@ -69,9 +69,9 @@ where
res
}
-fn treebit_remove<A: Address, T: Opaque>(
- peer: &Peer<T>,
- table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T>>>>,
+fn treebit_remove<A: Address, T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>(
+ peer: &Peer<T, S, R, K>,
+ table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T, S, R, K>>>>,
) {
let mut m = table.write();
@@ -93,7 +93,7 @@ fn treebit_remove<A: Address, T: Opaque>(
}
}
-impl<T: Opaque> Drop for Peer<T> {
+impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Drop for Peer<T, S, R, K> {
fn drop(&mut self) {
// mark peer as stopped
@@ -148,7 +148,11 @@ impl<T: Opaque> Drop for Peer<T> {
}
}
-pub fn new_peer<T: Opaque>(device: Arc<DeviceInner<T>>, opaque: T) -> Peer<T> {
+pub fn new_peer<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>(
+ device: Arc<DeviceInner<T, S, R, K>>,
+ opaque: T
+) -> Peer<T, S, R, K> {
+
// allocate in-order queues
let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS);
let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS);
@@ -199,8 +203,8 @@ pub fn new_peer<T: Opaque>(device: Arc<DeviceInner<T>>, opaque: T) -> Peer<T> {
Peer(peer)
}
-impl<T: Opaque> Peer<T> {
- fn new(inner: PeerInner<T>) -> Peer<T> {
+impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Peer<T, S, R, K> {
+ fn new(inner: PeerInner<T, S, R, K>) -> Peer<T, S, R, K> {
Peer(Arc::new(inner))
}
diff --git a/src/router/types.rs b/src/router/types.rs
index 0b5e162..2ed011b 100644
--- a/src/router/types.rs
+++ b/src/router/types.rs
@@ -2,6 +2,18 @@ pub trait Opaque: Send + Sync + 'static {}
impl<T> Opaque for T where T: Send + Sync + 'static {}
-pub trait Callback<T>: Fn(&T, bool) -> () + Sync + Send + 'static {}
+/// A send/recv callback takes 3 arguments:
+///
+/// * `0`, a reference to the opaque value assigned to the peer
+/// * `1`, a bool indicating whether the message contained data (not just keepalive)
+/// * `2`, a bool indicating whether the message was transmitted (i.e. did the peer have an associated endpoint?)
+pub trait Callback<T>: Fn(&T, bool, bool) -> () + Sync + Send + 'static {}
-impl<T, F> Callback<T> for F where F: Fn(&T, bool) -> () + Sync + Send + 'static {}
+impl<T, F> Callback<T> for F where F: Fn(&T, bool, bool) -> () + Sync + Send + 'static {}
+
+/// A key callback takes 1 argument
+///
+/// * `0`, a reference to the opaque value assigned to the peer
+pub trait KeyCallback<T>: Fn(&T) -> () + Sync + Send + 'static {}
+
+impl<T, F> KeyCallback<T> for F where F: Fn(&T) -> () + Sync + Send + 'static {} \ No newline at end of file
diff --git a/src/router/workers.rs b/src/router/workers.rs
index 4942491..4f39fb2 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -10,7 +10,7 @@ use std::sync::mpsc::{sync_channel, Receiver, TryRecvError};
use std::sync::{Arc, Weak};
use std::thread;
-use super::types::{Opaque, Callback};
+use super::types::{Opaque, Callback, KeyCallback};
#[derive(PartialEq)]
enum Operation {
@@ -34,7 +34,7 @@ pub struct JobInner {
pub type JobBuffer = Arc<spin::Mutex<JobInner>>;
pub type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer);
-pub type JobInbound<T> = (Weak<DecryptionState<T>>, JobBuffer);
+pub type JobInbound<T, S, R, K> = (Weak<DecryptionState<T, S, R, K>>, JobBuffer);
pub type JobOutbound = JobBuffer;
/* Strategy for workers acquiring a new job:
@@ -82,10 +82,10 @@ fn wait_recv<T>(stopped: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvEr
return Err(TryRecvError::Disconnected);
}
-pub fn worker_inbound<T : Opaque>(
- device: Arc<DeviceInner<T>>, // related device
- peer: Arc<PeerInner<T>>, // related peer
- recv: Receiver<JobInbound<T>>, // in order queue
+pub fn worker_inbound<T : Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>(
+ device: Arc<DeviceInner<T, S, R, K>>, // related device
+ peer: Arc<PeerInner<T, S, R, K>>, // related peer
+ recv: Receiver<JobInbound<T, S, R, K>>, // in order queue
) {
loop {
match wait_recv(&peer.stopped, &recv) {
@@ -110,9 +110,9 @@ pub fn worker_inbound<T : Opaque>(
}
}
-pub fn worker_outbound<T : Opaque>(
- device: Arc<DeviceInner<T>>, // related device
- peer: Arc<PeerInner<T>>, // related peer
+pub fn worker_outbound<T : Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>(
+ device: Arc<DeviceInner<T, S, R, K>>, // related device
+ peer: Arc<PeerInner<T, S, R, K>>, // related peer
recv: Receiver<JobOutbound>, // in order queue
) {
loop {