aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs2
-rw-r--r--src/router/device.rs262
2 files changed, 173 insertions, 91 deletions
diff --git a/src/main.rs b/src/main.rs
index 82a4b0c..272c5e2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,5 +16,5 @@ fn main() {
let mut rdev = router::Device::new(8);
- let pref = rdev.add();
+ let pref = rdev.new_peer();
}
diff --git a/src/router/device.rs b/src/router/device.rs
index 4dd6539..8f3d485 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -6,7 +6,7 @@ use crossbeam_deque::{Injector, Steal};
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
-use std::sync::mpsc::SyncSender;
+use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex, Weak};
use std::thread;
use std::time::Instant;
@@ -37,11 +37,11 @@ struct PeerInner {
thread_inbound: spin::Mutex<thread::JoinHandle<()>>,
inorder_outbound: SyncSender<()>,
inorder_inbound: SyncSender<()>,
- staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
- rx_bytes: AtomicU64, // received bytes
- tx_bytes: AtomicU64, // transmitted bytes
- keys: spin::Mutex<KeyWheel>, // key-wheel
- ekey: spin::Mutex<Option<EncryptionState>>, // encryption state
+ staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
+ rx_bytes: AtomicU64, // received bytes
+ tx_bytes: AtomicU64, // transmitted bytes
+ keys: spin::Mutex<KeyWheel>, // key-wheel
+ ekey: spin::Mutex<Option<EncryptionState>>, // encryption state
endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
}
@@ -55,22 +55,24 @@ struct EncryptionState {
struct DecryptionState {
key: [u8; 32],
- protector: Arc<spin::Mutex<AntiReplay>>,
+ // keypair: Weak<KeyPair>,
+ protector: spin::Mutex<AntiReplay>,
peer: Weak<PeerInner>,
death: Instant, // time when the key can no longer be used for decryption
}
struct KeyWheel {
- next: Option<KeyPair>, // next key state (unconfirmed)
- current: Option<KeyPair>, // current key state (used for encryption)
- previous: Option<KeyPair>, // old key state (used for decryption)
+ next: Option<Arc<KeyPair>>, // next key state (unconfirmed)
+ current: Option<Arc<KeyPair>>, // current key state (used for encryption)
+ previous: Option<Arc<KeyPair>>, // old key state (used for decryption)
+ retired: Option<u32>, // retired id (previous id, after confirming key-pair)
}
pub struct Peer(Arc<PeerInner>);
-pub struct Device(DeviceInner);
+pub struct Device(Arc<DeviceInner>);
fn treebit_list<A, R>(
- peer: &Peer,
+ peer: &Arc<PeerInner>,
table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>,
callback: Box<dyn Fn(A, u32) -> R>,
) -> Vec<R>
@@ -81,7 +83,7 @@ where
for subnet in table.read().iter() {
let (ip, masklen, p) = subnet;
if let Some(p) = p.upgrade() {
- if Arc::ptr_eq(&p, &peer.0) {
+ if Arc::ptr_eq(&p, &peer) {
res.push(callback(ip, masklen))
}
}
@@ -116,10 +118,12 @@ where
impl Drop for Peer {
fn drop(&mut self) {
// mark peer as stopped
+
let peer = &self.0;
peer.stopped.store(true, Ordering::SeqCst);
// remove from cryptkey router
+
treebit_remove(self, &peer.device.ipv4);
treebit_remove(self, &peer.device.ipv6);
@@ -127,15 +131,16 @@ impl Drop for Peer {
peer.thread_inbound.lock().thread().unpark();
peer.thread_outbound.lock().thread().unpark();
- // collect ids to release
+
+ // release ids from the receiver map
+
let mut keys = peer.keys.lock();
let mut release = Vec::with_capacity(3);
- keys.next.map(|k| release.push(k.recv.id));
- keys.current.map(|k| release.push(k.recv.id));
- keys.previous.map(|k| release.push(k.recv.id));
+ keys.next.as_ref().map(|k| release.push(k.recv.id));
+ keys.current.as_ref().map(|k| release.push(k.recv.id));
+ keys.previous.as_ref().map(|k| release.push(k.recv.id));
- // remove from receive id map
if release.len() > 0 {
let mut recv = peer.device.recv.write();
for id in &release {
@@ -170,23 +175,73 @@ impl Drop for Device {
}
}
-impl Peer {
- pub fn set_endpoint(&self, endpoint: SocketAddr) {
- *self.0.endpoint.lock() = Some(Arc::new(endpoint))
- }
+impl PeerInner {
+ pub fn keypair_confirm(&self, kp: Weak<KeyPair>) {
+ let mut keys = self.keys.lock();
+
+ // Attempt to upgrade Weak -> Arc
+ // (this should ensure that the key is in the key-wheel,
+ // which holds the only strong reference)
+ let kp = match kp.upgrade() {
+ Some(kp) => kp,
+ None => {
+ return;
+ }
+ };
+
+ debug_assert!(
+ keys.retired.is_none(),
+ "retired spot is not free for previous"
+ );
- pub fn keypair_confirm(&self, ks: Arc<KeyPair>) {
- *self.0.ekey.lock() = Some(EncryptionState {
- id: ks.send.id,
- key: ks.send.key,
+ debug_assert!(
+ if let Some(key) = &keys.next {
+ Arc::ptr_eq(&kp, &key)
+ } else {
+ false
+ },
+ "if next has been overwritten, before confirmation, the key-pair should have been dropped!"
+ );
+
+ // enable use for encryption and set confirmed
+ *self.ekey.lock() = Some(EncryptionState {
+ id: kp.send.id,
+ key: kp.send.key,
nonce: 0,
- death: ks.birth + REJECT_AFTER_TIME,
+ death: kp.birth + REJECT_AFTER_TIME,
});
+
+ // rotate the key-wheel
+ let release = keys.previous.as_ref().map(|k| k.recv.id);
+ keys.previous = keys.current.as_ref().map(|v| v.clone());
+ keys.current = Some(kp.clone());
+ keys.retired = release;
}
+}
- fn keypair_add(&self, new: KeyPair) -> Option<u32> {
+/// Public interface and handle to the peer
+impl Peer {
+ pub fn set_endpoint(&self, endpoint: SocketAddr) {
+ *self.0.endpoint.lock() = Some(Arc::new(endpoint))
+ }
+
+ /// Add a new keypair
+ ///
+ /// # Arguments
+ ///
+ /// - new: The new confirmed/unconfirmed key pair
+ ///
+ /// # Returns
+ ///
+ /// A vector of ids which has been released.
+ /// These should be released in the handshake module.
+ pub fn add_keypair(&self, new: KeyPair) -> Vec<u32> {
let mut keys = self.0.keys.lock();
- let release = keys.previous.map(|k| k.recv.id);
+ let mut release = Vec::with_capacity(2);
+
+ // collect ids to be released
+ keys.retired.map(|v| release.push(v));
+ keys.previous.as_ref().map(|k| release.push(k.recv.id));
// update key-wheel
if new.confirmed {
@@ -199,14 +254,37 @@ impl Peer {
});
// move current into previous
- keys.previous = keys.current;
- keys.current = Some(new);
+ keys.previous = keys.current.as_ref().map(|v| v.clone());;
+ keys.current = Some(Arc::new(new));
} else {
// store the key and await confirmation
- keys.previous = keys.next;
- keys.next = Some(new);
+ keys.previous = keys.next.as_ref().map(|v| v.clone());;
+ keys.next = Some(Arc::new(new));
};
+ // update incoming packet id map
+ {
+ let mut recv = self.0.device.recv.write();
+
+ // purge recv map of released ids
+ for id in &release {
+ recv.remove(&id);
+ }
+
+ // map new id to keypair
+ debug_assert!(!recv.contains_key(&new.recv.id));
+
+ recv.insert(
+ new.recv.id,
+ DecryptionState {
+ key: new.recv.key,
+ protector: spin::Mutex::new(AntiReplay::new()),
+ peer: Arc::downgrade(&self.0),
+ death: new.birth + REJECT_AFTER_TIME,
+ },
+ );
+ }
+
// return the released id (for handshake state machine)
release
}
@@ -218,77 +296,52 @@ impl Peer {
pub fn tx_bytes(&self) -> u64 {
self.0.tx_bytes.load(Ordering::Relaxed)
}
-}
-impl Device {
- pub fn new(workers: usize) -> Device {
- Device(DeviceInner {
- threads: vec![],
- stopped: AtomicBool::new(false),
- injector: Injector::new(),
- recv: spin::RwLock::new(HashMap::new()),
- ipv4: spin::RwLock::new(IpLookupTable::new()),
- ipv6: spin::RwLock::new(IpLookupTable::new()),
- })
- }
-
- pub fn add_subnet(&mut self, ip: IpAddr, masklen: u32, peer: Peer) {
+ pub fn add_subnet(&self, ip: IpAddr, masklen: u32) {
match ip {
- IpAddr::V4(v4) => self
- .0
- .ipv4
- .write()
- .insert(v4, masklen, Arc::downgrade(&peer.0)),
- IpAddr::V6(v6) => self
- .0
- .ipv6
- .write()
- .insert(v6, masklen, Arc::downgrade(&peer.0)),
+ IpAddr::V4(v4) => {
+ self.0
+ .device
+ .ipv4
+ .write()
+ .insert(v4, masklen, Arc::downgrade(&self.0))
+ }
+ IpAddr::V6(v6) => {
+ self.0
+ .device
+ .ipv6
+ .write()
+ .insert(v6, masklen, Arc::downgrade(&self.0))
+ }
};
}
- pub fn list_subnets(&self, peer: Peer) -> Vec<(IpAddr, u32)> {
+ pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> {
let mut res = Vec::new();
res.append(&mut treebit_list(
- &peer,
- &self.0.ipv4,
+ &self.0,
+ &self.0.device.ipv4,
Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)),
));
res.append(&mut treebit_list(
- &peer,
- &self.0.ipv6,
+ &self.0,
+ &self.0.device.ipv6,
Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)),
));
res
}
+}
- pub fn keypair_add(&self, peer: Peer, new: KeyPair) -> Option<u32> {
- // update key-wheel of peer
- let release = peer.keypair_add(new);
-
- // update incoming packet id map
- let mut recv = self.0.recv.write();
-
- // release id of previous keypair
- if let Some(id) = release {
- debug_assert!(recv.contains_key(&id));
- recv.remove(&id);
- };
-
- // map new id to keypair
- debug_assert!(!recv.contains_key(&new.recv.id));
-
- recv.insert(
- new.recv.id,
- DecryptionState {
- key: new.recv.key,
- protector: Arc::new(spin::Mutex::new(AntiReplay::new())),
- peer: Arc::downgrade(&peer.0),
- death: new.birth + REJECT_AFTER_TIME,
- },
- );
-
- release
+impl Device {
+ pub fn new(workers: usize) -> Device {
+ Device(Arc::new(DeviceInner {
+ threads: vec![],
+ stopped: AtomicBool::new(false),
+ injector: Injector::new(),
+ recv: spin::RwLock::new(HashMap::new()),
+ ipv4: spin::RwLock::new(IpLookupTable::new()),
+ ipv6: spin::RwLock::new(IpLookupTable::new()),
+ }))
}
/// Adds a new peer to the device
@@ -296,7 +349,36 @@ impl Device {
/// # Returns
///
/// A atomic ref. counted peer (with liftime matching the device)
- pub fn add(&mut self) -> () {}
+ pub fn new_peer(&self) -> Peer {
+ // spawn inbound thread
+ let (send_inbound, recv_inbound) = sync_channel(1);
+ let handle_inbound = thread::spawn(move || {});
+
+ // spawn outbound thread
+ let (send_outbound, recv_inbound) = sync_channel(1);
+ let handle_outbound = thread::spawn(move || {});
+
+ // allocate peer object
+ Peer(Arc::new(PeerInner {
+ stopped: AtomicBool::new(false),
+ device: self.0.clone(),
+ ekey: spin::Mutex::new(None),
+ endpoint: spin::Mutex::new(None),
+ inorder_inbound: send_inbound,
+ inorder_outbound: send_outbound,
+ keys: spin::Mutex::new(KeyWheel {
+ next: None,
+ current: None,
+ previous: None,
+ retired: None,
+ }),
+ rx_bytes: AtomicU64::new(0),
+ tx_bytes: AtomicU64::new(0),
+ staged_packets: spin::Mutex::new(ArrayDeque::new()),
+ thread_inbound: spin::Mutex::new(handle_inbound),
+ thread_outbound: spin::Mutex::new(handle_outbound),
+ }))
+ }
/// Cryptkey routes and sends a plaintext message (IP packet)
///