diff options
Diffstat (limited to 'src/wireguard/router/send.rs')
-rw-r--r-- | src/wireguard/router/send.rs | 95 |
1 files changed, 49 insertions, 46 deletions
diff --git a/src/wireguard/router/send.rs b/src/wireguard/router/send.rs index 2bd4abd..8e41796 100644 --- a/src/wireguard/router/send.rs +++ b/src/wireguard/router/send.rs @@ -1,4 +1,4 @@ -use super::queue::{Job, Queue}; +use super::queue::{SequentialJob, ParallelJob, Queue}; use super::KeyPair; use super::types::Callbacks; use super::peer::Peer; @@ -22,8 +22,14 @@ struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> { peer: Peer<E, C, T, B>, } -pub struct SendJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> { - inner: Arc<Inner<E, C, T, B>>, +pub struct SendJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ( + Arc<Inner<E, C, T, B>> +); + +impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Clone for SendJob<E, C, T, B> { + fn clone(&self) -> SendJob<E, C, T, B> { + SendJob(self.0.clone()) + } } impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SendJob<E, C, T, B> { @@ -32,32 +38,53 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SendJob<E, C counter: u64, keypair: Arc<KeyPair>, peer: Peer<E, C, T, B> - ) -> Option<SendJob<E, C, T, B>> { - // create job - let inner = Arc::new(Inner{ + ) -> SendJob<E, C, T, B> { + SendJob(Arc::new(Inner{ buffer: Mutex::new(buffer), counter, keypair, peer, ready: AtomicBool::new(false) - }); - - // attempt to add to queue - if peer.outbound.push(SendJob{ inner: inner.clone()}) { - Some(SendJob{inner}) - } else { - None - } + })) } } -impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for SendJob<E, C, T, B> { - fn queue(&self) -> &Queue<Self> { - &self.inner.peer.outbound +impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob for SendJob<E, C, T, B> { + + fn is_ready(&self) -> bool { + self.0.ready.load(Ordering::Acquire) } - fn is_ready(&self) -> bool { - self.inner.ready.load(Ordering::Acquire) + fn sequential_work(self) { + debug_assert_eq!( + self.is_ready(), + true, + "doing sequential work + on an incomplete job" + ); + log::trace!("processing sequential send job"); + + // send to peer + let job = &self.0; + let msg = job.buffer.lock(); + let xmit = job.peer.send_raw(&msg[..]).is_ok(); + + // trigger callback (for timers) + C::send( + &job.peer.opaque, + msg.len(), + xmit, + &job.keypair, + job.counter, + ); + } +} + + +impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob for SendJob<E, C, T, B> { + + fn queue(&self) -> &Queue<Self> { + &self.0.peer.outbound } fn parallel_work(&self) { @@ -71,7 +98,7 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for Send // encrypt body { // make space for the tag - let job = &*self.inner; + let job = &*self.0; let mut msg = job.buffer.lock(); msg.extend([0u8; SIZE_TAG].iter()); @@ -111,30 +138,6 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for Send } // mark ready - self.inner.ready.store(true, Ordering::Release); - } - - fn sequential_work(self) { - debug_assert_eq!( - self.is_ready(), - true, - "doing sequential work - on an incomplete job" - ); - log::trace!("processing sequential send job"); - - // send to peer - let job = &self.inner; - let msg = job.buffer.lock(); - let xmit = job.peer.send(&msg[..]).is_ok(); - - // trigger callback (for timers) - C::send( - &job.peer.opaque, - msg.len(), - xmit, - &job.keypair, - job.counter, - ); + self.0.ready.store(true, Ordering::Release); } -} +}
\ No newline at end of file |