From dcd567c08f126b09548a98df0468ef1fe86d9f0a Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sat, 1 Feb 2020 14:39:19 +0100 Subject: Squashed commit of the following: commit 1e26a0bef44e65023a97a16ecf3b123e688d19f7 Author: Mathias Hall-Andersen Date: Sat Feb 1 14:36:50 2020 +0100 Initial version of sticky sockets for Linux commit 605cc656ad235d09ba6cd12d03dee2c5e0a9a80a Author: Mathias Hall-Andersen Date: Thu Jan 30 14:57:00 2020 +0100 Clear src when sendmsg fails with EINVAL commit dffd2b228af70f681e2a161642bbdaa348419bf3 Author: Mathias Hall-Andersen Date: Sun Jan 26 14:01:28 2020 +0100 Fix typoes commit 2015663706fbe15ed1ac443a31de86b3e6c643c7 Author: Mathias Hall-Andersen Date: Sun Jan 26 13:51:59 2020 +0100 Restructure of public key -> peer state Restructured the mapping of public keys to peer state in the project. The handshake device is now generic over an opaque type, which enables it to be the sole place where public keys are mapped to the peer states. This gets rid of the "peer" map in the WireGuard devices and avoids having to include the public key in the handshake peer state. commit bbcfaad4bcc5cf16bacdef0cefe7d29ba1519a23 Author: Mathias Hall-Andersen Date: Fri Jan 10 21:10:27 2020 +0100 Fixed bind6 also binding on IPv4 commit acbca236b70598c20c24de474690bcad883241d4 Author: Mathias Hall-Andersen Date: Thu Jan 9 11:24:13 2020 +0100 Work on sticky sockets --- src/wireguard/workers.rs | 80 +++++++++++++++++++++++------------------------- 1 file changed, 38 insertions(+), 42 deletions(-) (limited to 'src/wireguard/workers.rs') diff --git a/src/wireguard/workers.rs b/src/wireguard/workers.rs index e1d3899..c1a2af7 100644 --- a/src/wireguard/workers.rs +++ b/src/wireguard/workers.rs @@ -152,9 +152,6 @@ pub fn handshake_worker( ) { debug!("{} : handshake worker, started", wg); - // prepare OsRng instance for this thread - let mut rng = OsRng::new().expect("Unable to obtain a CSPRNG"); - // process elements from the handshake queue for job in rx { // check if under load @@ -181,11 +178,11 @@ pub fn handshake_worker( // de-multiplex staged handshake jobs and handshake messages match job { - HandshakeJob::Message(msg, src) => { + HandshakeJob::Message(msg, mut src) => { // process message - let device = wg.handshake.read(); + let device = wg.peers.read(); match device.process( - &mut rng, + &mut OsRng, &msg[..], if under_load { Some(src.into_address()) @@ -193,7 +190,7 @@ pub fn handshake_worker( None }, ) { - Ok((pk, resp, keypair)) => { + Ok((peer, resp, keypair)) => { // send response (might be cookie reply or handshake response) let mut resp_len: u64 = 0; if let Some(msg) = resp { @@ -204,7 +201,7 @@ pub fn handshake_worker( "{} : handshake worker, send response ({} bytes)", wg, resp_len ); - let _ = writer.write(&msg[..], &src).map_err(|e| { + let _ = writer.write(&msg[..], &mut src).map_err(|e| { debug!( "{} : handshake worker, failed to send response, error = {}", wg, @@ -215,56 +212,55 @@ pub fn handshake_worker( } // update peer state - if let Some(pk) = pk { + if let Some(peer) = peer { // authenticated handshake packet received - if let Some(peer) = wg.peers.read().get(pk.as_bytes()) { - // add to rx_bytes and tx_bytes - let req_len = msg.len() as u64; - peer.rx_bytes.fetch_add(req_len, Ordering::Relaxed); - peer.tx_bytes.fetch_add(resp_len, Ordering::Relaxed); - // update endpoint - peer.router.set_endpoint(src); + // add to rx_bytes and tx_bytes + let req_len = msg.len() as u64; + peer.rx_bytes.fetch_add(req_len, Ordering::Relaxed); + peer.tx_bytes.fetch_add(resp_len, Ordering::Relaxed); - if resp_len > 0 { - // update timers after sending handshake response - debug!("{} : handshake worker, handshake response sent", wg); - peer.state.sent_handshake_response(); - } else { - // update timers after receiving handshake response - debug!( - "{} : handshake worker, handshake response was received", - wg - ); - peer.state.timers_handshake_complete(); - } + // update endpoint + peer.router.set_endpoint(src); + + if resp_len > 0 { + // update timers after sending handshake response + debug!("{} : handshake worker, handshake response sent", wg); + peer.state.sent_handshake_response(); + } else { + // update timers after receiving handshake response + debug!( + "{} : handshake worker, handshake response was received", + wg + ); + peer.state.timers_handshake_complete(); + } - // add any new keypair to peer - keypair.map(|kp| { - debug!("{} : handshake worker, new keypair for {}", wg, peer); + // add any new keypair to peer + keypair.map(|kp| { + debug!("{} : handshake worker, new keypair for {}", wg, peer); - // this means that a handshake response was processed or sent - peer.timers_session_derived(); + // this means that a handshake response was processed or sent + peer.timers_session_derived(); - // free any unused ids - for id in peer.router.add_keypair(kp) { - device.release(id); - } - }); - } + // free any unused ids + for id in peer.router.add_keypair(kp) { + device.release(id); + } + }); } } Err(e) => debug!("{} : handshake worker, error = {:?}", wg, e), } } HandshakeJob::New(pk) => { - if let Some(peer) = wg.peers.read().get(pk.as_bytes()) { + if let Some(peer) = wg.peers.read().get(&pk) { debug!( "{} : handshake worker, new handshake requested for {}", wg, peer ); - let device = wg.handshake.read(); - let _ = device.begin(&mut rng, &peer.pk).map(|msg| { + let device = wg.peers.read(); + let _ = device.begin(&mut OsRng, &peer.pk).map(|msg| { let _ = peer.router.send(&msg[..]).map_err(|e| { debug!("{} : handshake worker, failed to send handshake initiation, error = {}", wg, e) }); -- cgit v1.2.3-59-g8ed1b