summaryrefslogtreecommitdiffstats
path: root/src/wireguard/wireguard.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-11-08 19:00:12 +0100
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-11-08 19:00:12 +0100
commitdd85201c15244fbd380eef8ee359a535335b7250 (patch)
tree5ccbb72702b7c5da8e6ab570bde192f66fff66bc /src/wireguard/wireguard.rs
parentImplement disable/enable timers (diff)
downloadwireguard-rs-dd85201c15244fbd380eef8ee359a535335b7250.tar.xz
wireguard-rs-dd85201c15244fbd380eef8ee359a535335b7250.zip
Removal of secret key in the handshake module
Diffstat (limited to '')
-rw-r--r--src/wireguard/wireguard.rs128
1 files changed, 65 insertions, 63 deletions
diff --git a/src/wireguard/wireguard.rs b/src/wireguard/wireguard.rs
index 6da428c..a890d5e 100644
--- a/src/wireguard/wireguard.rs
+++ b/src/wireguard/wireguard.rs
@@ -42,46 +42,56 @@ pub struct WireguardInner<T: Tun, B: Bind> {
mtu: T::MTU,
send: RwLock<Option<B::Writer>>,
- // identify and configuration map
+ // identity and configuration map
peers: RwLock<HashMap<[u8; 32], Peer<T, B>>>,
// cryptokey router
router: router::Device<B::Endpoint, Events<T, B>, T::Writer, B::Writer>,
// handshake related state
- handshake: RwLock<Handshake>,
+ handshake: RwLock<handshake::Device>,
under_load: AtomicBool,
pending: AtomicUsize, // num of pending handshake packets in queue
queue: Mutex<Sender<HandshakeJob<B::Endpoint>>>,
}
+impl<T: Tun, B: Bind> PeerInner<T, B> {
+ /* Queue a handshake request for the parallel workers
+ * (if one does not already exist)
+ *
+ * The function is ratelimited.
+ */
+ pub fn packet_send_handshake_initiation(&self) {
+ // the function is rate limited
+
+ {
+ let mut lhs = self.last_handshake_sent.lock();
+ if lhs.elapsed() < REKEY_TIMEOUT {
+ return;
+ }
+ *lhs = Instant::now();
+ }
+
+ // create a new handshake job for the peer
+
+ if !self.handshake_queued.swap(true, Ordering::SeqCst) {
+ self.wg.pending.fetch_add(1, Ordering::SeqCst);
+ self.queue.lock().send(HandshakeJob::New(self.pk)).unwrap();
+ }
+ }
+}
+
pub enum HandshakeJob<E> {
Message(Vec<u8>, E),
New(PublicKey),
}
-#[derive(Clone)]
-pub struct WireguardHandle<T: Tun, B: Bind> {
- inner: Arc<WireguardInner<T, B>>,
-}
-
impl<T: Tun, B: Bind> fmt::Display for WireguardInner<T, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "wireguard({:x})", self.id)
}
}
-struct Handshake {
- device: handshake::Device,
- active: bool,
-}
-
-impl<T: Tun, B: Bind> Deref for WireguardHandle<T, B> {
- type Target = Arc<WireguardInner<T, B>>;
- fn deref(&self) -> &Self::Target {
- &self.inner
- }
-}
impl<T: Tun, B: Bind> Deref for Wireguard<T, B> {
type Target = Arc<WireguardInner<T, B>>;
fn deref(&self) -> &Self::Target {
@@ -91,7 +101,7 @@ impl<T: Tun, B: Bind> Deref for Wireguard<T, B> {
pub struct Wireguard<T: Tun, B: Bind> {
runner: Runner,
- state: WireguardHandle<T, B>,
+ state: Arc<WireguardInner<T, B>>,
}
/* Returns the padded length of a message:
@@ -181,31 +191,18 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
}
pub fn set_key(&self, sk: Option<StaticSecret>) {
- let mut handshake = self.state.handshake.write();
- match sk {
- None => {
- let mut rng = OsRng::new().unwrap();
- handshake.device.set_sk(StaticSecret::new(&mut rng));
- handshake.active = false;
- }
- Some(sk) => {
- handshake.device.set_sk(sk);
- handshake.active = true;
- }
- }
+ self.handshake.write().set_sk(sk);
}
pub fn get_sk(&self) -> Option<StaticSecret> {
- let handshake = self.state.handshake.read();
- if handshake.active {
- Some(handshake.device.get_sk())
- } else {
- None
- }
+ self.handshake
+ .read()
+ .get_sk()
+ .map(|sk| StaticSecret::from(sk.to_bytes()))
}
pub fn set_psk(&self, pk: PublicKey, psk: Option<[u8; 32]>) -> bool {
- self.state.handshake.write().device.set_psk(pk, psk).is_ok()
+ self.state.handshake.write().set_psk(pk, psk).is_ok()
}
pub fn add_peer(&self, pk: PublicKey) {
@@ -217,6 +214,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
let state = Arc::new(PeerInner {
id: rng.gen(),
pk,
+ wg: self.state.clone(),
walltime_last_handshake: Mutex::new(SystemTime::UNIX_EPOCH),
last_handshake_sent: Mutex::new(self.state.start - TIME_HORIZON),
handshake_queued: AtomicBool::new(false),
@@ -245,14 +243,14 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
peers.entry(*pk.as_bytes()).or_insert(peer);
// add to the handshake device
- self.state.handshake.write().device.add(pk).unwrap(); // TODO: handle adding of public key for interface
+ self.state.handshake.write().add(pk).unwrap(); // TODO: handle adding of public key for interface
}
- /* Begin consuming messages from the reader.
- *
- * Any previous reader thread is stopped by closing the previous reader,
- * which unblocks the thread and causes an error on reader.read
- */
+ /// Begin consuming messages from the reader.
+ /// Multiple readers can be added to support multi-queue and individual Ipv6/Ipv4 sockets interfaces
+ ///
+ /// Any previous reader thread is stopped by closing the previous reader,
+ /// which unblocks the thread and causes an error on reader.read
pub fn add_reader(&self, reader: B::Reader) {
let wg = self.state.clone();
thread::spawn(move || {
@@ -285,6 +283,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
| handshake::TYPE_RESPONSE => {
debug!("{} : reader, received handshake message", wg);
+ // add one to pending
let pending = wg.pending.fetch_add(1, Ordering::SeqCst);
// update under_load flag
@@ -297,6 +296,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
wg.under_load.store(false, Ordering::SeqCst);
}
+ // add to handshake queue
wg.queue
.lock()
.send(HandshakeJob::Message(msg, src))
@@ -325,7 +325,10 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
pub fn new(mut readers: Vec<T::Reader>, writer: T::Writer, mtu: T::MTU) -> Wireguard<T, B> {
// create device state
let mut rng = OsRng::new().unwrap();
+
+ // handshake queue
let (tx, rx): (Sender<HandshakeJob<B::Endpoint>>, _) = bounded(SIZE_HANDSHAKE_QUEUE);
+
let wg = Arc::new(WireguardInner {
start: Instant::now(),
id: rng.gen(),
@@ -334,10 +337,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
send: RwLock::new(None),
router: router::Device::new(num_cpus::get(), writer), // router owns the writing half
pending: AtomicUsize::new(0),
- handshake: RwLock::new(Handshake {
- device: handshake::Device::new(StaticSecret::new(&mut rng)),
- active: false,
- }),
+ handshake: RwLock::new(handshake::Device::new()),
under_load: AtomicBool::new(false),
queue: Mutex::new(tx),
});
@@ -350,24 +350,22 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
debug!("{} : handshake worker, started", wg);
// prepare OsRng instance for this thread
- let mut rng = OsRng::new().unwrap();
+ let mut rng = OsRng::new().expect("Unable to obtain a CSPRNG");
// process elements from the handshake queue
for job in rx {
- let state = wg.handshake.read();
- if !state.active {
- continue;
- }
+ // decrement pending
+ wg.pending.fetch_sub(1, Ordering::SeqCst);
+
+ let device = wg.handshake.read();
match job {
HandshakeJob::Message(msg, src) => {
- wg.pending.fetch_sub(1, Ordering::SeqCst);
-
// feed message to handshake device
let src_validate = (&src).into_address(); // TODO avoid
// process message
- match state.device.process(
+ match device.process(
&mut rng,
&msg[..],
if wg.under_load.load(Ordering::Relaxed) {
@@ -428,7 +426,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
// free any unused ids
for id in peer.router.add_keypair(kp) {
- state.device.release(id);
+ device.release(id);
}
});
}
@@ -438,15 +436,19 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
}
}
HandshakeJob::New(pk) => {
- debug!("{} : handshake worker, new handshake requested", wg);
- let _ = state.device.begin(&mut rng, &pk).map(|msg| {
- if let Some(peer) = wg.peers.read().get(pk.as_bytes()) {
+ if let Some(peer) = wg.peers.read().get(pk.as_bytes()) {
+ debug!(
+ "{} : handshake worker, new handshake requested for {}",
+ wg, peer
+ );
+ let _ = device.begin(&mut rng, &peer.pk).map(|msg| {
let _ = peer.router.send(&msg[..]).map_err(|e| {
debug!("{} : handshake worker, failed to send handshake initiation, error = {}", wg, e)
});
peer.state.sent_handshake_initiation();
- }
- });
+ });
+ peer.handshake_queued.store(false, Ordering::SeqCst);
+ }
}
}
}
@@ -498,7 +500,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
}
Wireguard {
- state: WireguardHandle { inner: wg },
+ state: wg,
runner: Runner::new(TIMERS_TICK, TIMERS_SLOTS, TIMERS_CAPACITY),
}
}