diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-10-30 12:01:12 +0100 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-10-30 12:01:12 +0100 |
commit | afc96611a5d853a0bfaff46ea17695ab5332b13b (patch) | |
tree | f16fb932047d051ba1bc511657e5adb7fa67be03 | |
parent | Unified use of make_packet during tests (diff) | |
download | wireguard-rs-afc96611a5d853a0bfaff46ea17695ab5332b13b.tar.xz wireguard-rs-afc96611a5d853a0bfaff46ea17695ab5332b13b.zip |
Change router job to accommodate keep_key_fresh
-rw-r--r-- | src/wireguard/router/device.rs | 13 | ||||
-rw-r--r-- | src/wireguard/router/mod.rs | 3 | ||||
-rw-r--r-- | src/wireguard/router/peer.rs | 74 | ||||
-rw-r--r-- | src/wireguard/router/workers.rs | 184 | ||||
-rw-r--r-- | src/wireguard/tests.rs | 5 |
5 files changed, 139 insertions, 140 deletions
diff --git a/src/wireguard/router/device.rs b/src/wireguard/router/device.rs index 0818637..7c3b0a1 100644 --- a/src/wireguard/router/device.rs +++ b/src/wireguard/router/device.rs @@ -19,7 +19,7 @@ use super::constants::*; use super::messages::{TransportHeader, TYPE_TRANSPORT}; use super::peer::{new_peer, Peer, PeerInner}; use super::types::{Callbacks, RouterError}; -use super::workers::{worker_parallel, JobParallel, Operation}; +use super::workers::{worker_parallel, JobParallel}; use super::SIZE_MESSAGE_PREFIX; use super::route::get_route; @@ -44,10 +44,9 @@ pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Write } pub struct EncryptionState { - pub key: [u8; 32], // encryption key - pub id: u32, // receiver id - pub nonce: u64, // next available nonce - pub death: Instant, // (birth + reject-after-time - keepalive-timeout - rekey-timeout) + pub keypair: Arc<KeyPair>, // keypair + pub nonce: u64, // next available nonce + pub death: Instant, // (birth + reject-after-time - keepalive-timeout - rekey-timeout) } pub struct DecryptionState<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> { @@ -143,8 +142,6 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> Device<E, C, // schedule for encryption and transmission to peer if let Some(job) = peer.send_job(msg, true) { - debug_assert_eq!(job.1.op, Operation::Encryption); - // add job to worker queue let idx = self.state.queue_next.fetch_add(1, Ordering::SeqCst); let queues = self.state.queues.lock(); @@ -186,8 +183,6 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> Device<E, C, // schedule for decryption and TUN write if let Some(job) = dec.peer.recv_job(src, dec.clone(), msg) { - debug_assert_eq!(job.1.op, Operation::Decryption); - // add job to worker queue let idx = self.state.queue_next.fetch_add(1, Ordering::SeqCst); let queues = self.state.queues.lock(); diff --git a/src/wireguard/router/mod.rs b/src/wireguard/router/mod.rs index 7b317f2..f3565e2 100644 --- a/src/wireguard/router/mod.rs +++ b/src/wireguard/router/mod.rs @@ -14,6 +14,9 @@ mod tests; use messages::TransportHeader; use std::mem; +use super::constants::REJECT_AFTER_MESSAGES; +use super::constants::REKEY_AFTER_MESSAGES; + pub const SIZE_MESSAGE_PREFIX: usize = mem::size_of::<TransportHeader>(); pub const CAPACITY_MESSAGE_POSTFIX: usize = workers::SIZE_TAG; diff --git a/src/wireguard/router/peer.rs b/src/wireguard/router/peer.rs index 66a6e9f..50fdfe7 100644 --- a/src/wireguard/router/peer.rs +++ b/src/wireguard/router/peer.rs @@ -24,9 +24,8 @@ use super::messages::TransportHeader; use futures::*; -use super::workers::Operation; use super::workers::{worker_inbound, worker_outbound}; -use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel}; +use super::workers::{JobDecryption, JobEncryption, JobInbound, JobOutbound, JobParallel}; use super::SIZE_MESSAGE_PREFIX; use super::constants::*; @@ -99,9 +98,8 @@ fn treebit_remove<E: Endpoint, A: Address, C: Callbacks, T: tun::Writer, B: bind impl EncryptionState { fn new(keypair: &Arc<KeyPair>) -> EncryptionState { EncryptionState { - id: keypair.send.id, - key: keypair.send.key, nonce: 0, + keypair: keypair.clone(), death: keypair.birth + REJECT_AFTER_TIME, } } @@ -294,22 +292,14 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> PeerInner<E, msg: Vec<u8>, ) -> Option<JobParallel> { let (tx, rx) = oneshot(); - let key = dec.keypair.recv.key; + let keypair = dec.keypair.clone(); match self.inbound.lock().try_send((dec, src, rx)) { - Ok(_) => Some(( - tx, - JobBuffer { - msg, - key: key, - okay: false, - op: Operation::Decryption, - }, - )), + Ok(_) => Some(JobParallel::Decryption(tx, JobDecryption { msg, keypair })), Err(_) => None, } } - pub fn send_job(&self, mut msg: Vec<u8>, stage: bool) -> Option<JobParallel> { + pub fn send_job(&self, msg: Vec<u8>, stage: bool) -> Option<JobParallel> { debug!("peer.send_job"); debug_assert!( msg.len() >= mem::size_of::<TransportHeader>(), @@ -317,29 +307,24 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> PeerInner<E, msg.len() ); - // parse / cast - let (header, _) = LayoutVerified::new_from_prefix(&mut msg[..]).unwrap(); - let mut header: LayoutVerified<&mut [u8], TransportHeader> = header; - // check if has key - let key = { - let mut ekey = self.ekey.lock(); - let key = match ekey.as_mut() { - None => None, - Some(mut state) => { - // avoid integer overflow in nonce - if state.nonce >= REJECT_AFTER_MESSAGES - 1 { - *ekey = None; - None - } else { - // there should be no stacked packets lingering around - debug!("encryption state available, nonce = {}", state.nonce); - - // set transport message fields - header.f_counter.set(state.nonce); - header.f_receiver.set(state.id); - state.nonce += 1; - Some(state.key) + let (keypair, counter) = { + let keypair = { + // TODO: consider using atomic ptr for ekey state + let mut ekey = self.ekey.lock(); + match ekey.as_mut() { + None => None, + Some(mut state) => { + // avoid integer overflow in nonce + if state.nonce >= REJECT_AFTER_MESSAGES - 1 { + *ekey = None; + None + } else { + debug!("encryption state available, nonce = {}", state.nonce); + let counter = state.nonce; + state.nonce += 1; + Some((state.keypair.clone(), counter)) + } } } }; @@ -347,25 +332,24 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> PeerInner<E, // If not suitable key was found: // 1. Stage packet for later transmission // 2. Request new key - if key.is_none() && stage { + if keypair.is_none() && stage { self.staged_packets.lock().push_back(msg); C::need_key(&self.opaque); return None; }; - key + keypair }?; - // add job to in-order queue and return sendeer to device for inclusion in worker pool + // add job to in-order queue and return sender to device for inclusion in worker pool let (tx, rx) = oneshot(); match self.outbound.lock().try_send(rx) { - Ok(_) => Some(( + Ok(_) => Some(JobParallel::Encryption( tx, - JobBuffer { + JobEncryption { msg, - key, - okay: false, - op: Operation::Encryption, + counter, + keypair, }, )), Err(_) => None, diff --git a/src/wireguard/router/workers.rs b/src/wireguard/router/workers.rs index 70334c1..3d85188 100644 --- a/src/wireguard/router/workers.rs +++ b/src/wireguard/router/workers.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use futures::sync::oneshot; use futures::*; -use log::debug; +use log::{debug, trace}; use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305}; @@ -16,34 +16,40 @@ use super::messages::{TransportHeader, TYPE_TRANSPORT}; use super::peer::PeerInner; use super::route::check_route; use super::types::Callbacks; +use super::REJECT_AFTER_MESSAGES; +use super::super::types::KeyPair; use super::super::{bind, tun, Endpoint}; pub const SIZE_TAG: usize = 16; -#[derive(PartialEq, Debug)] -pub enum Operation { - Encryption, - Decryption, +#[derive(Debug)] +pub struct JobEncryption { + pub msg: Vec<u8>, + pub keypair: Arc<KeyPair>, + pub counter: u64, } -pub struct JobBuffer { - pub msg: Vec<u8>, // message buffer (nonce and receiver id set) - pub key: [u8; 32], // chacha20poly1305 key - pub okay: bool, // state of the job - pub op: Operation, // should be buffer be encrypted / decrypted? +#[derive(Debug)] +pub struct JobDecryption { + pub msg: Vec<u8>, + pub keypair: Arc<KeyPair>, } -pub type JobParallel = (oneshot::Sender<JobBuffer>, JobBuffer); +#[derive(Debug)] +pub enum JobParallel { + Encryption(oneshot::Sender<JobEncryption>, JobEncryption), + Decryption(oneshot::Sender<Option<JobDecryption>>, JobDecryption), +} #[allow(type_alias_bounds)] pub type JobInbound<E, C, T, B: bind::Writer<E>> = ( Arc<DecryptionState<E, C, T, B>>, E, - oneshot::Receiver<JobBuffer>, + oneshot::Receiver<Option<JobDecryption>>, ); -pub type JobOutbound = oneshot::Receiver<JobBuffer>; +pub type JobOutbound = oneshot::Receiver<JobEncryption>; pub fn worker_inbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>>( device: Arc<DeviceInner<E, C, T, B>>, // related device @@ -64,7 +70,7 @@ pub fn worker_inbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer let _ = rx .map(|buf| { debug!("inbound worker: job complete"); - if buf.okay { + if let Some(buf) = buf { // cast transport header let (header, packet): (LayoutVerified<&[u8], TransportHeader>, &[u8]) = match LayoutVerified::new_from_prefix(&buf.msg[..]) { @@ -134,6 +140,10 @@ pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Write peer: Arc<PeerInner<E, C, T, B>>, // related peer receiver: Receiver<JobOutbound>, ) { + fn keep_key_fresh(keypair: &KeyPair, counter: u64) -> bool { + false + } + loop { // fetch job let rx = match receiver.recv() { @@ -148,27 +158,30 @@ pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Write let _ = rx .map(|buf| { debug!("outbound worker: job complete"); - if buf.okay { - // write to UDP bind - let xmit = if let Some(dst) = peer.endpoint.lock().as_ref() { - let send: &Option<B> = &*device.outbound.read(); - if let Some(writer) = send.as_ref() { - match writer.write(&buf.msg[..], dst) { - Err(e) => { - debug!("failed to send outbound packet: {:?}", e); - false - } - Ok(_) => true, + // write to UDP bind + let xmit = if let Some(dst) = peer.endpoint.lock().as_ref() { + let send: &Option<B> = &*device.outbound.read(); + if let Some(writer) = send.as_ref() { + match writer.write(&buf.msg[..], dst) { + Err(e) => { + debug!("failed to send outbound packet: {:?}", e); + false } - } else { - false + Ok(_) => true, } } else { false - }; + } + } else { + false + }; - // trigger callback - C::send(&peer.opaque, buf.msg.len(), xmit); + // trigger callback + C::send(&peer.opaque, buf.msg.len(), xmit); + + // keep_key_fresh semantics + if keep_key_fresh(&buf.keypair, buf.counter) { + C::need_key(&peer.opaque); } }) .wait(); @@ -178,76 +191,85 @@ pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Write pub fn worker_parallel(receiver: Receiver<JobParallel>) { loop { // fetch next job - let (tx, mut buf) = match receiver.recv() { + let job = match receiver.recv() { Err(_) => { return; } Ok(val) => val, }; - debug!("parallel worker: obtained job"); - - // make space for tag (TODO: consider moving this out) - if buf.op == Operation::Encryption { - buf.msg.extend([0u8; SIZE_TAG].iter()); - } + trace!("parallel worker: obtained job"); - // cast and check size of packet - let (mut header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = - match LayoutVerified::new_from_prefix(&mut buf.msg[..]) { - Some(v) => v, - None => { - debug_assert!( - false, - "parallel worker: failed to parse message (insufficient size)" - ); - continue; - } - }; - debug_assert!(packet.len() >= CHACHA20_POLY1305.tag_len()); + // handle job + match job { + JobParallel::Encryption(tx, mut job) => { + job.msg.extend([0u8; SIZE_TAG].iter()); - // do the weird ring AEAD dance - let key = LessSafeKey::new(UnboundKey::new(&CHACHA20_POLY1305, &buf.key[..]).unwrap()); + // cast to header (should never fail) + let (mut header, body): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) = + LayoutVerified::new_from_prefix(&mut job.msg[..]) + .expect("earlier code should ensure that there is ample space"); - // create a nonce object - let mut nonce = [0u8; 12]; - debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); - nonce[4..].copy_from_slice(header.f_counter.as_bytes()); - let nonce = Nonce::assume_unique_for_key(nonce); + // set header fields + header.f_type.set(TYPE_TRANSPORT); + header.f_receiver.set(job.keypair.send.id); + header.f_counter.set(job.counter); - match buf.op { - Operation::Encryption => { - debug!("parallel worker: process encryption"); + // create a nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); - // set the type field - header.f_type.set(TYPE_TRANSPORT); + // do the weird ring AEAD dance + let key = LessSafeKey::new( + UnboundKey::new(&CHACHA20_POLY1305, &job.keypair.send.key[..]).unwrap(), + ); // encrypt content of transport message in-place - let end = packet.len() - SIZE_TAG; + let end = body.len() - SIZE_TAG; let tag = key - .seal_in_place_separate_tag(nonce, Aad::empty(), &mut packet[..end]) + .seal_in_place_separate_tag(nonce, Aad::empty(), &mut body[..end]) .unwrap(); // append tag - packet[end..].copy_from_slice(tag.as_ref()); + body[end..].copy_from_slice(tag.as_ref()); - buf.okay = true; + // pass ownership + let _ = tx.send(job); } - Operation::Decryption => { - debug!("parallel worker: process decryption"); - - // opening failure is signaled by fault state - buf.okay = match key.open_in_place(nonce, Aad::empty(), packet) { - Ok(_) => true, - Err(_) => false, - }; + JobParallel::Decryption(tx, mut job) => { + // cast to header (could fail) + let layout: Option<(LayoutVerified<&mut [u8], TransportHeader>, &mut [u8])> = + LayoutVerified::new_from_prefix(&mut job.msg[..]); + + let _ = tx.send(match layout { + Some((header, body)) => { + debug_assert_eq!(header.f_type.get(), TYPE_TRANSPORT); + if header.f_counter.get() >= REJECT_AFTER_MESSAGES { + None + } else { + // create a nonce object + let mut nonce = [0u8; 12]; + debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); + nonce[4..].copy_from_slice(header.f_counter.as_bytes()); + let nonce = Nonce::assume_unique_for_key(nonce); + + // do the weird ring AEAD dance + let key = LessSafeKey::new( + UnboundKey::new(&CHACHA20_POLY1305, &job.keypair.recv.key[..]) + .unwrap(), + ); + + // attempt to open (and authenticate) the body + match key.open_in_place(nonce, Aad::empty(), body) { + Ok(_) => Some(job), + Err(_) => None, + } + } + } + None => None, + }); } } - - // pass ownership to consumer - let okay = tx.send(buf); - debug!( - "parallel worker: passing ownership to sequential worker: {}", - okay.is_ok() - ); } } diff --git a/src/wireguard/tests.rs b/src/wireguard/tests.rs index 3ecb979..37dd571 100644 --- a/src/wireguard/tests.rs +++ b/src/wireguard/tests.rs @@ -77,7 +77,6 @@ fn wait() { } /* Create and configure two matching pure instances of WireGuard - * */ #[test] fn test_pure_wireguard() { @@ -166,8 +165,6 @@ fn test_pure_wireguard() { fake1.write(p); } - wait(); - while let Some(p) = backup.pop() { assert_eq!( hex::encode(fake2.read()), @@ -197,8 +194,6 @@ fn test_pure_wireguard() { fake2.write(p); } - wait(); - while let Some(p) = backup.pop() { assert_eq!( hex::encode(fake1.read()), |