summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/router/device.rs27
-rw-r--r--src/router/messages.rs2
-rw-r--r--src/router/peer.rs14
-rw-r--r--src/router/tests.rs41
-rw-r--r--src/router/types.rs10
-rw-r--r--src/router/workers.rs8
6 files changed, 85 insertions, 17 deletions
diff --git a/src/router/device.rs b/src/router/device.rs
index 4fb0334..f1cbb70 100644
--- a/src/router/device.rs
+++ b/src/router/device.rs
@@ -17,6 +17,7 @@ use super::peer;
use super::peer::{Peer, PeerInner};
use super::SIZE_MESSAGE_PREFIX;
+use super::messages::TYPE_TRANSPORT;
use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks, RouterError};
use super::workers::{worker_parallel, JobParallel};
@@ -66,8 +67,8 @@ pub struct DecryptionState<C: Callbacks, T: Tun, B: Bind> {
}
pub struct Device<C: Callbacks, T: Tun, B: Bind>(
- Arc<DeviceInner<C, T, B>>,
- Vec<thread::JoinHandle<()>>,
+ Arc<DeviceInner<C, T, B>>, // reference to device state
+ Vec<thread::JoinHandle<()>>, // join handles for workers
);
impl<C: Callbacks, T: Tun, B: Bind> Drop for Device<C, T, B> {
@@ -207,8 +208,17 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
// schedule for encryption and transmission to peer
if let Some(job) = peer.send_job(msg) {
+ println!("made job!");
self.0.injector.push((peer.clone(), job));
}
+
+ // ensure workers running
+ if self.0.parked.load(Ordering::Acquire) {
+ for handle in &self.1 {
+ handle.thread().unpark();
+ }
+ }
+
Ok(())
}
@@ -216,8 +226,17 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
///
/// # Arguments
///
- /// - ct_msg: Encrypted transport message
- pub fn recv(&self, ct_msg: &mut [u8]) {
+ /// - msg: Encrypted transport message
+ pub fn recv(&self, msg: Vec<u8>) -> Result<(), RouterError> {
+ // ensure that the type field access is within bounds
+ if msg.len() < SIZE_MESSAGE_PREFIX || msg[0] != TYPE_TRANSPORT {
+ return Err(RouterError::MalformedTransportMessage);
+ }
+
+ // parse / cast
+
+ // lookup peer based on receiver id
+
unimplemented!();
}
}
diff --git a/src/router/messages.rs b/src/router/messages.rs
index bec24ac..e7b592b 100644
--- a/src/router/messages.rs
+++ b/src/router/messages.rs
@@ -2,6 +2,8 @@ use byteorder::LittleEndian;
use zerocopy::byteorder::{U32, U64};
use zerocopy::{AsBytes, ByteSlice, FromBytes, LayoutVerified};
+pub const TYPE_TRANSPORT: u8 = 4;
+
#[repr(packed)]
#[derive(Copy, Clone, FromBytes, AsBytes)]
pub struct TransportHeader {
diff --git a/src/router/peer.rs b/src/router/peer.rs
index d755fa5..c1762ad 100644
--- a/src/router/peer.rs
+++ b/src/router/peer.rs
@@ -5,9 +5,9 @@ use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Weak};
use std::thread;
-use spin;
+use spin::Mutex;
-use arraydeque::{ArrayDeque, Wrapping};
+use arraydeque::{ArrayDeque, Wrapping, Saturating};
use zerocopy::{AsBytes, LayoutVerified};
use treebitmap::address::Address;
@@ -40,6 +40,8 @@ pub struct KeyWheel {
pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
pub stopped: AtomicBool,
pub opaque: C::Opaque,
+ pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Wrapping>>,
+ pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Wrapping>>,
pub device: Arc<DeviceInner<C, T, B>>,
pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
@@ -101,6 +103,7 @@ fn treebit_remove<A: Address, C: Callbacks, T: Tun, B: Bind>(
impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
fn drop(&mut self) {
+ println!("drop");
// mark peer as stopped
let peer = &self.0;
@@ -167,6 +170,8 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
let device = device.clone();
Arc::new(PeerInner {
opaque,
+ inbound: Mutex::new(ArrayDeque::new()),
+ outbound: Mutex::new(ArrayDeque::new()),
stopped: AtomicBool::new(false),
device: device,
ekey: spin::Mutex::new(None),
@@ -258,7 +263,10 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
// add job to in-order queue and return to device for inclusion in worker pool
match self.queue_outbound.try_send(job.clone()) {
Ok(_) => Some(job),
- Err(_) => None,
+ Err(e) => {
+ println!("{:?}", e);
+ None
+ }
}
}
}
diff --git a/src/router/tests.rs b/src/router/tests.rs
index 07851a8..8c51ff1 100644
--- a/src/router/tests.rs
+++ b/src/router/tests.rs
@@ -3,11 +3,13 @@ use std::fmt;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
+use std::thread;
+use std::time::{Duration, Instant};
use pnet::packet::ipv4::MutableIpv4Packet;
use pnet::packet::ipv6::MutableIpv6Packet;
-use super::super::types::{Bind, Tun};
+use super::super::types::{Bind, Key, KeyPair, Tun};
use super::{Device, Peer, SIZE_MESSAGE_PREFIX};
#[derive(Debug)]
@@ -93,6 +95,32 @@ impl fmt::Display for BindError {
}
}
+fn dummy_keypair(initiator: bool) -> KeyPair {
+ let k1 = Key {
+ key: [0x53u8; 32],
+ id: 0x646e6573,
+ };
+ let k2 = Key {
+ key: [0x52u8; 32],
+ id: 0x76636572,
+ };
+ if initiator {
+ KeyPair {
+ birth: Instant::now(),
+ initiator: true,
+ send: k1,
+ recv: k2,
+ }
+ } else {
+ KeyPair {
+ birth: Instant::now(),
+ initiator: false,
+ send: k2,
+ recv: k1,
+ }
+ }
+}
+
#[test]
fn test_outbound() {
let opaque = Arc::new(AtomicBool::new(false));
@@ -134,6 +162,11 @@ fn test_outbound() {
),
];
+ thread::sleep(Duration::from_millis(1000));
+ assert!(false);
+
+ peer.add_keypair(dummy_keypair(true));
+
for (mask, len, ip, okay) in &tests {
opaque.store(false, Ordering::SeqCst);
@@ -161,8 +194,8 @@ fn test_outbound() {
// cryptkey routing succeeded
assert!(res.is_ok());
- // and a key should have been requested
- assert!(opaque.load(Ordering::Acquire));
+ // and a key should have been requested
+ // assert!(opaque.load(Ordering::Acquire), "did not request key");
} else {
assert!(res.is_err());
}
@@ -170,4 +203,6 @@ fn test_outbound() {
// clear subnets for next test
peer.remove_subnets();
}
+
+ assert!(false);
}
diff --git a/src/router/types.rs b/src/router/types.rs
index 5077686..336f56b 100644
--- a/src/router/types.rs
+++ b/src/router/types.rs
@@ -1,6 +1,6 @@
+use std::error::Error;
use std::fmt;
use std::marker::PhantomData;
-use std::error::Error;
pub trait Opaque: Send + Sync + 'static {}
@@ -52,19 +52,19 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>> Callbacks
type CallbackKey = K;
}
-
-
#[derive(Debug)]
pub enum RouterError {
NoCryptKeyRoute,
MalformedIPHeader,
+ MalformedTransportMessage,
}
impl fmt::Display for RouterError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RouterError::NoCryptKeyRoute => write!(f, "No cryptkey route configured for subnet"),
- RouterError::MalformedIPHeader => write!(f, "IP header is malformed")
+ RouterError::MalformedIPHeader => write!(f, "IP header is malformed"),
+ RouterError::MalformedTransportMessage => write!(f, "IP header is malformed"),
}
}
}
@@ -77,4 +77,4 @@ impl Error for RouterError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
None
}
-} \ No newline at end of file
+}
diff --git a/src/router/workers.rs b/src/router/workers.rs
index 1af2cae..241b06f 100644
--- a/src/router/workers.rs
+++ b/src/router/workers.rs
@@ -167,7 +167,7 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
Ok(buf) => {
while !peer.stopped.load(Ordering::Acquire) {
match buf.try_lock() {
- None => (),
+ None => (), // nothing to do
Some(buf) => match buf.status {
Status::Done => {
// parse / cast
@@ -198,7 +198,8 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
thread::park();
}
}
- Err(_) => {
+ Err(e) => {
+ println!("park outbound! {:?}", e);
break;
}
}
@@ -211,9 +212,11 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
stealers: Vec<Stealer<JobParallel<C, T, B>>>, // stealers (from other threads)
) {
while device.running.load(Ordering::SeqCst) {
+ println!("running");
match find_task(&local, &device.injector, &stealers) {
Some(job) => {
let (peer, buf) = job;
+ println!("jobs!");
// take ownership of the job buffer and complete it
{
@@ -269,6 +272,7 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
.unpark();
}
None => {
+ println!("park");
device.parked.store(true, Ordering::Release);
thread::park();
}