aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/wireguard.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-10-28 14:48:24 +0100
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-10-28 14:48:24 +0100
commit4ff328b7da876fb3305fefd83865553af9c8ab2c (patch)
treebd1680d9f7316415e3044fd152ef503729d97239 /src/wireguard/wireguard.rs
parentFixed Ordering::Acquire -> Ordering::SeqCst typo (diff)
downloadwireguard-rs-4ff328b7da876fb3305fefd83865553af9c8ab2c.tar.xz
wireguard-rs-4ff328b7da876fb3305fefd83865553af9c8ab2c.zip
First full test of pure WireGuard
Diffstat (limited to 'src/wireguard/wireguard.rs')
-rw-r--r--src/wireguard/wireguard.rs47
1 files changed, 41 insertions, 6 deletions
diff --git a/src/wireguard/wireguard.rs b/src/wireguard/wireguard.rs
index 25544d9..233559e 100644
--- a/src/wireguard/wireguard.rs
+++ b/src/wireguard/wireguard.rs
@@ -21,6 +21,7 @@ use std::collections::HashMap;
use log::debug;
use rand::rngs::OsRng;
+use rand::Rng;
use spin::{Mutex, RwLock, RwLockReadGuard};
use byteorder::{ByteOrder, LittleEndian};
@@ -37,6 +38,8 @@ pub struct Peer<T: Tun, B: Bind> {
}
pub struct PeerInner<B: Bind> {
+ pub id: u64,
+
pub keepalive: AtomicUsize, // keepalive interval
pub rx_bytes: AtomicU64,
pub tx_bytes: AtomicU64,
@@ -50,6 +53,9 @@ pub struct PeerInner<B: Bind> {
}
pub struct WireguardInner<T: Tun, B: Bind> {
+ // identifier (for logging)
+ id: u32,
+
// provides access to the MTU value of the tun device
// (otherwise owned solely by the router and a dedicated read IO thread)
mtu: T::MTU,
@@ -96,7 +102,13 @@ impl<B: Bind> PeerInner<B> {
impl<T: Tun, B: Bind> fmt::Display for Peer<T, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "peer()")
+ write!(f, "peer(id = {})", self.id)
+ }
+}
+
+impl<T: Tun, B: Bind> fmt::Display for WireguardInner<T, B> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "wireguard({:x})", self.id)
}
}
@@ -209,7 +221,9 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
}
pub fn new_peer(&self, pk: PublicKey) {
+ let mut rng = OsRng::new().unwrap();
let state = Arc::new(PeerInner {
+ id: rng.gen(),
pk,
last_handshake: Mutex::new(SystemTime::UNIX_EPOCH),
handshake_queued: AtomicBool::new(false),
@@ -277,11 +291,17 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
handshake::TYPE_COOKIE_REPLY
| handshake::TYPE_INITIATION
| handshake::TYPE_RESPONSE => {
+ debug!("{} : reader, received handshake message", wg);
+
+ let pending = wg.pending.fetch_add(1, Ordering::SeqCst);
+
// update under_load flag
- if wg.pending.fetch_add(1, Ordering::SeqCst) > THRESHOLD_UNDER_LOAD {
+ if pending > THRESHOLD_UNDER_LOAD {
+ debug!("{} : reader, set under load (pending = {})", wg, pending);
last_under_load = Instant::now();
wg.under_load.store(true, Ordering::SeqCst);
} else if last_under_load.elapsed() > DURATION_UNDER_LOAD {
+ debug!("{} : reader, clear under load", wg);
wg.under_load.store(false, Ordering::SeqCst);
}
@@ -291,6 +311,8 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
.unwrap();
}
router::TYPE_TRANSPORT => {
+ debug!("{} : reader, received transport message", wg);
+
// transport message
let _ = wg.router.recv(src, msg).map_err(|e| {
debug!("Failed to handle incoming transport message: {}", e);
@@ -313,6 +335,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
let mut rng = OsRng::new().unwrap();
let (tx, rx): (Sender<HandshakeJob<B::Endpoint>>, _) = bounded(SIZE_HANDSHAKE_QUEUE);
let wg = Arc::new(WireguardInner {
+ id: rng.gen(),
mtu: mtu.clone(),
peers: RwLock::new(HashMap::new()),
send: RwLock::new(None),
@@ -331,12 +354,13 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
let wg = wg.clone();
let rx = rx.clone();
thread::spawn(move || {
+ debug!("{} : handshake worker, started", wg);
+
// prepare OsRng instance for this thread
let mut rng = OsRng::new().unwrap();
// process elements from the handshake queue
for job in rx {
- wg.pending.fetch_sub(1, Ordering::SeqCst);
let state = wg.handshake.read();
if !state.active {
continue;
@@ -344,6 +368,8 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
match job {
HandshakeJob::Message(msg, src) => {
+ wg.pending.fetch_sub(1, Ordering::SeqCst);
+
// feed message to handshake device
let src_validate = (&src).into_address(); // TODO avoid
@@ -352,6 +378,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
&mut rng,
&msg[..],
if wg.under_load.load(Ordering::Relaxed) {
+ debug!("{} : handshake worker, under load", wg);
Some(&src_validate)
} else {
None
@@ -364,9 +391,14 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
resp_len = msg.len() as u64;
let send: &Option<B::Writer> = &*wg.send.read();
if let Some(writer) = send.as_ref() {
+ debug!(
+ "{} : handshake worker, send response ({} bytes)",
+ wg, resp_len
+ );
let _ = writer.write(&msg[..], &src).map_err(|e| {
debug!(
- "handshake worker, failed to send response, error = {}",
+ "{} : handshake worker, failed to send response, error = {}",
+ wg,
e
)
});
@@ -387,11 +419,13 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
// update timers after sending handshake response
if resp_len > 0 {
+ debug!("{} : handshake worker, handshake response sent", wg);
peer.state.sent_handshake_response();
}
// add resulting keypair to peer
keypair.map(|kp| {
+ debug!("{} : handshake worker, new keypair", wg);
// free any unused ids
for id in peer.router.add_keypair(kp) {
state.device.release(id);
@@ -400,14 +434,15 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
}
}
}
- Err(e) => debug!("handshake worker, error = {:?}", e),
+ Err(e) => debug!("{} : handshake worker, error = {:?}", wg, e),
}
}
HandshakeJob::New(pk) => {
+ debug!("{} : handshake worker, new handshake requested", wg);
let _ = state.device.begin(&mut rng, &pk).map(|msg| {
if let Some(peer) = wg.peers.read().get(pk.as_bytes()) {
let _ = peer.router.send(&msg[..]).map_err(|e| {
- debug!("handshake worker, failed to send handshake initiation, error = {}", e)
+ debug!("{} : handshake worker, failed to send handshake initiation, error = {}", wg, e)
});
peer.state.sent_handshake_initiation();
}