diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-08-20 14:33:11 +0200 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-08-20 14:33:11 +0200 |
commit | 7e727d120b4c7375b7dd2f1210a883c876531c06 (patch) | |
tree | 43b30640038535c9f3d6640659707c5fd36b595b /src/router/peer.rs | |
parent | Implemented keypair_confirm (diff) | |
download | wireguard-rs-7e727d120b4c7375b7dd2f1210a883c876531c06.tar.xz wireguard-rs-7e727d120b4c7375b7dd2f1210a883c876531c06.zip |
Restructure and job stealing work queue
Diffstat (limited to 'src/router/peer.rs')
-rw-r--r-- | src/router/peer.rs | 285 |
1 files changed, 285 insertions, 0 deletions
diff --git a/src/router/peer.rs b/src/router/peer.rs new file mode 100644 index 0000000..f7b8bf4 --- /dev/null +++ b/src/router/peer.rs @@ -0,0 +1,285 @@ +use std::sync::atomic::{AtomicU64, AtomicBool, Ordering}; +use std::sync::{Weak, Arc}; +use std::thread; + +use std::net::{IpAddr, SocketAddr}; + +use std::sync::mpsc::{sync_channel, SyncSender}; + +use spin; + +use arraydeque::{ArrayDeque, Wrapping}; + +use treebitmap::IpLookupTable; +use treebitmap::address::Address; + +use super::super::types::KeyPair; +use super::super::constants::*; + +use super::anti_replay::AntiReplay; +use super::device::DeviceInner; +use super::device::EncryptionState; +use super::device::DecryptionState; + +const MAX_STAGED_PACKETS: usize = 128; + +struct KeyWheel { + next: Option<Arc<KeyPair>>, // next key state (unconfirmed) + current: Option<Arc<KeyPair>>, // current key state (used for encryption) + previous: Option<Arc<KeyPair>>, // old key state (used for decryption) + retired: Option<u32>, // retired id (previous id, after confirming key-pair) +} + +pub struct PeerInner { + stopped: AtomicBool, + device: Arc<DeviceInner>, + thread_outbound: spin::Mutex<thread::JoinHandle<()>>, + thread_inbound: spin::Mutex<thread::JoinHandle<()>>, + inorder_outbound: SyncSender<()>, + inorder_inbound: SyncSender<()>, + staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake + rx_bytes: AtomicU64, // received bytes + tx_bytes: AtomicU64, // transmitted bytes + keys: spin::Mutex<KeyWheel>, // key-wheel + ekey: spin::Mutex<Option<EncryptionState>>, // encryption state + endpoint: spin::Mutex<Option<Arc<SocketAddr>>>, +} + +pub struct Peer(Arc<PeerInner>); + +fn treebit_list<A, R>( + peer: &Arc<PeerInner>, + table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>, + callback: Box<dyn Fn(A, u32) -> R>, +) -> Vec<R> +where + A: Address, +{ + let mut res = Vec::new(); + for subnet in table.read().iter() { + let (ip, masklen, p) = subnet; + if let Some(p) = p.upgrade() { + if Arc::ptr_eq(&p, &peer) { + res.push(callback(ip, masklen)) + } + } + } + res +} + +fn treebit_remove<A>(peer: &Peer, table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>) +where + A: Address, +{ + let mut m = table.write(); + + // collect keys for value + let mut subnets = vec![]; + for subnet in m.iter() { + let (ip, masklen, p) = subnet; + if let Some(p) = p.upgrade() { + if Arc::ptr_eq(&p, &peer.0) { + subnets.push((ip, masklen)) + } + } + } + + // remove all key mappings + for subnet in subnets { + let r = m.remove(subnet.0, subnet.1); + debug_assert!(r.is_some()); + } +} + +impl Drop for Peer { + fn drop(&mut self) { + // mark peer as stopped + + let peer = &self.0; + peer.stopped.store(true, Ordering::SeqCst); + + // remove from cryptkey router + + treebit_remove(self, &peer.device.ipv4); + treebit_remove(self, &peer.device.ipv6); + + // unpark threads + + peer.thread_inbound.lock().thread().unpark(); + peer.thread_outbound.lock().thread().unpark(); + + // release ids from the receiver map + + let mut keys = peer.keys.lock(); + let mut release = Vec::with_capacity(3); + + keys.next.as_ref().map(|k| release.push(k.recv.id)); + keys.current.as_ref().map(|k| release.push(k.recv.id)); + keys.previous.as_ref().map(|k| release.push(k.recv.id)); + + if release.len() > 0 { + let mut recv = peer.device.recv.write(); + for id in &release { + recv.remove(id); + } + } + + // null key-material (TODO: extend) + + keys.next = None; + keys.current = None; + keys.previous = None; + + *peer.ekey.lock() = None; + *peer.endpoint.lock() = None; + } +} + +pub fn new_peer(device: Arc<DeviceInner>) -> Peer { + // spawn inbound thread + let (send_inbound, recv_inbound) = sync_channel(1); + let handle_inbound = thread::spawn(move || {}); + + // spawn outbound thread + let (send_outbound, recv_inbound) = sync_channel(1); + let handle_outbound = thread::spawn(move || {}); + + // allocate peer object + Peer::new(PeerInner { + stopped: AtomicBool::new(false), + device: device, + ekey: spin::Mutex::new(None), + endpoint: spin::Mutex::new(None), + inorder_inbound: send_inbound, + inorder_outbound: send_outbound, + keys: spin::Mutex::new(KeyWheel { + next: None, + current: None, + previous: None, + retired: None, + }), + rx_bytes: AtomicU64::new(0), + tx_bytes: AtomicU64::new(0), + staged_packets: spin::Mutex::new(ArrayDeque::new()), + thread_inbound: spin::Mutex::new(handle_inbound), + thread_outbound: spin::Mutex::new(handle_outbound), + }) +} + +impl Peer { + fn new(inner : PeerInner) -> Peer { + Peer(Arc::new(inner)) + } + + pub fn set_endpoint(&self, endpoint: SocketAddr) { + *self.0.endpoint.lock() = Some(Arc::new(endpoint)) + } + + /// Add a new keypair + /// + /// # Arguments + /// + /// - new: The new confirmed/unconfirmed key pair + /// + /// # Returns + /// + /// A vector of ids which has been released. + /// These should be released in the handshake module. + pub fn add_keypair(&self, new: KeyPair) -> Vec<u32> { + let mut keys = self.0.keys.lock(); + let mut release = Vec::with_capacity(2); + let new = Arc::new(new); + + // collect ids to be released + keys.retired.map(|v| release.push(v)); + keys.previous.as_ref().map(|k| release.push(k.recv.id)); + + // update key-wheel + if new.confirmed { + // start using key for encryption + *self.0.ekey.lock() = Some(EncryptionState { + id: new.send.id, + key: new.send.key, + nonce: 0, + death: new.birth + REJECT_AFTER_TIME, + }); + + // move current into previous + keys.previous = keys.current.as_ref().map(|v| v.clone());; + keys.current = Some(new.clone()); + } else { + // store the key and await confirmation + keys.previous = keys.next.as_ref().map(|v| v.clone());; + keys.next = Some(new.clone()); + }; + + // update incoming packet id map + { + let mut recv = self.0.device.recv.write(); + + // purge recv map of released ids + for id in &release { + recv.remove(&id); + } + + // map new id to keypair + debug_assert!(!recv.contains_key(&new.recv.id)); + + recv.insert( + new.recv.id, + DecryptionState { + keypair: Arc::downgrade(&new), + key: new.recv.key, + protector: spin::Mutex::new(AntiReplay::new()), + peer: Arc::downgrade(&self.0), + death: new.birth + REJECT_AFTER_TIME, + }, + ); + } + + // return the released id (for handshake state machine) + release + } + + pub fn rx_bytes(&self) -> u64 { + self.0.rx_bytes.load(Ordering::Relaxed) + } + + pub fn tx_bytes(&self) -> u64 { + self.0.tx_bytes.load(Ordering::Relaxed) + } + + pub fn add_subnet(&self, ip: IpAddr, masklen: u32) { + match ip { + IpAddr::V4(v4) => { + self.0 + .device + .ipv4 + .write() + .insert(v4, masklen, Arc::downgrade(&self.0)) + } + IpAddr::V6(v6) => { + self.0 + .device + .ipv6 + .write() + .insert(v6, masklen, Arc::downgrade(&self.0)) + } + }; + } + + pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> { + let mut res = Vec::new(); + res.append(&mut treebit_list( + &self.0, + &self.0.device.ipv4, + Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)), + )); + res.append(&mut treebit_list( + &self.0, + &self.0.device.ipv6, + Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)), + )); + res + } +}
\ No newline at end of file |