aboutsummaryrefslogtreecommitdiffstats
path: root/src/router/peer.rs
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-10 21:42:21 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2019-09-10 21:42:21 +0200
commit02d9bdcc96c955b654a45d3629b1ee515902078d (patch)
treed0231989ccca424d26f0dcded640acac079aa9de /src/router/peer.rs
parentBegin work on full router interaction unittest (diff)
downloadwireguard-rs-02d9bdcc96c955b654a45d3629b1ee515902078d.tar.xz
wireguard-rs-02d9bdcc96c955b654a45d3629b1ee515902078d.zip
Full inbound/outbound router test
Diffstat (limited to '')
-rw-r--r--src/router/peer.rs129
1 files changed, 107 insertions, 22 deletions
diff --git a/src/router/peer.rs b/src/router/peer.rs
index 0cd588d..9ad5d2f 100644
--- a/src/router/peer.rs
+++ b/src/router/peer.rs
@@ -1,19 +1,17 @@
use std::mem;
use std::net::{IpAddr, SocketAddr};
-use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
use std::sync::mpsc::{sync_channel, SyncSender};
-use std::sync::{Arc, Weak};
+use std::sync::Arc;
use std::thread;
+use arraydeque::{ArrayDeque, Wrapping};
use log::debug;
-
use spin::Mutex;
-
-use arraydeque::{ArrayDeque, Saturating, Wrapping};
-use zerocopy::{AsBytes, LayoutVerified};
-
use treebitmap::address::Address;
use treebitmap::IpLookupTable;
+use zerocopy::LayoutVerified;
use super::super::constants::*;
use super::super::types::{Bind, KeyPair, Tun};
@@ -29,9 +27,10 @@ use futures::*;
use super::workers::Operation;
use super::workers::{worker_inbound, worker_outbound};
use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel};
+use super::SIZE_MESSAGE_PREFIX;
use super::constants::*;
-use super::types::Callbacks;
+use super::types::{Callbacks, RouterError};
pub struct KeyWheel {
next: Option<Arc<KeyPair>>, // next key state (unconfirmed)
@@ -45,11 +44,9 @@ pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
pub opaque: C::Opaque,
pub outbound: Mutex<SyncSender<JobOutbound>>,
pub inbound: Mutex<SyncSender<JobInbound<C, T, B>>>,
- pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
- pub rx_bytes: AtomicU64, // received bytes
- pub tx_bytes: AtomicU64, // transmitted bytes
- pub keys: Mutex<KeyWheel>, // key-wheel
- pub ekey: Mutex<Option<EncryptionState>>, // encryption state
+ pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>,
+ pub keys: Mutex<KeyWheel>,
+ pub ekey: Mutex<Option<EncryptionState>>,
pub endpoint: Mutex<Option<B::Endpoint>>,
}
@@ -193,8 +190,6 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
previous: None,
retired: None,
}),
- rx_bytes: AtomicU64::new(0),
- tx_bytes: AtomicU64::new(0),
staged_packets: spin::Mutex::new(ArrayDeque::new()),
})
};
@@ -254,7 +249,7 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
mut msg: Vec<u8>,
) -> Option<JobParallel> {
let (tx, rx) = oneshot();
- let key = dec.keypair.send.key;
+ let key = dec.keypair.recv.key;
match self.inbound.lock().try_send((dec, src, rx)) {
Ok(_) => Some((
tx,
@@ -270,7 +265,11 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
}
pub fn send_job(&self, mut msg: Vec<u8>) -> Option<JobParallel> {
- debug_assert!(msg.len() >= mem::size_of::<TransportHeader>());
+ debug_assert!(
+ msg.len() >= mem::size_of::<TransportHeader>(),
+ "received message with size: {:}",
+ msg.len()
+ );
// parse / cast
let (header, _) = LayoutVerified::new_from_prefix(&mut msg[..]).unwrap();
@@ -318,6 +317,16 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
}
impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
+ /// Set the endpoint of the peer
+ ///
+ /// # Arguments
+ ///
+ /// - `endpoint`, socket address converted to bind endpoint
+ ///
+ /// # Note
+ ///
+ /// This API still permits support for the "sticky socket" behavior,
+ /// as sockets should be "unsticked" when manually updating the endpoint
pub fn set_endpoint(&self, endpoint: SocketAddr) {
*self.state.endpoint.lock() = Some(endpoint.into());
}
@@ -372,18 +381,67 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
);
}
+ // schedule confirmation
+ if new.initiator {
+ // attempt to confirm with staged packets
+ let mut staged = self.state.staged_packets.lock();
+ let keepalive = staged.len() == 0;
+ loop {
+ match staged.pop_front() {
+ Some(msg) => {
+ debug!("send staged packet to confirm key-pair");
+ self.send_raw(msg);
+ }
+ None => break,
+ }
+ }
+
+ // fall back to keepalive packet
+ if keepalive {
+ let ok = self.keepalive();
+ debug!("keepalive for confirmation, sent = {}", ok);
+ }
+ }
+
// return the released id (for handshake state machine)
release
}
- pub fn rx_bytes(&self) -> u64 {
- self.state.rx_bytes.load(Ordering::Relaxed)
+ fn send_raw(&self, msg: Vec<u8>) -> bool {
+ match self.state.send_job(msg) {
+ Some(job) => {
+ debug!("send_raw: got obtained send_job");
+ let device = &self.state.device;
+ let index = device.queue_next.fetch_add(1, Ordering::SeqCst);
+ let queues = device.queues.lock();
+ match queues[index % queues.len()].send(job) {
+ Ok(_) => true,
+ Err(_) => false,
+ }
+ }
+ None => false,
+ }
}
- pub fn tx_bytes(&self) -> u64 {
- self.state.tx_bytes.load(Ordering::Relaxed)
+ pub fn keepalive(&self) -> bool {
+ debug!("send keepalive");
+ self.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX])
}
+ /// Map a subnet to the peer
+ ///
+ /// # Arguments
+ ///
+ /// - `ip`, the mask of the subnet
+ /// - `masklen`, the length of the mask
+ ///
+ /// # Note
+ ///
+ /// The `ip` must not have any bits set right of `masklen`.
+ /// e.g. `192.168.1.0/24` is valid, while `192.168.1.128/24` is not.
+ ///
+ /// If an identical value already exists as part of a prior peer,
+ /// the allowed IP entry will be removed from that peer and added to this peer.
pub fn add_subnet(&self, ip: IpAddr, masklen: u32) {
match ip {
IpAddr::V4(v4) => {
@@ -403,6 +461,11 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
};
}
+ /// List subnets mapped to the peer
+ ///
+ /// # Returns
+ ///
+ /// A vector of subnets, represented by as mask/size
pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> {
let mut res = Vec::new();
res.append(&mut treebit_list(
@@ -418,10 +481,32 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
res
}
+ /// Clear subnets mapped to the peer.
+ /// After the call, no subnets will be cryptkey routed to the peer.
+ /// Used for the UAPI command "replace_allowed_ips=true"
pub fn remove_subnets(&self) {
treebit_remove(self, &self.state.device.ipv4);
treebit_remove(self, &self.state.device.ipv6);
}
- fn send(&self, msg: Vec<u8>) {}
+ /// Send a raw message to the peer (used for handshake messages)
+ ///
+ /// # Arguments
+ ///
+ /// - `msg`, message body to send to peer
+ ///
+ /// # Returns
+ ///
+ /// Unit if packet was sent, or an error indicating why sending failed
+ pub fn send(&self, msg: &[u8]) -> Result<(), RouterError> {
+ let inner = &self.state;
+ match inner.endpoint.lock().as_ref() {
+ Some(endpoint) => inner
+ .device
+ .bind
+ .send(msg, endpoint)
+ .map_err(|_| RouterError::SendError),
+ None => Err(RouterError::NoEndpoint),
+ }
+ }
}