diff options
Diffstat (limited to 'src/router/peer.rs')
-rw-r--r-- | src/router/peer.rs | 129 |
1 files changed, 107 insertions, 22 deletions
diff --git a/src/router/peer.rs b/src/router/peer.rs index 0cd588d..9ad5d2f 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -1,19 +1,17 @@ use std::mem; use std::net::{IpAddr, SocketAddr}; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::sync::mpsc::{sync_channel, SyncSender}; -use std::sync::{Arc, Weak}; +use std::sync::Arc; use std::thread; +use arraydeque::{ArrayDeque, Wrapping}; use log::debug; - use spin::Mutex; - -use arraydeque::{ArrayDeque, Saturating, Wrapping}; -use zerocopy::{AsBytes, LayoutVerified}; - use treebitmap::address::Address; use treebitmap::IpLookupTable; +use zerocopy::LayoutVerified; use super::super::constants::*; use super::super::types::{Bind, KeyPair, Tun}; @@ -29,9 +27,10 @@ use futures::*; use super::workers::Operation; use super::workers::{worker_inbound, worker_outbound}; use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel}; +use super::SIZE_MESSAGE_PREFIX; use super::constants::*; -use super::types::Callbacks; +use super::types::{Callbacks, RouterError}; pub struct KeyWheel { next: Option<Arc<KeyPair>>, // next key state (unconfirmed) @@ -45,11 +44,9 @@ pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> { pub opaque: C::Opaque, pub outbound: Mutex<SyncSender<JobOutbound>>, pub inbound: Mutex<SyncSender<JobInbound<C, T, B>>>, - pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake - pub rx_bytes: AtomicU64, // received bytes - pub tx_bytes: AtomicU64, // transmitted bytes - pub keys: Mutex<KeyWheel>, // key-wheel - pub ekey: Mutex<Option<EncryptionState>>, // encryption state + pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, + pub keys: Mutex<KeyWheel>, + pub ekey: Mutex<Option<EncryptionState>>, pub endpoint: Mutex<Option<B::Endpoint>>, } @@ -193,8 +190,6 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>( previous: None, retired: None, }), - rx_bytes: AtomicU64::new(0), - tx_bytes: AtomicU64::new(0), staged_packets: spin::Mutex::new(ArrayDeque::new()), }) }; @@ -254,7 +249,7 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> { mut msg: Vec<u8>, ) -> Option<JobParallel> { let (tx, rx) = oneshot(); - let key = dec.keypair.send.key; + let key = dec.keypair.recv.key; match self.inbound.lock().try_send((dec, src, rx)) { Ok(_) => Some(( tx, @@ -270,7 +265,11 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> { } pub fn send_job(&self, mut msg: Vec<u8>) -> Option<JobParallel> { - debug_assert!(msg.len() >= mem::size_of::<TransportHeader>()); + debug_assert!( + msg.len() >= mem::size_of::<TransportHeader>(), + "received message with size: {:}", + msg.len() + ); // parse / cast let (header, _) = LayoutVerified::new_from_prefix(&mut msg[..]).unwrap(); @@ -318,6 +317,16 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> { } impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> { + /// Set the endpoint of the peer + /// + /// # Arguments + /// + /// - `endpoint`, socket address converted to bind endpoint + /// + /// # Note + /// + /// This API still permits support for the "sticky socket" behavior, + /// as sockets should be "unsticked" when manually updating the endpoint pub fn set_endpoint(&self, endpoint: SocketAddr) { *self.state.endpoint.lock() = Some(endpoint.into()); } @@ -372,18 +381,67 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> { ); } + // schedule confirmation + if new.initiator { + // attempt to confirm with staged packets + let mut staged = self.state.staged_packets.lock(); + let keepalive = staged.len() == 0; + loop { + match staged.pop_front() { + Some(msg) => { + debug!("send staged packet to confirm key-pair"); + self.send_raw(msg); + } + None => break, + } + } + + // fall back to keepalive packet + if keepalive { + let ok = self.keepalive(); + debug!("keepalive for confirmation, sent = {}", ok); + } + } + // return the released id (for handshake state machine) release } - pub fn rx_bytes(&self) -> u64 { - self.state.rx_bytes.load(Ordering::Relaxed) + fn send_raw(&self, msg: Vec<u8>) -> bool { + match self.state.send_job(msg) { + Some(job) => { + debug!("send_raw: got obtained send_job"); + let device = &self.state.device; + let index = device.queue_next.fetch_add(1, Ordering::SeqCst); + let queues = device.queues.lock(); + match queues[index % queues.len()].send(job) { + Ok(_) => true, + Err(_) => false, + } + } + None => false, + } } - pub fn tx_bytes(&self) -> u64 { - self.state.tx_bytes.load(Ordering::Relaxed) + pub fn keepalive(&self) -> bool { + debug!("send keepalive"); + self.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX]) } + /// Map a subnet to the peer + /// + /// # Arguments + /// + /// - `ip`, the mask of the subnet + /// - `masklen`, the length of the mask + /// + /// # Note + /// + /// The `ip` must not have any bits set right of `masklen`. + /// e.g. `192.168.1.0/24` is valid, while `192.168.1.128/24` is not. + /// + /// If an identical value already exists as part of a prior peer, + /// the allowed IP entry will be removed from that peer and added to this peer. pub fn add_subnet(&self, ip: IpAddr, masklen: u32) { match ip { IpAddr::V4(v4) => { @@ -403,6 +461,11 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> { }; } + /// List subnets mapped to the peer + /// + /// # Returns + /// + /// A vector of subnets, represented by as mask/size pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> { let mut res = Vec::new(); res.append(&mut treebit_list( @@ -418,10 +481,32 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> { res } + /// Clear subnets mapped to the peer. + /// After the call, no subnets will be cryptkey routed to the peer. + /// Used for the UAPI command "replace_allowed_ips=true" pub fn remove_subnets(&self) { treebit_remove(self, &self.state.device.ipv4); treebit_remove(self, &self.state.device.ipv6); } - fn send(&self, msg: Vec<u8>) {} + /// Send a raw message to the peer (used for handshake messages) + /// + /// # Arguments + /// + /// - `msg`, message body to send to peer + /// + /// # Returns + /// + /// Unit if packet was sent, or an error indicating why sending failed + pub fn send(&self, msg: &[u8]) -> Result<(), RouterError> { + let inner = &self.state; + match inner.endpoint.lock().as_ref() { + Some(endpoint) => inner + .device + .bind + .send(msg, endpoint) + .map_err(|_| RouterError::SendError), + None => Err(RouterError::NoEndpoint), + } + } } |