diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-11-08 19:00:12 +0100 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2019-11-08 19:00:12 +0100 |
commit | dd85201c15244fbd380eef8ee359a535335b7250 (patch) | |
tree | 5ccbb72702b7c5da8e6ab570bde192f66fff66bc /src/wireguard/wireguard.rs | |
parent | Implement disable/enable timers (diff) | |
download | wireguard-rs-dd85201c15244fbd380eef8ee359a535335b7250.tar.xz wireguard-rs-dd85201c15244fbd380eef8ee359a535335b7250.zip |
Removal of secret key in the handshake module
Diffstat (limited to '')
-rw-r--r-- | src/wireguard/wireguard.rs | 128 |
1 files changed, 65 insertions, 63 deletions
diff --git a/src/wireguard/wireguard.rs b/src/wireguard/wireguard.rs index 6da428c..a890d5e 100644 --- a/src/wireguard/wireguard.rs +++ b/src/wireguard/wireguard.rs @@ -42,46 +42,56 @@ pub struct WireguardInner<T: Tun, B: Bind> { mtu: T::MTU, send: RwLock<Option<B::Writer>>, - // identify and configuration map + // identity and configuration map peers: RwLock<HashMap<[u8; 32], Peer<T, B>>>, // cryptokey router router: router::Device<B::Endpoint, Events<T, B>, T::Writer, B::Writer>, // handshake related state - handshake: RwLock<Handshake>, + handshake: RwLock<handshake::Device>, under_load: AtomicBool, pending: AtomicUsize, // num of pending handshake packets in queue queue: Mutex<Sender<HandshakeJob<B::Endpoint>>>, } +impl<T: Tun, B: Bind> PeerInner<T, B> { + /* Queue a handshake request for the parallel workers + * (if one does not already exist) + * + * The function is ratelimited. + */ + pub fn packet_send_handshake_initiation(&self) { + // the function is rate limited + + { + let mut lhs = self.last_handshake_sent.lock(); + if lhs.elapsed() < REKEY_TIMEOUT { + return; + } + *lhs = Instant::now(); + } + + // create a new handshake job for the peer + + if !self.handshake_queued.swap(true, Ordering::SeqCst) { + self.wg.pending.fetch_add(1, Ordering::SeqCst); + self.queue.lock().send(HandshakeJob::New(self.pk)).unwrap(); + } + } +} + pub enum HandshakeJob<E> { Message(Vec<u8>, E), New(PublicKey), } -#[derive(Clone)] -pub struct WireguardHandle<T: Tun, B: Bind> { - inner: Arc<WireguardInner<T, B>>, -} - impl<T: Tun, B: Bind> fmt::Display for WireguardInner<T, B> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "wireguard({:x})", self.id) } } -struct Handshake { - device: handshake::Device, - active: bool, -} - -impl<T: Tun, B: Bind> Deref for WireguardHandle<T, B> { - type Target = Arc<WireguardInner<T, B>>; - fn deref(&self) -> &Self::Target { - &self.inner - } -} impl<T: Tun, B: Bind> Deref for Wireguard<T, B> { type Target = Arc<WireguardInner<T, B>>; fn deref(&self) -> &Self::Target { @@ -91,7 +101,7 @@ impl<T: Tun, B: Bind> Deref for Wireguard<T, B> { pub struct Wireguard<T: Tun, B: Bind> { runner: Runner, - state: WireguardHandle<T, B>, + state: Arc<WireguardInner<T, B>>, } /* Returns the padded length of a message: @@ -181,31 +191,18 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { } pub fn set_key(&self, sk: Option<StaticSecret>) { - let mut handshake = self.state.handshake.write(); - match sk { - None => { - let mut rng = OsRng::new().unwrap(); - handshake.device.set_sk(StaticSecret::new(&mut rng)); - handshake.active = false; - } - Some(sk) => { - handshake.device.set_sk(sk); - handshake.active = true; - } - } + self.handshake.write().set_sk(sk); } pub fn get_sk(&self) -> Option<StaticSecret> { - let handshake = self.state.handshake.read(); - if handshake.active { - Some(handshake.device.get_sk()) - } else { - None - } + self.handshake + .read() + .get_sk() + .map(|sk| StaticSecret::from(sk.to_bytes())) } pub fn set_psk(&self, pk: PublicKey, psk: Option<[u8; 32]>) -> bool { - self.state.handshake.write().device.set_psk(pk, psk).is_ok() + self.state.handshake.write().set_psk(pk, psk).is_ok() } pub fn add_peer(&self, pk: PublicKey) { @@ -217,6 +214,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { let state = Arc::new(PeerInner { id: rng.gen(), pk, + wg: self.state.clone(), walltime_last_handshake: Mutex::new(SystemTime::UNIX_EPOCH), last_handshake_sent: Mutex::new(self.state.start - TIME_HORIZON), handshake_queued: AtomicBool::new(false), @@ -245,14 +243,14 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { peers.entry(*pk.as_bytes()).or_insert(peer); // add to the handshake device - self.state.handshake.write().device.add(pk).unwrap(); // TODO: handle adding of public key for interface + self.state.handshake.write().add(pk).unwrap(); // TODO: handle adding of public key for interface } - /* Begin consuming messages from the reader. - * - * Any previous reader thread is stopped by closing the previous reader, - * which unblocks the thread and causes an error on reader.read - */ + /// Begin consuming messages from the reader. + /// Multiple readers can be added to support multi-queue and individual Ipv6/Ipv4 sockets interfaces + /// + /// Any previous reader thread is stopped by closing the previous reader, + /// which unblocks the thread and causes an error on reader.read pub fn add_reader(&self, reader: B::Reader) { let wg = self.state.clone(); thread::spawn(move || { @@ -285,6 +283,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { | handshake::TYPE_RESPONSE => { debug!("{} : reader, received handshake message", wg); + // add one to pending let pending = wg.pending.fetch_add(1, Ordering::SeqCst); // update under_load flag @@ -297,6 +296,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { wg.under_load.store(false, Ordering::SeqCst); } + // add to handshake queue wg.queue .lock() .send(HandshakeJob::Message(msg, src)) @@ -325,7 +325,10 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { pub fn new(mut readers: Vec<T::Reader>, writer: T::Writer, mtu: T::MTU) -> Wireguard<T, B> { // create device state let mut rng = OsRng::new().unwrap(); + + // handshake queue let (tx, rx): (Sender<HandshakeJob<B::Endpoint>>, _) = bounded(SIZE_HANDSHAKE_QUEUE); + let wg = Arc::new(WireguardInner { start: Instant::now(), id: rng.gen(), @@ -334,10 +337,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { send: RwLock::new(None), router: router::Device::new(num_cpus::get(), writer), // router owns the writing half pending: AtomicUsize::new(0), - handshake: RwLock::new(Handshake { - device: handshake::Device::new(StaticSecret::new(&mut rng)), - active: false, - }), + handshake: RwLock::new(handshake::Device::new()), under_load: AtomicBool::new(false), queue: Mutex::new(tx), }); @@ -350,24 +350,22 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { debug!("{} : handshake worker, started", wg); // prepare OsRng instance for this thread - let mut rng = OsRng::new().unwrap(); + let mut rng = OsRng::new().expect("Unable to obtain a CSPRNG"); // process elements from the handshake queue for job in rx { - let state = wg.handshake.read(); - if !state.active { - continue; - } + // decrement pending + wg.pending.fetch_sub(1, Ordering::SeqCst); + + let device = wg.handshake.read(); match job { HandshakeJob::Message(msg, src) => { - wg.pending.fetch_sub(1, Ordering::SeqCst); - // feed message to handshake device let src_validate = (&src).into_address(); // TODO avoid // process message - match state.device.process( + match device.process( &mut rng, &msg[..], if wg.under_load.load(Ordering::Relaxed) { @@ -428,7 +426,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { // free any unused ids for id in peer.router.add_keypair(kp) { - state.device.release(id); + device.release(id); } }); } @@ -438,15 +436,19 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { } } HandshakeJob::New(pk) => { - debug!("{} : handshake worker, new handshake requested", wg); - let _ = state.device.begin(&mut rng, &pk).map(|msg| { - if let Some(peer) = wg.peers.read().get(pk.as_bytes()) { + if let Some(peer) = wg.peers.read().get(pk.as_bytes()) { + debug!( + "{} : handshake worker, new handshake requested for {}", + wg, peer + ); + let _ = device.begin(&mut rng, &peer.pk).map(|msg| { let _ = peer.router.send(&msg[..]).map_err(|e| { debug!("{} : handshake worker, failed to send handshake initiation, error = {}", wg, e) }); peer.state.sent_handshake_initiation(); - } - }); + }); + peer.handshake_queued.store(false, Ordering::SeqCst); + } } } } @@ -498,7 +500,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> { } Wireguard { - state: WireguardHandle { inner: wg }, + state: wg, runner: Runner::new(TIMERS_TICK, TIMERS_SLOTS, TIMERS_CAPACITY), } } |