From afc96611a5d853a0bfaff46ea17695ab5332b13b Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Wed, 30 Oct 2019 12:01:12 +0100 Subject: Change router job to accommodate keep_key_fresh --- src/wireguard/router/device.rs | 13 +-- src/wireguard/router/mod.rs | 3 + src/wireguard/router/peer.rs | 74 +++++++--------- src/wireguard/router/workers.rs | 184 ++++++++++++++++++++++------------------ src/wireguard/tests.rs | 5 -- 5 files changed, 139 insertions(+), 140 deletions(-) (limited to 'src/wireguard') diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs index 0818637..7c3b0a1 100644 --- a/src/wireguard/router/device.rs +++ b/src/wireguard/router/device.rs @@ -19,7 +19,7 @@ use super::constants::*; use super::messages::{TransportHeader, TYPE_TRANSPORT}; use super::peer::{new_peer, Peer, PeerInner}; use super::types::{Callbacks, RouterError}; -use super::workers::{worker_parallel, JobParallel, Operation}; +use super::workers::{worker_parallel, JobParallel}; use super::SIZE_MESSAGE_PREFIX; use super::route::get_route; @@ -44,10 +44,9 @@ pub struct DeviceInner, // keypair + pub nonce: u64, // next available nonce + pub death: Instant, // (birth + reject-after-time - keepalive-timeout - rekey-timeout) } pub struct DecryptionState> { @@ -143,8 +142,6 @@ impl> Device> Device(); pub const CAPACITY_MESSAGE_POSTFIX: usize = workers::SIZE_TAG; diff --git a/src/wireguard/router/peer.rs b/src/wireguard/router/peer.rs index 66a6e9f..50fdfe7 100644 --- a/src/wireguard/router/peer.rs +++ b/src/wireguard/router/peer.rs @@ -24,9 +24,8 @@ use super::messages::TransportHeader; use futures::*; -use super::workers::Operation; use super::workers::{worker_inbound, worker_outbound}; -use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel}; +use super::workers::{JobDecryption, JobEncryption, JobInbound, JobOutbound, JobParallel}; use super::SIZE_MESSAGE_PREFIX; use super::constants::*; @@ -99,9 +98,8 @@ fn treebit_remove) -> EncryptionState { EncryptionState { - id: keypair.send.id, - key: keypair.send.key, nonce: 0, + keypair: keypair.clone(), death: keypair.birth + REJECT_AFTER_TIME, } } @@ -294,22 +292,14 @@ impl> PeerInner, ) -> Option { let (tx, rx) = oneshot(); - let key = dec.keypair.recv.key; + let keypair = dec.keypair.clone(); match self.inbound.lock().try_send((dec, src, rx)) { - Ok(_) => Some(( - tx, - JobBuffer { - msg, - key: key, - okay: false, - op: Operation::Decryption, - }, - )), + Ok(_) => Some(JobParallel::Decryption(tx, JobDecryption { msg, keypair })), Err(_) => None, } } - pub fn send_job(&self, mut msg: Vec, stage: bool) -> Option { + pub fn send_job(&self, msg: Vec, stage: bool) -> Option { debug!("peer.send_job"); debug_assert!( msg.len() >= mem::size_of::(), @@ -317,29 +307,24 @@ impl> PeerInner = header; - // check if has key - let key = { - let mut ekey = self.ekey.lock(); - let key = match ekey.as_mut() { - None => None, - Some(mut state) => { - // avoid integer overflow in nonce - if state.nonce >= REJECT_AFTER_MESSAGES - 1 { - *ekey = None; - None - } else { - // there should be no stacked packets lingering around - debug!("encryption state available, nonce = {}", state.nonce); - - // set transport message fields - header.f_counter.set(state.nonce); - header.f_receiver.set(state.id); - state.nonce += 1; - Some(state.key) + let (keypair, counter) = { + let keypair = { + // TODO: consider using atomic ptr for ekey state + let mut ekey = self.ekey.lock(); + match ekey.as_mut() { + None => None, + Some(mut state) => { + // avoid integer overflow in nonce + if state.nonce >= REJECT_AFTER_MESSAGES - 1 { + *ekey = None; + None + } else { + debug!("encryption state available, nonce = {}", state.nonce); + let counter = state.nonce; + state.nonce += 1; + Some((state.keypair.clone(), counter)) + } } } }; @@ -347,25 +332,24 @@ impl> PeerInner Some(( + Ok(_) => Some(JobParallel::Encryption( tx, - JobBuffer { + JobEncryption { msg, - key, - okay: false, - op: Operation::Encryption, + counter, + keypair, }, )), Err(_) => None, diff --git a/src/wireguard/router/workers.rs b/src/wireguard/router/workers.rs index 70334c1..3d85188 100644 --- a/src/wireguard/router/workers.rs +++ b/src/wireguard/router/workers.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use futures::sync::oneshot; use futures::*; -use log::debug; +use log::{debug, trace}; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; @@ -16,34 +16,40 @@ use super::messages::{TransportHeader, TYPE_TRANSPORT}; use super::peer::PeerInner; use super::route::check_route; use super::types::Callbacks; +use super::REJECT_AFTER_MESSAGES; +use super::super::types::KeyPair; use super::super::{bind, tun, Endpoint}; pub const SIZE_TAG: usize = 16; -#[derive(PartialEq, Debug)] -pub enum Operation { - Encryption, - Decryption, +#[derive(Debug)] +pub struct JobEncryption { + pub msg: Vec, + pub keypair: Arc, + pub counter: u64, } -pub struct JobBuffer { - pub msg: Vec, // message buffer (nonce and receiver id set) - pub key: [u8; 32], // chacha20poly1305 key - pub okay: bool, // state of the job - pub op: Operation, // should be buffer be encrypted / decrypted? +#[derive(Debug)] +pub struct JobDecryption { + pub msg: Vec, + pub keypair: Arc, } -pub type JobParallel = (oneshot::Sender, JobBuffer); +#[derive(Debug)] +pub enum JobParallel { + Encryption(oneshot::Sender, JobEncryption), + Decryption(oneshot::Sender>, JobDecryption), +} #[allow(type_alias_bounds)] pub type JobInbound> = ( Arc>, E, - oneshot::Receiver, + oneshot::Receiver>, ); -pub type JobOutbound = oneshot::Receiver; +pub type JobOutbound = oneshot::Receiver; pub fn worker_inbound>( device: Arc>, // related device @@ -64,7 +70,7 @@ pub fn worker_inbound, &[u8]) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { @@ -134,6 +140,10 @@ pub fn worker_outbound>, // related peer receiver: Receiver, ) { + fn keep_key_fresh(keypair: &KeyPair, counter: u64) -> bool { + false + } + loop { // fetch job let rx = match receiver.recv() { @@ -148,27 +158,30 @@ pub fn worker_outbound = &*device.outbound.read(); - if let Some(writer) = send.as_ref() { - match writer.write(&buf.msg[..], dst) { - Err(e) => { - debug!("failed to send outbound packet: {:?}", e); - false - } - Ok(_) => true, + // write to UDP bind + let xmit = if let Some(dst) = peer.endpoint.lock().as_ref() { + let send: &Option = &*device.outbound.read(); + if let Some(writer) = send.as_ref() { + match writer.write(&buf.msg[..], dst) { + Err(e) => { + debug!("failed to send outbound packet: {:?}", e); + false } - } else { - false + Ok(_) => true, } } else { false - }; + } + } else { + false + }; - // trigger callback - C::send(&peer.opaque, buf.msg.len(), xmit); + // trigger callback + C::send(&peer.opaque, buf.msg.len(), xmit); + + // keep_key_fresh semantics + if keep_key_fresh(&buf.keypair, buf.counter) { + C::need_key(&peer.opaque); } }) .wait(); @@ -178,76 +191,85 @@ pub fn worker_outbound) { loop { // fetch next job - let (tx, mut buf) = match receiver.recv() { + let job = match receiver.recv() { Err(_) => { return; } Ok(val) => val, }; - debug!("parallel worker: obtained job"); - - // make space for tag (TODO: consider moving this out) - if buf.op == Operation::Encryption { - buf.msg.extend([0u8; SIZE_TAG].iter()); - } + trace!("parallel worker: obtained job"); - // cast and check size of packet - let (mut header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = - match LayoutVerified::new_from_prefix(&mut buf.msg[..]) { - Some(v) => v, - None => { - debug_assert!( - false, - "parallel worker: failed to parse message (insufficient size)" - ); - continue; - } - }; - debug_assert!(packet.len() >= CHACHA20_POLY1305.tag_len()); + // handle job + match job { + JobParallel::Encryption(tx, mut job) => { + job.msg.extend([0u8; SIZE_TAG].iter()); - // do the weird ring AEAD dance - let key = LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap()); + // cast to header (should never fail) + let (mut header, body): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = + LayoutVerified::new_from_prefix(&mut job.msg[..]) + .expect("earlier code should ensure that there is ample space"); - // create a nonce object - let mut nonce = [0u8; 12]; - debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); - nonce[4..].copy_from_slice(header.f_counter.as_bytes()); - let nonce = Nonce::assume_unique_for_key(nonce); + // set header fields + header.f_type.set(TYPE_TRANSPORT); + header.f_receiver.set(job.keypair.send.id); + header.f_counter.set(job.counter); - match buf.op { - Operation::Encryption => { - debug!("parallel worker: process encryption"); + // create a nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); - // set the type field - header.f_type.set(TYPE_TRANSPORT); + // do the weird ring AEAD dance + let key = LessSafeKey::new( + UnboundKey::new(&CHACHA20_POLY1305, &job.keypair.send.key[..]).unwrap(), + ); // encrypt content of transport message in-place - let end = packet.len() - SIZE_TAG; + let end = body.len() - SIZE_TAG; let tag = key - .seal_in_place_separate_tag(nonce, Aad::empty(), &mut packet[..end]) + .seal_in_place_separate_tag(nonce, Aad::empty(), &mut body[..end]) .unwrap(); // append tag - packet[end..].copy_from_slice(tag.as_ref()); + body[end..].copy_from_slice(tag.as_ref()); - buf.okay = true; + // pass ownership + let _ = tx.send(job); } - Operation::Decryption => { - debug!("parallel worker: process decryption"); - - // opening failure is signaled by fault state - buf.okay = match key.open_in_place(nonce, Aad::empty(), packet) { - Ok(_) => true, - Err(_) => false, - }; + JobParallel::Decryption(tx, mut job) => { + // cast to header (could fail) + let layout: Option<(LayoutVerified<&mut [u8], TransportHeader>, &mut [u8])> = + LayoutVerified::new_from_prefix(&mut job.msg[..]); + + let _ = tx.send(match layout { + Some((header, body)) => { + debug_assert_eq!(header.f_type.get(), TYPE_TRANSPORT); + if header.f_counter.get() >= REJECT_AFTER_MESSAGES { + None + } else { + // create a nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); + + // do the weird ring AEAD dance + let key = LessSafeKey::new( + UnboundKey::new(&CHACHA20_POLY1305, &job.keypair.recv.key[..]) + .unwrap(), + ); + + // attempt to open (and authenticate) the body + match key.open_in_place(nonce, Aad::empty(), body) { + Ok(_) => Some(job), + Err(_) => None, + } + } + } + None => None, + }); } } - - // pass ownership to consumer - let okay = tx.send(buf); - debug!( - "parallel worker: passing ownership to sequential worker: {}", - okay.is_ok() - ); } } diff --git a/src/wireguard/tests.rs b/src/wireguard/tests.rs index 3ecb979..37dd571 100644 --- a/src/wireguard/tests.rs +++ b/src/wireguard/tests.rs @@ -77,7 +77,6 @@ fn wait() { } /* Create and configure two matching pure instances of WireGuard - * */ #[test] fn test_pure_wireguard() { @@ -166,8 +165,6 @@ fn test_pure_wireguard() { fake1.write(p); } - wait(); - while let Some(p) = backup.pop() { assert_eq!( hex::encode(fake2.read()), @@ -197,8 +194,6 @@ fn test_pure_wireguard() { fake2.write(p); } - wait(); - while let Some(p) = backup.pop() { assert_eq!( hex::encode(fake1.read()), -- cgit v1.2.3-59-g8ed1b