summaryrefslogtreecommitdiffstats
path: root/src/router/device.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-16 12:33:10 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-08-16 12:33:10 +0200
commit726163b7f1a788d084643926dd92f03e36fdf27a (patch)
treebe39bcf2b6701562e53f93ad114b489ea6b444e6 /src/router/device.rs
parentImplement add_keypair semantics (diff)
downloadwireguard-rs-726163b7f1a788d084643926dd92f03e36fdf27a.tar.xz
wireguard-rs-726163b7f1a788d084643926dd92f03e36fdf27a.zip
Layout work on router
Diffstat (limited to 'src/router/device.rs')
-rw-r--r--src/router/device.rs235
1 files changed, 146 insertions, 89 deletions
diff --git a/src/router/device.rs b/src/router/device.rs
index ec63111..3b29312 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -1,14 +1,17 @@
use arraydeque::{ArrayDeque, Saturating, Wrapping};
-use lifeguard::{Pool, Recycled};
use treebitmap::IpLookupTable;
use std::collections::HashMap;
+use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
-use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering};
+use std::ptr;
+use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering};
+use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex, Weak};
+use std::thread;
use std::time::{Duration, Instant};
-use spin::RwLock;
+use spin;
use super::super::types::KeyPair;
use super::anti_replay::AntiReplay;
@@ -18,143 +21,197 @@ use std::u64;
const REJECT_AFTER_MESSAGES: u64 = u64::MAX - (1 << 4);
const MAX_STAGED_PACKETS: usize = 128;
-pub struct Device<'a> {
- recv: RwLock<HashMap<u32, Arc<Peer<'a>>>>, // map receiver id -> peer
- ipv4: IpLookupTable<Ipv4Addr, Arc<Peer<'a>>>, // ipv4 trie
- ipv6: IpLookupTable<Ipv6Addr, Arc<Peer<'a>>>, // ipv6 trie
- pool: Pool<Vec<u8>>, // message buffer pool
+pub struct Device {
+ recv: spin::RwLock<HashMap<u32, DecryptionState>>,
+ ipv4: IpLookupTable<Ipv4Addr, Weak<PeerInner>>,
+ ipv6: IpLookupTable<Ipv6Addr, Weak<PeerInner>>,
}
-struct KeyState(KeyPair, AntiReplay);
-
struct EncryptionState {
- key: [u8; 32], // encryption key
- id: u64, // sender id
- nonce: AtomicU64, // next available nonce
- death: Instant, // can must the key no longer be used:
- // (birth + reject-after-time - keepalive-timeout - rekey-timeout)
+ key: [u8; 32], // encryption key
+ id: u32, // sender id
+ nonce: u64, // next available nonce
+ death: Instant, // time when the key no longer can be used for encryption
+ // (birth + reject-after-time - keepalive-timeout - rekey-timeout)
+}
+
+struct DecryptionState {
+ key: [u8; 32],
+ protector: Arc<spin::Mutex<AntiReplay>>,
+ peer: Weak<PeerInner>,
+ death: Instant, // time when the key can no longer be used for decryption
}
struct KeyWheel {
- next: AtomicPtr<Arc<Option<KeyState>>>, // next key state (unconfirmed)
- current: AtomicPtr<Arc<Option<KeyState>>>, // current key state (used for encryption)
- previous: AtomicPtr<Arc<Option<KeyState>>>, // old key state (used for decryption)
+ next: Option<KeyPair>, // next key state (unconfirmed)
+ current: Option<KeyPair>, // current key state (used for encryption)
+ previous: Option<KeyPair>, // old key state (used for decryption)
}
-pub struct Peer<'a> {
- inorder: Mutex<ArrayDeque<[Option<Recycled<'a, Vec<u8>>>; MAX_STAGED_PACKETS], Saturating>>, // inorder queue
+struct PeerInner {
+ inorder_outbound: SyncSender<()>,
+ inorder_inbound: SyncSender<()>,
staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
rx_bytes: AtomicU64, // received bytes
tx_bytes: AtomicU64, // transmitted bytes
- keys: KeyWheel, // key-wheel
- ekey: AtomicPtr<Arc<EncryptionState>>, // encryption state
- endpoint: AtomicPtr<Arc<Option<SocketAddr>>>,
+ keys: spin::Mutex<KeyWheel>, // key-wheel
+ ekey: spin::Mutex<EncryptionState>, // encryption state
+ endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
+}
+
+pub struct Peer(Arc<PeerInner>);
+
+impl Drop for Peer {
+ fn drop(&mut self) {
+ // stop threads and remove peer from device
+ }
+}
+
+impl Drop for Device {
+ fn drop(&mut self) {
+ // stop threads
+ }
}
-impl<'a> Peer<'a> {
- pub fn set_endpoint(&self, endpoint: SocketAddr) {
- self.endpoint
- .store(&mut Arc::new(Some(endpoint)), Ordering::Relaxed)
- }
-
- pub fn add_keypair(&self, keypair: KeyPair) {
- let confirmed = keypair.confirmed;
- let mut st_new = Arc::new(Some(KeyState(keypair, AntiReplay::new())));
- let st_previous = self.keys.previous.load(Ordering::Relaxed);
- if confirmed {
- // previous <- current
- self.keys.previous.compare_and_swap(
- st_previous,
- self.keys.current.load(Ordering::Relaxed),
- Ordering::Relaxed,
- );
-
- // current <- new
- self.keys.next.store(&mut st_new, Ordering::Relaxed)
+impl Peer {
+ fn set_endpoint(&self, endpoint: SocketAddr) {
+ *self.0.endpoint.lock() = Some(Arc::new(endpoint))
+ }
+
+ pub fn keypair_confirm(&self, ks: Arc<KeyPair>) {
+ *self.0.ekey.lock() = EncryptionState {
+ id: ks.send.id,
+ key: ks.send.key,
+ nonce: 0,
+ death: ks.birth + Duration::from_millis(1337), // todo
+ };
+ }
+
+ pub fn keypair_add(&self, new: KeyPair) -> Option<u32> {
+ let mut keys = self.0.keys.lock();
+ let release = keys.previous.map(|k| k.recv.id);
+
+ // update key-wheel
+ if new.confirmed {
+ // start using key for encryption
+ *self.0.ekey.lock() = EncryptionState {
+ id: new.send.id,
+ key: new.send.key,
+ nonce: 0,
+ death: new.birth + Duration::from_millis(1337), // todo
+ };
+
+ // move current into previous
+ keys.previous = keys.current;
+ keys.current = Some(new);
} else {
- // previous <- next
- self.keys.previous.compare_and_swap(
- st_previous,
- self.keys.next.load(Ordering::Relaxed),
- Ordering::Relaxed,
- );
-
- // next <- new
- self.keys.next.store(&mut st_new, Ordering::Relaxed)
- }
+ // store the key and await confirmation
+ keys.previous = keys.next;
+ keys.next = Some(new);
+ };
+
+ // return the released id (for handshake state machine)
+ release
}
pub fn rx_bytes(&self) -> u64 {
- self.rx_bytes.load(Ordering::Relaxed)
+ self.0.rx_bytes.load(Ordering::Relaxed)
}
pub fn tx_bytes(&self) -> u64 {
- self.tx_bytes.load(Ordering::Relaxed)
+ self.0.tx_bytes.load(Ordering::Relaxed)
}
}
-impl<'a> Device<'a> {
- pub fn new() -> Device<'a> {
+impl Device {
+ pub fn new() -> Device {
Device {
- recv: RwLock::new(HashMap::new()),
+ recv: spin::RwLock::new(HashMap::new()),
ipv4: IpLookupTable::new(),
ipv6: IpLookupTable::new(),
- pool: Pool::with_size_and_max(0, MAX_STAGED_PACKETS * 2),
}
}
- pub fn subnets(&self, peer: Arc<Peer<'a>>) -> Vec<(IpAddr, u32)> {
+ pub fn release(&self, id: u32) {
+ debug_assert!(
+ if let Some(_) = self.recv.read().get(&id) {
+ true
+ } else {
+ false
+ },
+ true
+ );
+ self.recv.write().remove(&id);
+ }
+
+ pub fn add_subnet(&mut self, ip: IpAddr, masklen: u32, peer: Peer) {
+ match ip {
+ IpAddr::V4(v4) => self.ipv4.insert(v4, masklen, Arc::downgrade(&peer.0)),
+ IpAddr::V6(v6) => self.ipv6.insert(v6, masklen, Arc::downgrade(&peer.0)),
+ };
+ }
+
+ pub fn subnets(&self, peer: Peer) -> Vec<(IpAddr, u32)> {
let mut subnets = Vec::new();
// extract ipv4 entries
for subnet in self.ipv4.iter() {
let (ip, masklen, p) = subnet;
- if Arc::ptr_eq(&peer, p) {
- subnets.push((IpAddr::V4(ip), masklen))
+ if let Some(p) = p.upgrade() {
+ if Arc::ptr_eq(&p, &peer.0) {
+ subnets.push((IpAddr::V4(ip), masklen))
+ }
}
}
// extract ipv6 entries
for subnet in self.ipv6.iter() {
let (ip, masklen, p) = subnet;
- if Arc::ptr_eq(&peer, p) {
- subnets.push((IpAddr::V6(ip), masklen))
+ if let Some(p) = p.upgrade() {
+ if Arc::ptr_eq(&p, &peer.0) {
+ subnets.push((IpAddr::V6(ip), masklen))
+ }
}
}
subnets
}
+ pub fn keypair_add(&self, peer: Peer, new: KeyPair) -> Option<u32> {
+ // update key-wheel of peer
+ let release = peer.keypair_add(new);
+
+ // update incoming packet id map
+ let mut recv = self.recv.write();
+
+ // release id of previous keypair
+ if let Some(id) = release {
+ debug_assert!(recv.contains_key(&id));
+ recv.remove(&id);
+ };
+
+ // map new id to keypair
+ debug_assert!(!recv.contains_key(&new.recv.id));
+
+ recv.insert(
+ new.recv.id,
+ DecryptionState {
+ key: new.recv.key,
+ protector: Arc::new(spin::Mutex::new(AntiReplay::new())),
+ peer: Arc::downgrade(&peer.0),
+ death: new.birth + Duration::from_millis(2600), // todo
+ },
+ );
+
+ release
+ }
+
/// Adds a new peer to the device
///
/// # Returns
///
/// A atomic ref. counted peer (with liftime matching the device)
- pub fn add(&mut self) -> Arc<Peer<'a>> {
- Arc::new(Peer {
- inorder: Mutex::new(ArrayDeque::new()),
- staged_packets: Mutex::new(ArrayDeque::new()),
- rx_bytes: AtomicU64::new(0),
- tx_bytes: AtomicU64::new(0),
- keys: KeyWheel {
- next: AtomicPtr::new(&mut Arc::new(None)),
- current: AtomicPtr::new(&mut Arc::new(None)),
- previous: AtomicPtr::new(&mut Arc::new(None)),
- },
- // long expired encryption key
- ekey: AtomicPtr::new(&mut Arc::new(EncryptionState {
- key: [0u8; 32],
- id: 0,
- nonce: AtomicU64::new(REJECT_AFTER_MESSAGES),
- death: Instant::now() - Duration::from_secs(31536000),
- })),
- endpoint: AtomicPtr::new(&mut Arc::new(None)),
- })
- }
-
- pub fn get_buffer(&self) -> Recycled<Vec<u8>> {
- self.pool.new()
- }
+ pub fn add(&mut self) -> () {}
/// Cryptkey routes and sends a plaintext message (IP packet)
///