From f46762183a5fad4aba88379714abf1d21c3e1c54 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sun, 15 Sep 2019 15:15:15 +0200 Subject: Sent staged packets when key-pair confirmed --- src/router/peer.rs | 77 ++++++++++++++++++++++++++++----------------------- src/router/tests.rs | 11 +++----- src/router/types.rs | 1 - src/types/endpoint.rs | 7 +++-- 4 files changed, 50 insertions(+), 46 deletions(-) (limited to 'src') diff --git a/src/router/peer.rs b/src/router/peer.rs index f032f45..728be11 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -14,7 +14,7 @@ use treebitmap::IpLookupTable; use zerocopy::LayoutVerified; use super::super::constants::*; -use super::super::types::{Bind, KeyPair, Tun}; +use super::super::types::{Bind, Endpoint, KeyPair, Tun}; use super::anti_replay::AntiReplay; use super::device::DecryptionState; @@ -216,6 +216,35 @@ pub fn new_peer( } impl PeerInner { + fn send_staged(&self) -> bool { + let mut sent = false; + let mut staged = self.staged_packets.lock(); + loop { + match staged.pop_front() { + Some(msg) => { + sent = true; + self.send_raw(msg); + } + None => break sent, + } + } + } + + fn send_raw(&self, msg: Vec) -> bool { + match self.send_job(msg) { + Some(job) => { + debug!("send_raw: got obtained send_job"); + let index = self.device.queue_next.fetch_add(1, Ordering::SeqCst); + let queues = self.device.queues.lock(); + match queues[index % queues.len()].send(job) { + Ok(_) => true, + Err(_) => false, + } + } + None => false, + } + } + pub fn confirm_key(&self, keypair: &Arc) { // take lock and check keypair = keys.next let mut keys = self.keys.lock(); @@ -240,6 +269,9 @@ impl PeerInner { // set new encryption key *self.ekey.lock() = ekey; + + // start transmission of staged packets + self.send_staged(); } pub fn recv_job( @@ -327,12 +359,16 @@ impl Peer { /// /// This API still permits support for the "sticky socket" behavior, /// as sockets should be "unsticked" when manually updating the endpoint - pub fn set_endpoint(&self, endpoint: SocketAddr) { - *self.state.endpoint.lock() = Some(endpoint.into()); + pub fn set_endpoint(&self, address: SocketAddr) { + *self.state.endpoint.lock() = Some(B::Endpoint::from_address(address)); } pub fn get_endpoint(&self) -> Option { - self.state.endpoint.lock().as_ref().map(|e| (*e).into()) + self.state + .endpoint + .lock() + .as_ref() + .map(|e| e.into_address()) } /// Add a new keypair @@ -387,21 +423,8 @@ impl Peer { // schedule confirmation if new.initiator { - // attempt to confirm with staged packets - let mut staged = self.state.staged_packets.lock(); - let keepalive = staged.len() == 0; - loop { - match staged.pop_front() { - Some(msg) => { - debug!("send staged packet to confirm key-pair"); - self.send_raw(msg); - } - None => break, - } - } - // fall back to keepalive packet - if keepalive { + if !self.state.send_staged() { let ok = self.keepalive(); debug!("keepalive for confirmation, sent = {}", ok); } @@ -411,25 +434,9 @@ impl Peer { release } - fn send_raw(&self, msg: Vec) -> bool { - match self.state.send_job(msg) { - Some(job) => { - debug!("send_raw: got obtained send_job"); - let device = &self.state.device; - let index = device.queue_next.fetch_add(1, Ordering::SeqCst); - let queues = device.queues.lock(); - match queues[index % queues.len()].send(job) { - Ok(_) => true, - Err(_) => false, - } - } - None => false, - } - } - pub fn keepalive(&self) -> bool { debug!("send keepalive"); - self.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX]) + self.state.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX]) } /// Map a subnet to the peer diff --git a/src/router/tests.rs b/src/router/tests.rs index c2ea225..ca6312d 100644 --- a/src/router/tests.rs +++ b/src/router/tests.rs @@ -12,7 +12,7 @@ use num_cpus; use pnet::packet::ipv4::MutableIpv4Packet; use pnet::packet::ipv6::MutableIpv6Packet; -use super::super::types::{Bind, Key, KeyPair, Tun}; +use super::super::types::{Bind, Endpoint, Key, KeyPair, Tun}; use super::{Callbacks, Device, SIZE_MESSAGE_PREFIX}; extern crate test; @@ -70,14 +70,11 @@ impl fmt::Display for TunError { #[derive(Clone, Copy)] struct UnitEndpoint {} -impl From for UnitEndpoint { - fn from(addr: SocketAddr) -> UnitEndpoint { +impl Endpoint for UnitEndpoint { + fn from_address(_: SocketAddr) -> UnitEndpoint { UnitEndpoint {} } -} - -impl Into for UnitEndpoint { - fn into(self) -> SocketAddr { + fn into_address(&self) -> SocketAddr { "127.0.0.1:8080".parse().unwrap() } } diff --git a/src/router/types.rs b/src/router/types.rs index f9f867a..736e7c8 100644 --- a/src/router/types.rs +++ b/src/router/types.rs @@ -1,6 +1,5 @@ use std::error::Error; use std::fmt; -use std::marker::PhantomData; pub trait Opaque: Send + Sync + 'static {} diff --git a/src/types/endpoint.rs b/src/types/endpoint.rs index 02682a9..261203f 100644 --- a/src/types/endpoint.rs +++ b/src/types/endpoint.rs @@ -1,5 +1,6 @@ use std::net::SocketAddr; -pub trait Endpoint: Into + From + Copy + Send {} - -impl Endpoint for T where T: Into + From + Copy + Send {} +pub trait Endpoint: Send { + fn from_address(addr: SocketAddr) -> Self; + fn into_address(&self) -> SocketAddr; +} -- cgit v1.2.3-59-g8ed1b