1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
use super::router;
use super::timers::{Events, Timers};
use super::tun::Tun;
use super::udp::UDP;
use super::constants::REKEY_TIMEOUT;
use super::wireguard::WireGuard;
use super::workers::HandshakeJob;
use std::fmt;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use spin::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use x25519_dalek::PublicKey;
pub struct PeerInner<T: Tun, B: UDP> {
// internal id (for logging)
pub id: u64,
// wireguard device state
pub wg: WireGuard<T, B>,
// handshake state
pub walltime_last_handshake: Mutex<Option<SystemTime>>, // walltime for last handshake (for UAPI status)
pub last_handshake_sent: Mutex<Instant>, // instant for last handshake
pub handshake_queued: AtomicBool, // is a handshake job currently queued for the peer?
// stats and configuration
pub pk: PublicKey, // public key
pub rx_bytes: AtomicU64, // received bytes
pub tx_bytes: AtomicU64, // transmitted bytes
// timer model
pub timers: RwLock<Timers>,
}
pub struct Peer<T: Tun, B: UDP> {
pub router: Arc<router::PeerHandle<B::Endpoint, Events<T, B>, T::Writer, B::Writer>>,
pub state: Arc<PeerInner<T, B>>,
}
impl<T: Tun, B: UDP> Clone for Peer<T, B> {
fn clone(&self) -> Peer<T, B> {
Peer {
router: self.router.clone(),
state: self.state.clone(),
}
}
}
impl<T: Tun, B: UDP> PeerInner<T, B> {
/* Queue a handshake request for the parallel workers
* (if one does not already exist)
*
* The function is ratelimited.
*/
pub fn packet_send_handshake_initiation(&self) {
log::trace!("{} : packet_send_handshake_initiation", self);
// the function is rate limited
{
let mut lhs = self.last_handshake_sent.lock();
if lhs.elapsed() < REKEY_TIMEOUT {
log::trace!("{} : packet_send_handshake_initiation, rate-limited!", self);
return;
}
*lhs = Instant::now();
}
// create a new handshake job for the peer
if !self.handshake_queued.swap(true, Ordering::SeqCst) {
self.wg.pending.fetch_add(1, Ordering::SeqCst);
self.wg.queue.send(HandshakeJob::New(self.pk));
log::trace!(
"{} : packet_send_handshake_initiation, handshake queued",
self
);
} else {
log::trace!(
"{} : packet_send_handshake_initiation, handshake already queued",
self
);
}
}
#[inline(always)]
pub fn timers(&self) -> RwLockReadGuard<Timers> {
self.timers.read()
}
#[inline(always)]
pub fn timers_mut(&self) -> RwLockWriteGuard<Timers> {
self.timers.write()
}
}
impl<T: Tun, B: UDP> fmt::Display for PeerInner<T, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "peer(id = {})", self.id)
}
}
impl<T: Tun, B: UDP> fmt::Display for Peer<T, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "peer(id = {})", self.id)
}
}
impl<T: Tun, B: UDP> Deref for Peer<T, B> {
type Target = PeerInner<T, B>;
fn deref(&self) -> &Self::Target {
&self.state
}
}
impl<T: Tun, B: UDP> Peer<T, B> {
/// Bring the peer down. Causing:
///
/// - Timers to be stopped and disabled.
/// - All keystate to be zeroed
pub fn down(&self) {
self.stop_timers();
self.router.down();
}
/// Bring the peer up.
pub fn up(&self) {
self.router.up();
self.start_timers();
}
}
|