aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-15 21:10:23 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-15 21:10:23 +0200
commit32c030367cb017f0318cb97ccf27f8788acadf72 (patch)
tree6641288107f77ee34bea82301927e9af85df8c17
parentSent staged packets when key-pair confirmed (diff)
downloadwireguard-rs-32c030367cb017f0318cb97ccf27f8788acadf72.tar.xz
wireguard-rs-32c030367cb017f0318cb97ccf27f8788acadf72.zip
WIP: Handshake queue and workers
-rw-r--r--Cargo.lock10
-rw-r--r--Cargo.toml1
-rw-r--r--src/router/peer.rs50
-rw-r--r--src/wireguard.rs55
4 files changed, 90 insertions, 26 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 8d04e33..41aa6e0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -178,6 +178,14 @@ dependencies = [
]
[[package]]
+name = "crossbeam-channel"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "crossbeam-deque"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1582,6 +1590,7 @@ dependencies = [
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"chacha20poly1305 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1676,6 +1685,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum chacha20poly1305 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "40cd3ddeae0b0ea7fe848a06e4fbf3f02463648b9395bd1139368ce42b44543e"
"checksum clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "97276801e127ffb46b66ce23f35cc96bd454fa311294bced4bbace7baa8b1d17"
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
+"checksum crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c8ec7fcd21571dc78f96cc96243cab8d8f035247c3efd16c687be154c3fa9efa"
"checksum crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b18cd2e169ad86297e6bc0ad9aa679aee9daa4f19e8163860faf7c164e4f5a71"
"checksum crossbeam-epoch 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "fedcd6772e37f3da2a9af9bf12ebe046c0dfe657992377b4df982a2b54cd37a9"
"checksum crossbeam-queue 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b"
diff --git a/Cargo.toml b/Cargo.toml
index caa6d20..379af42 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ futures = "0.1.28"
arraydeque = "0.4.5"
treebitmap = "^0.4"
crossbeam-deque = "0.7"
+crossbeam-channel = "0.3.9"
hjul = "0.1.2"
ring = "0.16.7"
chacha20poly1305 = "^0.1"
diff --git a/src/router/peer.rs b/src/router/peer.rs
index 728be11..952e439 100644
--- a/src/router/peer.rs
+++ b/src/router/peer.rs
@@ -308,28 +308,40 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
let mut header: LayoutVerified<&mut [u8], TransportHeader> = header;
// check if has key
- let key = match self.ekey.lock().as_mut() {
- None => {
- // add to staged packets (create no job)
- debug!("execute callback: call_need_key");
- C::need_key(&self.opaque);
+ let key = {
+ let mut ekey = self.ekey.lock();
+ let key = match ekey.as_mut() {
+ None => None,
+ Some(mut state) => {
+ // avoid integer overflow in nonce
+ if state.nonce >= REJECT_AFTER_MESSAGES - 1 {
+ *ekey = None;
+ None
+ } else {
+ // there should be no stacked packets lingering around
+ debug_assert_eq!(self.staged_packets.lock().len(), 0);
+ debug!("encryption state available, nonce = {}", state.nonce);
+
+ // set transport message fields
+ header.f_counter.set(state.nonce);
+ header.f_receiver.set(state.id);
+ state.nonce += 1;
+ Some(state.key)
+ }
+ }
+ };
+
+ // If not suitable key was found:
+ // 1. Stage packet for later transmission
+ // 2. Request new key
+ if key.is_none() {
self.staged_packets.lock().push_back(msg);
+ C::need_key(&self.opaque);
return None;
- }
- Some(mut state) => {
- // avoid integer overflow in nonce
- if state.nonce >= REJECT_AFTER_MESSAGES - 1 {
- return None;
- }
- debug!("encryption state available, nonce = {}", state.nonce);
+ };
- // set transport message fields
- header.f_counter.set(state.nonce);
- header.f_receiver.set(state.id);
- state.nonce += 1;
- state.key
- }
- };
+ key
+ }?;
// add job to in-order queue and return sendeer to device for inclusion in worker pool
let (tx, rx) = oneshot();
diff --git a/src/wireguard.rs b/src/wireguard.rs
index 0bd5da7..71b981e 100644
--- a/src/wireguard.rs
+++ b/src/wireguard.rs
@@ -2,12 +2,20 @@ use crate::handshake;
use crate::router;
use crate::types::{Bind, Tun};
-use byteorder::{ByteOrder, LittleEndian};
-
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc::sync_channel;
+use std::sync::Arc;
use std::thread;
+use std::time::{Duration, Instant};
+use byteorder::{ByteOrder, LittleEndian};
+use crossbeam_channel::bounded;
use x25519_dalek::StaticSecret;
+const SIZE_HANDSHAKE_QUEUE: usize = 128;
+const THRESHOLD_UNDER_LOAD: usize = SIZE_HANDSHAKE_QUEUE / 4;
+const DURATION_UNDER_LOAD: Duration = Duration::from_millis(10_000);
+
pub struct Timers {}
pub struct Events();
@@ -23,18 +31,30 @@ impl router::Callbacks for Events {
}
pub struct Wireguard<T: Tun, B: Bind> {
- router: router::Device<Events, T, B>,
- handshake: Option<handshake::Device<()>>,
+ router: Arc<router::Device<Events, T, B>>,
+ handshake: Option<Arc<handshake::Device<()>>>,
}
impl<T: Tun, B: Bind> Wireguard<T, B> {
+ fn start(&self) {}
+
fn new(tun: T, bind: B) -> Wireguard<T, B> {
- let router = router::Device::new(num_cpus::get(), tun.clone(), bind.clone());
+ let router = Arc::new(router::Device::new(
+ num_cpus::get(),
+ tun.clone(),
+ bind.clone(),
+ ));
+
+ let handshake_staged = Arc::new(AtomicUsize::new(0));
// start UDP read IO thread
+ let (handshake_tx, handshake_rx) = bounded(128);
{
let tun = tun.clone();
thread::spawn(move || {
+ let mut under_load =
+ Instant::now() - DURATION_UNDER_LOAD - Duration::from_millis(1000);
+
loop {
// read UDP packet into vector
let size = tun.mtu() + 148; // maximum message size
@@ -45,14 +65,27 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
msg.truncate(size);
// message type de-multiplexer
- if msg.len() < 4 {
+ if msg.len() < std::mem::size_of::<u32>() {
continue;
}
+
match LittleEndian::read_u32(&msg[..]) {
handshake::TYPE_COOKIE_REPLY
| handshake::TYPE_INITIATION
| handshake::TYPE_RESPONSE => {
- // handshake message
+ // detect if under load
+ if handshake_staged.fetch_add(1, Ordering::SeqCst)
+ > THRESHOLD_UNDER_LOAD
+ {
+ under_load = Instant::now()
+ }
+
+ // pass source address along if under load
+ if under_load.elapsed() < DURATION_UNDER_LOAD {
+ handshake_tx.send((msg, Some(src))).unwrap();
+ } else {
+ handshake_tx.send((msg, None)).unwrap();
+ }
}
router::TYPE_TRANSPORT => {
// transport message
@@ -63,6 +96,14 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
});
}
+ // start handshake workers
+ for _ in 0..num_cpus::get() {
+ let handshake_rx = handshake_rx.clone();
+ thread::spawn(move || loop {
+ let (msg, src) = handshake_rx.recv().unwrap(); // TODO handle error
+ });
+ }
+
// start TUN read IO thread
thread::spawn(move || {});