summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs3
-rw-r--r--src/router/device.rs88
-rw-r--r--src/router/mod.rs8
-rw-r--r--src/router/peer.rs70
-rw-r--r--src/router/tests.rs173
-rw-r--r--src/router/types.rs29
-rw-r--r--src/router/workers.rs31
7 files changed, 367 insertions, 35 deletions
diff --git a/src/main.rs b/src/main.rs
index 6d1d2e1..8d92048 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -10,10 +10,9 @@ use hjul::*;
use std::error::Error;
use std::fmt;
use std::net::SocketAddr;
-use std::sync::Arc;
use std::time::Duration;
-use types::{Bind, KeyPair, Tun};
+use types::{Bind, Tun};
#[derive(Debug)]
enum TunError {}
diff --git a/src/router/device.rs b/src/router/device.rs
index f04cf97..4fb0334 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -1,6 +1,7 @@
+use std::cmp;
use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr};
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::thread;
use std::time::Instant;
@@ -14,10 +15,21 @@ use super::super::types::{Bind, KeyPair, Tun};
use super::anti_replay::AntiReplay;
use super::peer;
use super::peer::{Peer, PeerInner};
+use super::SIZE_MESSAGE_PREFIX;
-use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks};
+use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks, RouterError};
use super::workers::{worker_parallel, JobParallel};
+// minimum sizes for IP headers
+const SIZE_IP4_HEADER: usize = 16;
+const SIZE_IP6_HEADER: usize = 36;
+
+const VERSION_IP4: u8 = 4;
+const VERSION_IP6: u8 = 6;
+
+const OFFSET_IP4_DST: usize = 16;
+const OFFSET_IP6_DST: usize = 24;
+
pub struct DeviceInner<C: Callbacks, T: Tun, B: Bind> {
// IO & timer generics
pub tun: T,
@@ -27,9 +39,9 @@ pub struct DeviceInner<C: Callbacks, T: Tun, B: Bind> {
pub call_need_key: C::CallbackKey,
// threading and workers
- pub running: AtomicBool, // workers running?
- pub parked: AtomicBool, // any workers parked?
- pub injector: Injector<JobParallel>, // parallel enc/dec task injector
+ pub running: AtomicBool, // workers running?
+ pub parked: AtomicBool, // any workers parked?
+ pub injector: Injector<JobParallel<C, T, B>>, // parallel enc/dec task injector
// routing
pub recv: spin::RwLock<HashMap<u32, DecryptionState<C, T, B>>>, // receiver id -> decryption state
@@ -38,11 +50,10 @@ pub struct DeviceInner<C: Callbacks, T: Tun, B: Bind> {
}
pub struct EncryptionState {
- pub key: [u8; 32], // encryption key
- pub id: u32, // sender id
- pub nonce: u64, // next available nonce
- pub death: Instant, // time when the key no longer can be used for encryption
- // (birth + reject-after-time - keepalive-timeout - rekey-timeout)
+ pub key: [u8; 32], // encryption key
+ pub id: u32, // receiver id
+ pub nonce: u64, // next available nonce
+ pub death: Instant, // (birth + reject-after-time - keepalive-timeout - rekey-timeout)
}
pub struct DecryptionState<C: Callbacks, T: Tun, B: Bind> {
@@ -144,8 +155,61 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
///
/// - pt_msg: IP packet to cryptkey route
///
- pub fn send(&self, pt_msg: &mut [u8]) {
- unimplemented!();
+ pub fn send(&self, msg: Vec<u8>) -> Result<(), RouterError> {
+ // ensure that the type field access is within bounds
+ if msg.len() < cmp::min(SIZE_IP4_HEADER, SIZE_IP6_HEADER) + SIZE_MESSAGE_PREFIX {
+ return Err(RouterError::MalformedIPHeader);
+ }
+
+ // ignore header prefix (for in-place transport message construction)
+ let packet = &msg[SIZE_MESSAGE_PREFIX..];
+
+ // lookup peer based on IP packet destination address
+ let peer = match packet[0] >> 4 {
+ VERSION_IP4 => {
+ if msg.len() >= SIZE_IP4_HEADER {
+ // extract IPv4 destination address
+ let mut dst = [0u8; 4];
+ dst.copy_from_slice(&packet[OFFSET_IP4_DST..OFFSET_IP4_DST + 4]);
+ let dst = Ipv4Addr::from(dst);
+
+ // lookup peer (project unto and clone "value" field)
+ self.0
+ .ipv4
+ .read()
+ .longest_match(dst)
+ .and_then(|(_, _, p)| p.upgrade())
+ .ok_or(RouterError::NoCryptKeyRoute)
+ } else {
+ Err(RouterError::MalformedIPHeader)
+ }
+ }
+ VERSION_IP6 => {
+ if msg.len() >= SIZE_IP6_HEADER {
+ // extract IPv6 destination address
+ let mut dst = [0u8; 16];
+ dst.copy_from_slice(&packet[OFFSET_IP6_DST..OFFSET_IP6_DST + 16]);
+ let dst = Ipv6Addr::from(dst);
+
+ // lookup peer (project unto and clone "value" field)
+ self.0
+ .ipv6
+ .read()
+ .longest_match(dst)
+ .and_then(|(_, _, p)| p.upgrade())
+ .ok_or(RouterError::NoCryptKeyRoute)
+ } else {
+ Err(RouterError::MalformedIPHeader)
+ }
+ }
+ _ => Err(RouterError::MalformedIPHeader),
+ }?;
+
+ // schedule for encryption and transmission to peer
+ if let Some(job) = peer.send_job(msg) {
+ self.0.injector.push((peer.clone(), job));
+ }
+ Ok(())
}
/// Receive an encrypted transport message
diff --git a/src/router/mod.rs b/src/router/mod.rs
index c1ecf1c..0e4bce1 100644
--- a/src/router/mod.rs
+++ b/src/router/mod.rs
@@ -5,5 +5,13 @@ mod peer;
mod types;
mod workers;
+#[cfg(test)]
+mod tests;
+
+use messages::TransportHeader;
+use std::mem;
+
+pub const SIZE_MESSAGE_PREFIX: usize = mem::size_of::<TransportHeader>();
+
pub use device::Device;
pub use peer::Peer;
diff --git a/src/router/peer.rs b/src/router/peer.rs
index e21e69c..d755fa5 100644
--- a/src/router/peer.rs
+++ b/src/router/peer.rs
@@ -1,3 +1,4 @@
+use std::mem;
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender};
@@ -7,18 +8,23 @@ use std::thread;
use spin;
use arraydeque::{ArrayDeque, Wrapping};
+use zerocopy::{AsBytes, LayoutVerified};
use treebitmap::address::Address;
use treebitmap::IpLookupTable;
use super::super::constants::*;
-use super::super::types::{KeyPair, Tun, Bind};
+use super::super::types::{Bind, KeyPair, Tun};
use super::anti_replay::AntiReplay;
use super::device::DecryptionState;
use super::device::DeviceInner;
use super::device::EncryptionState;
-use super::workers::{worker_inbound, worker_outbound, JobInbound, JobOutbound};
+use super::messages::TransportHeader;
+
+use super::workers::{worker_inbound, worker_outbound};
+use super::workers::{JobBuffer, JobInbound, JobInner, JobOutbound};
+use super::workers::{Operation, Status};
use super::types::Callbacks;
@@ -40,16 +46,14 @@ pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
pub queue_outbound: SyncSender<JobOutbound>,
pub queue_inbound: SyncSender<JobInbound<C, T, B>>,
pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
- pub rx_bytes: AtomicU64, // received bytes
- pub tx_bytes: AtomicU64, // transmitted bytes
+ pub rx_bytes: AtomicU64, // received bytes
+ pub tx_bytes: AtomicU64, // transmitted bytes
pub keys: spin::Mutex<KeyWheel>, // key-wheel
pub ekey: spin::Mutex<Option<EncryptionState>>, // encryption state
pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
}
-pub struct Peer<C: Callbacks, T: Tun, B: Bind>(
- Arc<PeerInner<C, T, B>>,
-);
+pub struct Peer<C: Callbacks, T: Tun, B: Bind>(Arc<PeerInner<C, T, B>>);
fn treebit_list<A, E, C: Callbacks, T: Tun, B: Bind>(
peer: &Arc<PeerInner<C, T, B>>,
@@ -212,6 +216,51 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
// rotate key-wheel
}
+
+ pub fn send_job(&self, mut msg: Vec<u8>) -> Option<JobBuffer> {
+ debug_assert!(msg.len() >= mem::size_of::<TransportHeader>());
+
+ // parse / cast
+ let (header, _) = LayoutVerified::new_from_prefix(&mut msg[..]).unwrap();
+ let mut header: LayoutVerified<&mut [u8], TransportHeader> = header;
+
+ // check if has key
+ let key = match self.ekey.lock().as_mut() {
+ None => {
+ // add to staged packets (create no job)
+ (self.device.call_need_key)(&self.opaque);
+ self.staged_packets.lock().push_back(msg);
+ return None;
+ }
+ Some(mut state) => {
+ // allocate nonce
+ state.nonce += 1;
+ if state.nonce >= REJECT_AFTER_MESSAGES {
+ state.nonce -= 1;
+ return None;
+ }
+
+ // set transport message fields
+ header.f_counter.set(state.nonce);
+ header.f_receiver.set(state.id);
+ state.key
+ }
+ };
+
+ // create job
+ let job = Arc::new(spin::Mutex::new(JobInner {
+ msg,
+ key,
+ status: Status::Waiting,
+ op: Operation::Encryption,
+ }));
+
+ // add job to in-order queue and return to device for inclusion in worker pool
+ match self.queue_outbound.try_send(job.clone()) {
+ Ok(_) => Some(job),
+ Err(_) => None,
+ }
+ }
}
impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
@@ -332,5 +381,10 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
res
}
- pub fn send(&self, msg: Vec<u8>) {}
+ pub fn remove_subnets(&self) {
+ treebit_remove(self, &self.0.device.ipv4);
+ treebit_remove(self, &self.0.device.ipv6);
+ }
+
+ fn send(&self, msg: Vec<u8>) {}
}
diff --git a/src/router/tests.rs b/src/router/tests.rs
new file mode 100644
index 0000000..07851a8
--- /dev/null
+++ b/src/router/tests.rs
@@ -0,0 +1,173 @@
+use std::error::Error;
+use std::fmt;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+
+use pnet::packet::ipv4::MutableIpv4Packet;
+use pnet::packet::ipv6::MutableIpv6Packet;
+
+use super::super::types::{Bind, Tun};
+use super::{Device, Peer, SIZE_MESSAGE_PREFIX};
+
+#[derive(Debug)]
+enum TunError {}
+
+impl Error for TunError {
+ fn description(&self) -> &str {
+ "Generic Tun Error"
+ }
+
+ fn source(&self) -> Option<&(dyn Error + 'static)> {
+ None
+ }
+}
+
+impl fmt::Display for TunError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Not Possible")
+ }
+}
+
+struct TunTest {}
+
+impl Tun for TunTest {
+ type Error = TunError;
+
+ fn mtu(&self) -> usize {
+ 1500
+ }
+
+ fn read(&self, buf: &mut [u8], offset: usize) -> Result<usize, Self::Error> {
+ Ok(0)
+ }
+
+ fn write(&self, src: &[u8]) -> Result<(), Self::Error> {
+ Ok(())
+ }
+}
+
+struct BindTest {}
+
+impl Bind for BindTest {
+ type Error = BindError;
+ type Endpoint = SocketAddr;
+
+ fn new() -> BindTest {
+ BindTest {}
+ }
+
+ fn set_port(&self, port: u16) -> Result<(), Self::Error> {
+ Ok(())
+ }
+
+ fn get_port(&self) -> Option<u16> {
+ None
+ }
+
+ fn recv(&self, buf: &mut [u8]) -> Result<(usize, Self::Endpoint), Self::Error> {
+ Ok((0, "127.0.0.1:8080".parse().unwrap()))
+ }
+
+ fn send(&self, buf: &[u8], dst: &Self::Endpoint) -> Result<(), Self::Error> {
+ Ok(())
+ }
+}
+
+#[derive(Debug)]
+enum BindError {}
+
+impl Error for BindError {
+ fn description(&self) -> &str {
+ "Generic Bind Error"
+ }
+
+ fn source(&self) -> Option<&(dyn Error + 'static)> {
+ None
+ }
+}
+
+impl fmt::Display for BindError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Not Possible")
+ }
+}
+
+#[test]
+fn test_outbound() {
+ let opaque = Arc::new(AtomicBool::new(false));
+
+ // create device (with Opaque = ())
+ let workers = 4;
+ let router = Device::new(
+ workers,
+ TunTest {},
+ BindTest {},
+ |t: &Arc<AtomicBool>, data: bool, sent: bool| {},
+ |t: &Arc<AtomicBool>, data: bool, sent: bool| {},
+ |t: &Arc<AtomicBool>| t.store(true, Ordering::SeqCst),
+ );
+
+ // create peer
+ let peer = router.new_peer(opaque.clone());
+ let tests = vec![
+ ("192.168.1.0", 24, "192.168.1.20", true),
+ ("172.133.133.133", 32, "172.133.133.133", true),
+ ("172.133.133.133", 32, "172.133.133.132", false),
+ (
+ "2001:db8::ff00:42:0000",
+ 112,
+ "2001:db8::ff00:42:3242",
+ true,
+ ),
+ (
+ "2001:db8::ff00:42:8000",
+ 113,
+ "2001:db8::ff00:42:0660",
+ false,
+ ),
+ (
+ "2001:db8::ff00:42:8000",
+ 113,
+ "2001:db8::ff00:42:ffff",
+ true,
+ ),
+ ];
+
+ for (mask, len, ip, okay) in &tests {
+ opaque.store(false, Ordering::SeqCst);
+
+ let mask: IpAddr = mask.parse().unwrap();
+
+ // map subnet to peer
+ peer.add_subnet(mask, *len);
+
+ // create "IP packet"
+ let mut msg = Vec::<u8>::new();
+ msg.resize(SIZE_MESSAGE_PREFIX + 1024, 0);
+ if mask.is_ipv4() {
+ let mut packet = MutableIpv4Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap();
+ packet.set_destination(ip.parse().unwrap());
+ packet.set_version(4);
+ } else {
+ let mut packet = MutableIpv6Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap();
+ packet.set_destination(ip.parse().unwrap());
+ packet.set_version(6);
+ }
+
+ // cryptkey route the IP packet
+ let res = router.send(msg);
+ if *okay {
+ // cryptkey routing succeeded
+ assert!(res.is_ok());
+
+ // and a key should have been requested
+ assert!(opaque.load(Ordering::Acquire));
+ } else {
+ assert!(res.is_err());
+ }
+
+ // clear subnets for next test
+ peer.remove_subnets();
+ }
+}
diff --git a/src/router/types.rs b/src/router/types.rs
index 82dcd09..5077686 100644
--- a/src/router/types.rs
+++ b/src/router/types.rs
@@ -1,4 +1,6 @@
+use std::fmt;
use std::marker::PhantomData;
+use std::error::Error;
pub trait Opaque: Send + Sync + 'static {}
@@ -49,3 +51,30 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>> Callbacks
type CallbackSend = S;
type CallbackKey = K;
}
+
+
+
+#[derive(Debug)]
+pub enum RouterError {
+ NoCryptKeyRoute,
+ MalformedIPHeader,
+}
+
+impl fmt::Display for RouterError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ RouterError::NoCryptKeyRoute => write!(f, "No cryptkey route configured for subnet"),
+ RouterError::MalformedIPHeader => write!(f, "IP header is malformed")
+ }
+ }
+}
+
+impl Error for RouterError {
+ fn description(&self) -> &str {
+ "Generic Handshake Error"
+ }
+
+ fn source(&self) -> Option<&(dyn Error + 'static)> {
+ None
+ }
+} \ No newline at end of file
diff --git a/src/router/workers.rs b/src/router/workers.rs
index c4a9f18..1af2cae 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -9,7 +9,7 @@ use spin;
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
-use zerocopy::{AsBytes, ByteSlice, ByteSliceMut, FromBytes, LayoutVerified, Unaligned};
+use zerocopy::{AsBytes, LayoutVerified};
use super::device::DecryptionState;
use super::device::DeviceInner;
@@ -17,7 +17,7 @@ use super::messages::TransportHeader;
use super::peer::PeerInner;
use super::types::Callbacks;
-use super::super::types::{Tun, Bind};
+use super::super::types::{Bind, Tun};
#[derive(PartialEq, Debug)]
pub enum Operation {
@@ -26,21 +26,21 @@ pub enum Operation {
}
#[derive(PartialEq, Debug)]
-enum Status {
+pub enum Status {
Fault, // unsealing failed
Done, // job valid and complete
Waiting, // job awaiting completion
}
pub struct JobInner {
- msg: Vec<u8>, // message buffer (nonce and receiver id set)
- key: [u8; 32], // chacha20poly1305 key
- status: Status, // state of the job
- op: Operation, // should be buffer be encrypted / decrypted?
+ pub msg: Vec<u8>, // message buffer (nonce and receiver id set)
+ pub key: [u8; 32], // chacha20poly1305 key
+ pub status: Status, // state of the job
+ pub op: Operation, // should be buffer be encrypted / decrypted?
}
pub type JobBuffer = Arc<spin::Mutex<JobInner>>;
-pub type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer);
+pub type JobParallel<C, T, B> = (Arc<PeerInner<C, T, B>>, JobBuffer);
pub type JobInbound<C, T, B> = (Weak<DecryptionState<C, T, B>>, JobBuffer);
pub type JobOutbound = JobBuffer;
@@ -207,13 +207,13 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T, B>>,
- local: Worker<JobParallel>, // local job queue (local to thread)
- stealers: Vec<Stealer<JobParallel>>, // stealers (from other threads)
+ local: Worker<JobParallel<C, T, B>>, // local job queue (local to thread)
+ stealers: Vec<Stealer<JobParallel<C, T, B>>>, // stealers (from other threads)
) {
while device.running.load(Ordering::SeqCst) {
match find_task(&local, &device.injector, &stealers) {
Some(job) => {
- let (handle, buf) = job;
+ let (peer, buf) = job;
// take ownership of the job buffer and complete it
{
@@ -260,8 +260,13 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
}
}
- // ensure consumer is unparked
- handle.thread().unpark();
+ // ensure consumer is unparked (TODO: better looking + wrap in atomic?)
+ peer.thread_outbound
+ .lock()
+ .as_ref()
+ .unwrap()
+ .thread()
+ .unpark();
}
None => {
device.parked.store(true, Ordering::Release);