diff options
Diffstat (limited to 'src/router/peer.rs')
-rw-r--r-- | src/router/peer.rs | 70 |
1 files changed, 62 insertions, 8 deletions
diff --git a/src/router/peer.rs b/src/router/peer.rs index e21e69c..d755fa5 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -1,3 +1,4 @@ +use std::mem; use std::net::{IpAddr, SocketAddr}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::mpsc::{sync_channel, SyncSender}; @@ -7,18 +8,23 @@ use std::thread; use spin; use arraydeque::{ArrayDeque, Wrapping}; +use zerocopy::{AsBytes, LayoutVerified}; use treebitmap::address::Address; use treebitmap::IpLookupTable; use super::super::constants::*; -use super::super::types::{KeyPair, Tun, Bind}; +use super::super::types::{Bind, KeyPair, Tun}; use super::anti_replay::AntiReplay; use super::device::DecryptionState; use super::device::DeviceInner; use super::device::EncryptionState; -use super::workers::{worker_inbound, worker_outbound, JobInbound, JobOutbound}; +use super::messages::TransportHeader; + +use super::workers::{worker_inbound, worker_outbound}; +use super::workers::{JobBuffer, JobInbound, JobInner, JobOutbound}; +use super::workers::{Operation, Status}; use super::types::Callbacks; @@ -40,16 +46,14 @@ pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> { pub queue_outbound: SyncSender<JobOutbound>, pub queue_inbound: SyncSender<JobInbound<C, T, B>>, pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake - pub rx_bytes: AtomicU64, // received bytes - pub tx_bytes: AtomicU64, // transmitted bytes + pub rx_bytes: AtomicU64, // received bytes + pub tx_bytes: AtomicU64, // transmitted bytes pub keys: spin::Mutex<KeyWheel>, // key-wheel pub ekey: spin::Mutex<Option<EncryptionState>>, // encryption state pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>, } -pub struct Peer<C: Callbacks, T: Tun, B: Bind>( - Arc<PeerInner<C, T, B>>, -); +pub struct Peer<C: Callbacks, T: Tun, B: Bind>(Arc<PeerInner<C, T, B>>); fn treebit_list<A, E, C: Callbacks, T: Tun, B: Bind>( peer: &Arc<PeerInner<C, T, B>>, @@ -212,6 +216,51 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> { // rotate key-wheel } + + pub fn send_job(&self, mut msg: Vec<u8>) -> Option<JobBuffer> { + debug_assert!(msg.len() >= mem::size_of::<TransportHeader>()); + + // parse / cast + let (header, _) = LayoutVerified::new_from_prefix(&mut msg[..]).unwrap(); + let mut header: LayoutVerified<&mut [u8], TransportHeader> = header; + + // check if has key + let key = match self.ekey.lock().as_mut() { + None => { + // add to staged packets (create no job) + (self.device.call_need_key)(&self.opaque); + self.staged_packets.lock().push_back(msg); + return None; + } + Some(mut state) => { + // allocate nonce + state.nonce += 1; + if state.nonce >= REJECT_AFTER_MESSAGES { + state.nonce -= 1; + return None; + } + + // set transport message fields + header.f_counter.set(state.nonce); + header.f_receiver.set(state.id); + state.key + } + }; + + // create job + let job = Arc::new(spin::Mutex::new(JobInner { + msg, + key, + status: Status::Waiting, + op: Operation::Encryption, + })); + + // add job to in-order queue and return to device for inclusion in worker pool + match self.queue_outbound.try_send(job.clone()) { + Ok(_) => Some(job), + Err(_) => None, + } + } } impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> { @@ -332,5 +381,10 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> { res } - pub fn send(&self, msg: Vec<u8>) {} + pub fn remove_subnets(&self) { + treebit_remove(self, &self.0.device.ipv4); + treebit_remove(self, &self.0.device.ipv6); + } + + fn send(&self, msg: Vec<u8>) {} } |