summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-10-30 12:01:12 +0100
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-10-30 12:01:12 +0100
commitafc96611a5d853a0bfaff46ea17695ab5332b13b (patch)
treef16fb932047d051ba1bc511657e5adb7fa67be03
parentUnified use of make_packet during tests (diff)
downloadwireguard-rs-afc96611a5d853a0bfaff46ea17695ab5332b13b.tar.xz
wireguard-rs-afc96611a5d853a0bfaff46ea17695ab5332b13b.zip
Change router job to accommodate keep_key_fresh
-rw-r--r--src/wireguard/router/device.rs13
-rw-r--r--src/wireguard/router/mod.rs3
-rw-r--r--src/wireguard/router/peer.rs74
-rw-r--r--src/wireguard/router/workers.rs184
-rw-r--r--src/wireguard/tests.rs5
5 files changed, 139 insertions, 140 deletions
diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs
index 0818637..7c3b0a1 100644
--- a/src/wireguard/router/device.rs
+++ b/src/wireguard/router/device.rs
@@ -19,7 +19,7 @@ use super::constants::*;
use super::messages::{TransportHeader, TYPE_TRANSPORT};
use super::peer::{new_peer, Peer, PeerInner};
use super::types::{Callbacks, RouterError};
-use super::workers::{worker_parallel, JobParallel, Operation};
+use super::workers::{worker_parallel, JobParallel};
use super::SIZE_MESSAGE_PREFIX;
use super::route::get_route;
@@ -44,10 +44,9 @@ pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Write
}
pub struct EncryptionState {
- pub key: [u8; 32], // encryption key
- pub id: u32, // receiver id
- pub nonce: u64, // next available nonce
- pub death: Instant, // (birth + reject-after-time - keepalive-timeout - rekey-timeout)
+ pub keypair: Arc<KeyPair>, // keypair
+ pub nonce: u64, // next available nonce
+ pub death: Instant, // (birth + reject-after-time - keepalive-timeout - rekey-timeout)
}
pub struct DecryptionState<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> {
@@ -143,8 +142,6 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> Device<E, C,
// schedule for encryption and transmission to peer
if let Some(job) = peer.send_job(msg, true) {
- debug_assert_eq!(job.1.op, Operation::Encryption);
-
// add job to worker queue
let idx = self.state.queue_next.fetch_add(1, Ordering::SeqCst);
let queues = self.state.queues.lock();
@@ -186,8 +183,6 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> Device<E, C,
// schedule for decryption and TUN write
if let Some(job) = dec.peer.recv_job(src, dec.clone(), msg) {
- debug_assert_eq!(job.1.op, Operation::Decryption);
-
// add job to worker queue
let idx = self.state.queue_next.fetch_add(1, Ordering::SeqCst);
let queues = self.state.queues.lock();
diff --git a/src/wireguard/router/mod.rs b/src/wireguard/router/mod.rs
index 7b317f2..f3565e2 100644
--- a/src/wireguard/router/mod.rs
+++ b/src/wireguard/router/mod.rs
@@ -14,6 +14,9 @@ mod tests;
use messages::TransportHeader;
use std::mem;
+use super::constants::REJECT_AFTER_MESSAGES;
+use super::constants::REKEY_AFTER_MESSAGES;
+
pub const SIZE_MESSAGE_PREFIX: usize = mem::size_of::<TransportHeader>();
pub const CAPACITY_MESSAGE_POSTFIX: usize = workers::SIZE_TAG;
diff --git a/src/wireguard/router/peer.rs b/src/wireguard/router/peer.rs
index 66a6e9f..50fdfe7 100644
--- a/src/wireguard/router/peer.rs
+++ b/src/wireguard/router/peer.rs
@@ -24,9 +24,8 @@ use super::messages::TransportHeader;
use futures::*;
-use super::workers::Operation;
use super::workers::{worker_inbound, worker_outbound};
-use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel};
+use super::workers::{JobDecryption, JobEncryption, JobInbound, JobOutbound, JobParallel};
use super::SIZE_MESSAGE_PREFIX;
use super::constants::*;
@@ -99,9 +98,8 @@ fn treebit_remove<E: Endpoint, A: Address, C: Callbacks, T: tun::Writer, B: bind
impl EncryptionState {
fn new(keypair: &Arc<KeyPair>) -> EncryptionState {
EncryptionState {
- id: keypair.send.id,
- key: keypair.send.key,
nonce: 0,
+ keypair: keypair.clone(),
death: keypair.birth + REJECT_AFTER_TIME,
}
}
@@ -294,22 +292,14 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> PeerInner<E,
msg: Vec<u8>,
) -> Option<JobParallel> {
let (tx, rx) = oneshot();
- let key = dec.keypair.recv.key;
+ let keypair = dec.keypair.clone();
match self.inbound.lock().try_send((dec, src, rx)) {
- Ok(_) => Some((
- tx,
- JobBuffer {
- msg,
- key: key,
- okay: false,
- op: Operation::Decryption,
- },
- )),
+ Ok(_) => Some(JobParallel::Decryption(tx, JobDecryption { msg, keypair })),
Err(_) => None,
}
}
- pub fn send_job(&self, mut msg: Vec<u8>, stage: bool) -> Option<JobParallel> {
+ pub fn send_job(&self, msg: Vec<u8>, stage: bool) -> Option<JobParallel> {
debug!("peer.send_job");
debug_assert!(
msg.len() >= mem::size_of::<TransportHeader>(),
@@ -317,29 +307,24 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> PeerInner<E,
msg.len()
);
- // parse / cast
- let (header, _) = LayoutVerified::new_from_prefix(&mut msg[..]).unwrap();
- let mut header: LayoutVerified<&mut [u8], TransportHeader> = header;
-
// check if has key
- let key = {
- let mut ekey = self.ekey.lock();
- let key = match ekey.as_mut() {
- None => None,
- Some(mut state) => {
- // avoid integer overflow in nonce
- if state.nonce >= REJECT_AFTER_MESSAGES - 1 {
- *ekey = None;
- None
- } else {
- // there should be no stacked packets lingering around
- debug!("encryption state available, nonce = {}", state.nonce);
-
- // set transport message fields
- header.f_counter.set(state.nonce);
- header.f_receiver.set(state.id);
- state.nonce += 1;
- Some(state.key)
+ let (keypair, counter) = {
+ let keypair = {
+ // TODO: consider using atomic ptr for ekey state
+ let mut ekey = self.ekey.lock();
+ match ekey.as_mut() {
+ None => None,
+ Some(mut state) => {
+ // avoid integer overflow in nonce
+ if state.nonce >= REJECT_AFTER_MESSAGES - 1 {
+ *ekey = None;
+ None
+ } else {
+ debug!("encryption state available, nonce = {}", state.nonce);
+ let counter = state.nonce;
+ state.nonce += 1;
+ Some((state.keypair.clone(), counter))
+ }
}
}
};
@@ -347,25 +332,24 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> PeerInner<E,
// If not suitable key was found:
// 1. Stage packet for later transmission
// 2. Request new key
- if key.is_none() && stage {
+ if keypair.is_none() && stage {
self.staged_packets.lock().push_back(msg);
C::need_key(&self.opaque);
return None;
};
- key
+ keypair
}?;
- // add job to in-order queue and return sendeer to device for inclusion in worker pool
+ // add job to in-order queue and return sender to device for inclusion in worker pool
let (tx, rx) = oneshot();
match self.outbound.lock().try_send(rx) {
- Ok(_) => Some((
+ Ok(_) => Some(JobParallel::Encryption(
tx,
- JobBuffer {
+ JobEncryption {
msg,
- key,
- okay: false,
- op: Operation::Encryption,
+ counter,
+ keypair,
},
)),
Err(_) => None,
diff --git a/src/wireguard/router/workers.rs b/src/wireguard/router/workers.rs
index 70334c1..3d85188 100644
--- a/src/wireguard/router/workers.rs
+++ b/src/wireguard/router/workers.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
use futures::sync::oneshot;
use futures::*;
-use log::debug;
+use log::{debug, trace};
use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
@@ -16,34 +16,40 @@ use super::messages::{TransportHeader, TYPE_TRANSPORT};
use super::peer::PeerInner;
use super::route::check_route;
use super::types::Callbacks;
+use super::REJECT_AFTER_MESSAGES;
+use super::super::types::KeyPair;
use super::super::{bind, tun, Endpoint};
pub const SIZE_TAG: usize = 16;
-#[derive(PartialEq, Debug)]
-pub enum Operation {
- Encryption,
- Decryption,
+#[derive(Debug)]
+pub struct JobEncryption {
+ pub msg: Vec<u8>,
+ pub keypair: Arc<KeyPair>,
+ pub counter: u64,
}
-pub struct JobBuffer {
- pub msg: Vec<u8>, // message buffer (nonce and receiver id set)
- pub key: [u8; 32], // chacha20poly1305 key
- pub okay: bool, // state of the job
- pub op: Operation, // should be buffer be encrypted / decrypted?
+#[derive(Debug)]
+pub struct JobDecryption {
+ pub msg: Vec<u8>,
+ pub keypair: Arc<KeyPair>,
}
-pub type JobParallel = (oneshot::Sender<JobBuffer>, JobBuffer);
+#[derive(Debug)]
+pub enum JobParallel {
+ Encryption(oneshot::Sender<JobEncryption>, JobEncryption),
+ Decryption(oneshot::Sender<Option<JobDecryption>>, JobDecryption),
+}
#[allow(type_alias_bounds)]
pub type JobInbound<E, C, T, B: bind::Writer<E>> = (
Arc<DecryptionState<E, C, T, B>>,
E,
- oneshot::Receiver<JobBuffer>,
+ oneshot::Receiver<Option<JobDecryption>>,
);
-pub type JobOutbound = oneshot::Receiver<JobBuffer>;
+pub type JobOutbound = oneshot::Receiver<JobEncryption>;
pub fn worker_inbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>>(
device: Arc<DeviceInner<E, C, T, B>>, // related device
@@ -64,7 +70,7 @@ pub fn worker_inbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer
let _ = rx
.map(|buf| {
debug!("inbound worker: job complete");
- if buf.okay {
+ if let Some(buf) = buf {
// cast transport header
let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) =
match LayoutVerified::new_from_prefix(&buf.msg[..]) {
@@ -134,6 +140,10 @@ pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Write
peer: Arc<PeerInner<E, C, T, B>>, // related peer
receiver: Receiver<JobOutbound>,
) {
+ fn keep_key_fresh(keypair: &KeyPair, counter: u64) -> bool {
+ false
+ }
+
loop {
// fetch job
let rx = match receiver.recv() {
@@ -148,27 +158,30 @@ pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Write
let _ = rx
.map(|buf| {
debug!("outbound worker: job complete");
- if buf.okay {
- // write to UDP bind
- let xmit = if let Some(dst) = peer.endpoint.lock().as_ref() {
- let send: &Option<B> = &*device.outbound.read();
- if let Some(writer) = send.as_ref() {
- match writer.write(&buf.msg[..], dst) {
- Err(e) => {
- debug!("failed to send outbound packet: {:?}", e);
- false
- }
- Ok(_) => true,
+ // write to UDP bind
+ let xmit = if let Some(dst) = peer.endpoint.lock().as_ref() {
+ let send: &Option<B> = &*device.outbound.read();
+ if let Some(writer) = send.as_ref() {
+ match writer.write(&buf.msg[..], dst) {
+ Err(e) => {
+ debug!("failed to send outbound packet: {:?}", e);
+ false
}
- } else {
- false
+ Ok(_) => true,
}
} else {
false
- };
+ }
+ } else {
+ false
+ };
- // trigger callback
- C::send(&peer.opaque, buf.msg.len(), xmit);
+ // trigger callback
+ C::send(&peer.opaque, buf.msg.len(), xmit);
+
+ // keep_key_fresh semantics
+ if keep_key_fresh(&buf.keypair, buf.counter) {
+ C::need_key(&peer.opaque);
}
})
.wait();
@@ -178,76 +191,85 @@ pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Write
pub fn worker_parallel(receiver: Receiver<JobParallel>) {
loop {
// fetch next job
- let (tx, mut buf) = match receiver.recv() {
+ let job = match receiver.recv() {
Err(_) => {
return;
}
Ok(val) => val,
};
- debug!("parallel worker: obtained job");
-
- // make space for tag (TODO: consider moving this out)
- if buf.op == Operation::Encryption {
- buf.msg.extend([0u8; SIZE_TAG].iter());
- }
+ trace!("parallel worker: obtained job");
- // cast and check size of packet
- let (mut header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
- match LayoutVerified::new_from_prefix(&mut buf.msg[..]) {
- Some(v) => v,
- None => {
- debug_assert!(
- false,
- "parallel worker: failed to parse message (insufficient size)"
- );
- continue;
- }
- };
- debug_assert!(packet.len() >= CHACHA20_POLY1305.tag_len());
+ // handle job
+ match job {
+ JobParallel::Encryption(tx, mut job) => {
+ job.msg.extend([0u8; SIZE_TAG].iter());
- // do the weird ring AEAD dance
- let key = LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap());
+ // cast to header (should never fail)
+ let (mut header, body): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
+ LayoutVerified::new_from_prefix(&mut job.msg[..])
+ .expect("earlier code should ensure that there is ample space");
- // create a nonce object
- let mut nonce = [0u8; 12];
- debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len());
- nonce[4..].copy_from_slice(header.f_counter.as_bytes());
- let nonce = Nonce::assume_unique_for_key(nonce);
+ // set header fields
+ header.f_type.set(TYPE_TRANSPORT);
+ header.f_receiver.set(job.keypair.send.id);
+ header.f_counter.set(job.counter);
- match buf.op {
- Operation::Encryption => {
- debug!("parallel worker: process encryption");
+ // create a nonce object
+ let mut nonce = [0u8; 12];
+ debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len());
+ nonce[4..].copy_from_slice(header.f_counter.as_bytes());
+ let nonce = Nonce::assume_unique_for_key(nonce);
- // set the type field
- header.f_type.set(TYPE_TRANSPORT);
+ // do the weird ring AEAD dance
+ let key = LessSafeKey::new(
+ UnboundKey::new(&CHACHA20_POLY1305, &job.keypair.send.key[..]).unwrap(),
+ );
// encrypt content of transport message in-place
- let end = packet.len() - SIZE_TAG;
+ let end = body.len() - SIZE_TAG;
let tag = key
- .seal_in_place_separate_tag(nonce, Aad::empty(), &mut packet[..end])
+ .seal_in_place_separate_tag(nonce, Aad::empty(), &mut body[..end])
.unwrap();
// append tag
- packet[end..].copy_from_slice(tag.as_ref());
+ body[end..].copy_from_slice(tag.as_ref());
- buf.okay = true;
+ // pass ownership
+ let _ = tx.send(job);
}
- Operation::Decryption => {
- debug!("parallel worker: process decryption");
-
- // opening failure is signaled by fault state
- buf.okay = match key.open_in_place(nonce, Aad::empty(), packet) {
- Ok(_) => true,
- Err(_) => false,
- };
+ JobParallel::Decryption(tx, mut job) => {
+ // cast to header (could fail)
+ let layout: Option<(LayoutVerified<&mut [u8], TransportHeader>, &mut [u8])> =
+ LayoutVerified::new_from_prefix(&mut job.msg[..]);
+
+ let _ = tx.send(match layout {
+ Some((header, body)) => {
+ debug_assert_eq!(header.f_type.get(), TYPE_TRANSPORT);
+ if header.f_counter.get() >= REJECT_AFTER_MESSAGES {
+ None
+ } else {
+ // create a nonce object
+ let mut nonce = [0u8; 12];
+ debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len());
+ nonce[4..].copy_from_slice(header.f_counter.as_bytes());
+ let nonce = Nonce::assume_unique_for_key(nonce);
+
+ // do the weird ring AEAD dance
+ let key = LessSafeKey::new(
+ UnboundKey::new(&CHACHA20_POLY1305, &job.keypair.recv.key[..])
+ .unwrap(),
+ );
+
+ // attempt to open (and authenticate) the body
+ match key.open_in_place(nonce, Aad::empty(), body) {
+ Ok(_) => Some(job),
+ Err(_) => None,
+ }
+ }
+ }
+ None => None,
+ });
}
}
-
- // pass ownership to consumer
- let okay = tx.send(buf);
- debug!(
- "parallel worker: passing ownership to sequential worker: {}",
- okay.is_ok()
- );
}
}
diff --git a/src/wireguard/tests.rs b/src/wireguard/tests.rs
index 3ecb979..37dd571 100644
--- a/src/wireguard/tests.rs
+++ b/src/wireguard/tests.rs
@@ -77,7 +77,6 @@ fn wait() {
}
/* Create and configure two matching pure instances of WireGuard
- *
*/
#[test]
fn test_pure_wireguard() {
@@ -166,8 +165,6 @@ fn test_pure_wireguard() {
fake1.write(p);
}
- wait();
-
while let Some(p) = backup.pop() {
assert_eq!(
hex::encode(fake2.read()),
@@ -197,8 +194,6 @@ fn test_pure_wireguard() {
fake2.write(p);
}
- wait();
-
while let Some(p) = backup.pop() {
assert_eq!(
hex::encode(fake1.read()),