From 929eadb651ba41bb72ba8f85a0d68c0cbad18661 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sun, 1 Sep 2019 17:16:01 +0200 Subject: Outbound cryptkey routing --- src/main.rs | 3 +- src/router/device.rs | 88 +++++++++++++++++++++---- src/router/mod.rs | 8 +++ src/router/peer.rs | 70 +++++++++++++++++--- src/router/tests.rs | 173 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/router/types.rs | 29 +++++++++ src/router/workers.rs | 31 +++++---- 7 files changed, 367 insertions(+), 35 deletions(-) create mode 100644 src/router/tests.rs (limited to 'src') diff --git a/src/main.rs b/src/main.rs index 6d1d2e1..8d92048 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,10 +10,9 @@ use hjul::*; use std::error::Error; use std::fmt; use std::net::SocketAddr; -use std::sync::Arc; use std::time::Duration; -use types::{Bind, KeyPair, Tun}; +use types::{Bind, Tun}; #[derive(Debug)] enum TunError {} diff --git a/src/router/device.rs b/src/router/device.rs index f04cf97..4fb0334 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -1,6 +1,7 @@ +use std::cmp; use std::collections::HashMap; use std::net::{Ipv4Addr, Ipv6Addr}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Weak}; use std::thread; use std::time::Instant; @@ -14,10 +15,21 @@ use super::super::types::{Bind, KeyPair, Tun}; use super::anti_replay::AntiReplay; use super::peer; use super::peer::{Peer, PeerInner}; +use super::SIZE_MESSAGE_PREFIX; -use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks}; +use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks, RouterError}; use super::workers::{worker_parallel, JobParallel}; +// minimum sizes for IP headers +const SIZE_IP4_HEADER: usize = 16; +const SIZE_IP6_HEADER: usize = 36; + +const VERSION_IP4: u8 = 4; +const VERSION_IP6: u8 = 6; + +const OFFSET_IP4_DST: usize = 16; +const OFFSET_IP6_DST: usize = 24; + pub struct DeviceInner { // IO & timer generics pub tun: T, @@ -27,9 +39,9 @@ pub struct DeviceInner { pub call_need_key: C::CallbackKey, // threading and workers - pub running: AtomicBool, // workers running? - pub parked: AtomicBool, // any workers parked? - pub injector: Injector, // parallel enc/dec task injector + pub running: AtomicBool, // workers running? + pub parked: AtomicBool, // any workers parked? + pub injector: Injector>, // parallel enc/dec task injector // routing pub recv: spin::RwLock>>, // receiver id -> decryption state @@ -38,11 +50,10 @@ pub struct DeviceInner { } pub struct EncryptionState { - pub key: [u8; 32], // encryption key - pub id: u32, // sender id - pub nonce: u64, // next available nonce - pub death: Instant, // time when the key no longer can be used for encryption - // (birth + reject-after-time - keepalive-timeout - rekey-timeout) + pub key: [u8; 32], // encryption key + pub id: u32, // receiver id + pub nonce: u64, // next available nonce + pub death: Instant, // (birth + reject-after-time - keepalive-timeout - rekey-timeout) } pub struct DecryptionState { @@ -144,8 +155,61 @@ impl Device { /// /// - pt_msg: IP packet to cryptkey route /// - pub fn send(&self, pt_msg: &mut [u8]) { - unimplemented!(); + pub fn send(&self, msg: Vec) -> Result<(), RouterError> { + // ensure that the type field access is within bounds + if msg.len() < cmp::min(SIZE_IP4_HEADER, SIZE_IP6_HEADER) + SIZE_MESSAGE_PREFIX { + return Err(RouterError::MalformedIPHeader); + } + + // ignore header prefix (for in-place transport message construction) + let packet = &msg[SIZE_MESSAGE_PREFIX..]; + + // lookup peer based on IP packet destination address + let peer = match packet[0] >> 4 { + VERSION_IP4 => { + if msg.len() >= SIZE_IP4_HEADER { + // extract IPv4 destination address + let mut dst = [0u8; 4]; + dst.copy_from_slice(&packet[OFFSET_IP4_DST..OFFSET_IP4_DST + 4]); + let dst = Ipv4Addr::from(dst); + + // lookup peer (project unto and clone "value" field) + self.0 + .ipv4 + .read() + .longest_match(dst) + .and_then(|(_, _, p)| p.upgrade()) + .ok_or(RouterError::NoCryptKeyRoute) + } else { + Err(RouterError::MalformedIPHeader) + } + } + VERSION_IP6 => { + if msg.len() >= SIZE_IP6_HEADER { + // extract IPv6 destination address + let mut dst = [0u8; 16]; + dst.copy_from_slice(&packet[OFFSET_IP6_DST..OFFSET_IP6_DST + 16]); + let dst = Ipv6Addr::from(dst); + + // lookup peer (project unto and clone "value" field) + self.0 + .ipv6 + .read() + .longest_match(dst) + .and_then(|(_, _, p)| p.upgrade()) + .ok_or(RouterError::NoCryptKeyRoute) + } else { + Err(RouterError::MalformedIPHeader) + } + } + _ => Err(RouterError::MalformedIPHeader), + }?; + + // schedule for encryption and transmission to peer + if let Some(job) = peer.send_job(msg) { + self.0.injector.push((peer.clone(), job)); + } + Ok(()) } /// Receive an encrypted transport message diff --git a/src/router/mod.rs b/src/router/mod.rs index c1ecf1c..0e4bce1 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -5,5 +5,13 @@ mod peer; mod types; mod workers; +#[cfg(test)] +mod tests; + +use messages::TransportHeader; +use std::mem; + +pub const SIZE_MESSAGE_PREFIX: usize = mem::size_of::(); + pub use device::Device; pub use peer::Peer; diff --git a/src/router/peer.rs b/src/router/peer.rs index e21e69c..d755fa5 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -1,3 +1,4 @@ +use std::mem; use std::net::{IpAddr, SocketAddr}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::mpsc::{sync_channel, SyncSender}; @@ -7,18 +8,23 @@ use std::thread; use spin; use arraydeque::{ArrayDeque, Wrapping}; +use zerocopy::{AsBytes, LayoutVerified}; use treebitmap::address::Address; use treebitmap::IpLookupTable; use super::super::constants::*; -use super::super::types::{KeyPair, Tun, Bind}; +use super::super::types::{Bind, KeyPair, Tun}; use super::anti_replay::AntiReplay; use super::device::DecryptionState; use super::device::DeviceInner; use super::device::EncryptionState; -use super::workers::{worker_inbound, worker_outbound, JobInbound, JobOutbound}; +use super::messages::TransportHeader; + +use super::workers::{worker_inbound, worker_outbound}; +use super::workers::{JobBuffer, JobInbound, JobInner, JobOutbound}; +use super::workers::{Operation, Status}; use super::types::Callbacks; @@ -40,16 +46,14 @@ pub struct PeerInner { pub queue_outbound: SyncSender, pub queue_inbound: SyncSender>, pub staged_packets: spin::Mutex; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake - pub rx_bytes: AtomicU64, // received bytes - pub tx_bytes: AtomicU64, // transmitted bytes + pub rx_bytes: AtomicU64, // received bytes + pub tx_bytes: AtomicU64, // transmitted bytes pub keys: spin::Mutex, // key-wheel pub ekey: spin::Mutex>, // encryption state pub endpoint: spin::Mutex>>, } -pub struct Peer( - Arc>, -); +pub struct Peer(Arc>); fn treebit_list( peer: &Arc>, @@ -212,6 +216,51 @@ impl PeerInner { // rotate key-wheel } + + pub fn send_job(&self, mut msg: Vec) -> Option { + debug_assert!(msg.len() >= mem::size_of::()); + + // parse / cast + let (header, _) = LayoutVerified::new_from_prefix(&mut msg[..]).unwrap(); + let mut header: LayoutVerified<&mut [u8], TransportHeader> = header; + + // check if has key + let key = match self.ekey.lock().as_mut() { + None => { + // add to staged packets (create no job) + (self.device.call_need_key)(&self.opaque); + self.staged_packets.lock().push_back(msg); + return None; + } + Some(mut state) => { + // allocate nonce + state.nonce += 1; + if state.nonce >= REJECT_AFTER_MESSAGES { + state.nonce -= 1; + return None; + } + + // set transport message fields + header.f_counter.set(state.nonce); + header.f_receiver.set(state.id); + state.key + } + }; + + // create job + let job = Arc::new(spin::Mutex::new(JobInner { + msg, + key, + status: Status::Waiting, + op: Operation::Encryption, + })); + + // add job to in-order queue and return to device for inclusion in worker pool + match self.queue_outbound.try_send(job.clone()) { + Ok(_) => Some(job), + Err(_) => None, + } + } } impl Peer { @@ -332,5 +381,10 @@ impl Peer { res } - pub fn send(&self, msg: Vec) {} + pub fn remove_subnets(&self) { + treebit_remove(self, &self.0.device.ipv4); + treebit_remove(self, &self.0.device.ipv6); + } + + fn send(&self, msg: Vec) {} } diff --git a/src/router/tests.rs b/src/router/tests.rs new file mode 100644 index 0000000..07851a8 --- /dev/null +++ b/src/router/tests.rs @@ -0,0 +1,173 @@ +use std::error::Error; +use std::fmt; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use pnet::packet::ipv4::MutableIpv4Packet; +use pnet::packet::ipv6::MutableIpv6Packet; + +use super::super::types::{Bind, Tun}; +use super::{Device, Peer, SIZE_MESSAGE_PREFIX}; + +#[derive(Debug)] +enum TunError {} + +impl Error for TunError { + fn description(&self) -> &str { + "Generic Tun Error" + } + + fn source(&self) -> Option<&(dyn Error + 'static)> { + None + } +} + +impl fmt::Display for TunError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Not Possible") + } +} + +struct TunTest {} + +impl Tun for TunTest { + type Error = TunError; + + fn mtu(&self) -> usize { + 1500 + } + + fn read(&self, buf: &mut [u8], offset: usize) -> Result { + Ok(0) + } + + fn write(&self, src: &[u8]) -> Result<(), Self::Error> { + Ok(()) + } +} + +struct BindTest {} + +impl Bind for BindTest { + type Error = BindError; + type Endpoint = SocketAddr; + + fn new() -> BindTest { + BindTest {} + } + + fn set_port(&self, port: u16) -> Result<(), Self::Error> { + Ok(()) + } + + fn get_port(&self) -> Option { + None + } + + fn recv(&self, buf: &mut [u8]) -> Result<(usize, Self::Endpoint), Self::Error> { + Ok((0, "127.0.0.1:8080".parse().unwrap())) + } + + fn send(&self, buf: &[u8], dst: &Self::Endpoint) -> Result<(), Self::Error> { + Ok(()) + } +} + +#[derive(Debug)] +enum BindError {} + +impl Error for BindError { + fn description(&self) -> &str { + "Generic Bind Error" + } + + fn source(&self) -> Option<&(dyn Error + 'static)> { + None + } +} + +impl fmt::Display for BindError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Not Possible") + } +} + +#[test] +fn test_outbound() { + let opaque = Arc::new(AtomicBool::new(false)); + + // create device (with Opaque = ()) + let workers = 4; + let router = Device::new( + workers, + TunTest {}, + BindTest {}, + |t: &Arc, data: bool, sent: bool| {}, + |t: &Arc, data: bool, sent: bool| {}, + |t: &Arc| t.store(true, Ordering::SeqCst), + ); + + // create peer + let peer = router.new_peer(opaque.clone()); + let tests = vec![ + ("192.168.1.0", 24, "192.168.1.20", true), + ("172.133.133.133", 32, "172.133.133.133", true), + ("172.133.133.133", 32, "172.133.133.132", false), + ( + "2001:db8::ff00:42:0000", + 112, + "2001:db8::ff00:42:3242", + true, + ), + ( + "2001:db8::ff00:42:8000", + 113, + "2001:db8::ff00:42:0660", + false, + ), + ( + "2001:db8::ff00:42:8000", + 113, + "2001:db8::ff00:42:ffff", + true, + ), + ]; + + for (mask, len, ip, okay) in &tests { + opaque.store(false, Ordering::SeqCst); + + let mask: IpAddr = mask.parse().unwrap(); + + // map subnet to peer + peer.add_subnet(mask, *len); + + // create "IP packet" + let mut msg = Vec::::new(); + msg.resize(SIZE_MESSAGE_PREFIX + 1024, 0); + if mask.is_ipv4() { + let mut packet = MutableIpv4Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap(); + packet.set_destination(ip.parse().unwrap()); + packet.set_version(4); + } else { + let mut packet = MutableIpv6Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap(); + packet.set_destination(ip.parse().unwrap()); + packet.set_version(6); + } + + // cryptkey route the IP packet + let res = router.send(msg); + if *okay { + // cryptkey routing succeeded + assert!(res.is_ok()); + + // and a key should have been requested + assert!(opaque.load(Ordering::Acquire)); + } else { + assert!(res.is_err()); + } + + // clear subnets for next test + peer.remove_subnets(); + } +} diff --git a/src/router/types.rs b/src/router/types.rs index 82dcd09..5077686 100644 --- a/src/router/types.rs +++ b/src/router/types.rs @@ -1,4 +1,6 @@ +use std::fmt; use std::marker::PhantomData; +use std::error::Error; pub trait Opaque: Send + Sync + 'static {} @@ -49,3 +51,30 @@ impl, S: Callback, K: KeyCallback> Callbacks type CallbackSend = S; type CallbackKey = K; } + + + +#[derive(Debug)] +pub enum RouterError { + NoCryptKeyRoute, + MalformedIPHeader, +} + +impl fmt::Display for RouterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RouterError::NoCryptKeyRoute => write!(f, "No cryptkey route configured for subnet"), + RouterError::MalformedIPHeader => write!(f, "IP header is malformed") + } + } +} + +impl Error for RouterError { + fn description(&self) -> &str { + "Generic Handshake Error" + } + + fn source(&self) -> Option<&(dyn Error + 'static)> { + None + } +} \ No newline at end of file diff --git a/src/router/workers.rs b/src/router/workers.rs index c4a9f18..1af2cae 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -9,7 +9,7 @@ use spin; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; -use zerocopy::{AsBytes, ByteSlice, ByteSliceMut, FromBytes, LayoutVerified, Unaligned}; +use zerocopy::{AsBytes, LayoutVerified}; use super::device::DecryptionState; use super::device::DeviceInner; @@ -17,7 +17,7 @@ use super::messages::TransportHeader; use super::peer::PeerInner; use super::types::Callbacks; -use super::super::types::{Tun, Bind}; +use super::super::types::{Bind, Tun}; #[derive(PartialEq, Debug)] pub enum Operation { @@ -26,21 +26,21 @@ pub enum Operation { } #[derive(PartialEq, Debug)] -enum Status { +pub enum Status { Fault, // unsealing failed Done, // job valid and complete Waiting, // job awaiting completion } pub struct JobInner { - msg: Vec, // message buffer (nonce and receiver id set) - key: [u8; 32], // chacha20poly1305 key - status: Status, // state of the job - op: Operation, // should be buffer be encrypted / decrypted? + pub msg: Vec, // message buffer (nonce and receiver id set) + pub key: [u8; 32], // chacha20poly1305 key + pub status: Status, // state of the job + pub op: Operation, // should be buffer be encrypted / decrypted? } pub type JobBuffer = Arc>; -pub type JobParallel = (Arc>, JobBuffer); +pub type JobParallel = (Arc>, JobBuffer); pub type JobInbound = (Weak>, JobBuffer); pub type JobOutbound = JobBuffer; @@ -207,13 +207,13 @@ pub fn worker_outbound( pub fn worker_parallel( device: Arc>, - local: Worker, // local job queue (local to thread) - stealers: Vec>, // stealers (from other threads) + local: Worker>, // local job queue (local to thread) + stealers: Vec>>, // stealers (from other threads) ) { while device.running.load(Ordering::SeqCst) { match find_task(&local, &device.injector, &stealers) { Some(job) => { - let (handle, buf) = job; + let (peer, buf) = job; // take ownership of the job buffer and complete it { @@ -260,8 +260,13 @@ pub fn worker_parallel( } } - // ensure consumer is unparked - handle.thread().unpark(); + // ensure consumer is unparked (TODO: better looking + wrap in atomic?) + peer.thread_outbound + .lock() + .as_ref() + .unwrap() + .thread() + .unpark(); } None => { device.parked.store(true, Ordering::Release); -- cgit v1.2.3-59-g8ed1b