summaryrefslogtreecommitdiffstats
path: root/src/router/peer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/router/peer.rs')
-rw-r--r--src/router/peer.rs70
1 files changed, 62 insertions, 8 deletions
diff --git a/src/router/peer.rs b/src/router/peer.rs
index e21e69c..d755fa5 100644
--- a/src/router/peer.rs
+++ b/src/router/peer.rs
@@ -1,3 +1,4 @@
+use std::mem;
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender};
@@ -7,18 +8,23 @@ use std::thread;
use spin;
use arraydeque::{ArrayDeque, Wrapping};
+use zerocopy::{AsBytes, LayoutVerified};
use treebitmap::address::Address;
use treebitmap::IpLookupTable;
use super::super::constants::*;
-use super::super::types::{KeyPair, Tun, Bind};
+use super::super::types::{Bind, KeyPair, Tun};
use super::anti_replay::AntiReplay;
use super::device::DecryptionState;
use super::device::DeviceInner;
use super::device::EncryptionState;
-use super::workers::{worker_inbound, worker_outbound, JobInbound, JobOutbound};
+use super::messages::TransportHeader;
+
+use super::workers::{worker_inbound, worker_outbound};
+use super::workers::{JobBuffer, JobInbound, JobInner, JobOutbound};
+use super::workers::{Operation, Status};
use super::types::Callbacks;
@@ -40,16 +46,14 @@ pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
pub queue_outbound: SyncSender<JobOutbound>,
pub queue_inbound: SyncSender<JobInbound<C, T, B>>,
pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
- pub rx_bytes: AtomicU64, // received bytes
- pub tx_bytes: AtomicU64, // transmitted bytes
+ pub rx_bytes: AtomicU64, // received bytes
+ pub tx_bytes: AtomicU64, // transmitted bytes
pub keys: spin::Mutex<KeyWheel>, // key-wheel
pub ekey: spin::Mutex<Option<EncryptionState>>, // encryption state
pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
}
-pub struct Peer<C: Callbacks, T: Tun, B: Bind>(
- Arc<PeerInner<C, T, B>>,
-);
+pub struct Peer<C: Callbacks, T: Tun, B: Bind>(Arc<PeerInner<C, T, B>>);
fn treebit_list<A, E, C: Callbacks, T: Tun, B: Bind>(
peer: &Arc<PeerInner<C, T, B>>,
@@ -212,6 +216,51 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
// rotate key-wheel
}
+
+ pub fn send_job(&self, mut msg: Vec<u8>) -> Option<JobBuffer> {
+ debug_assert!(msg.len() >= mem::size_of::<TransportHeader>());
+
+ // parse / cast
+ let (header, _) = LayoutVerified::new_from_prefix(&mut msg[..]).unwrap();
+ let mut header: LayoutVerified<&mut [u8], TransportHeader> = header;
+
+ // check if has key
+ let key = match self.ekey.lock().as_mut() {
+ None => {
+ // add to staged packets (create no job)
+ (self.device.call_need_key)(&self.opaque);
+ self.staged_packets.lock().push_back(msg);
+ return None;
+ }
+ Some(mut state) => {
+ // allocate nonce
+ state.nonce += 1;
+ if state.nonce >= REJECT_AFTER_MESSAGES {
+ state.nonce -= 1;
+ return None;
+ }
+
+ // set transport message fields
+ header.f_counter.set(state.nonce);
+ header.f_receiver.set(state.id);
+ state.key
+ }
+ };
+
+ // create job
+ let job = Arc::new(spin::Mutex::new(JobInner {
+ msg,
+ key,
+ status: Status::Waiting,
+ op: Operation::Encryption,
+ }));
+
+ // add job to in-order queue and return to device for inclusion in worker pool
+ match self.queue_outbound.try_send(job.clone()) {
+ Ok(_) => Some(job),
+ Err(_) => None,
+ }
+ }
}
impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
@@ -332,5 +381,10 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
res
}
- pub fn send(&self, msg: Vec<u8>) {}
+ pub fn remove_subnets(&self) {
+ treebit_remove(self, &self.0.device.ipv4);
+ treebit_remove(self, &self.0.device.ipv6);
+ }
+
+ fn send(&self, msg: Vec<u8>) {}
}