summaryrefslogtreecommitdiffstats
path: root/src/wireguard/router/send.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wireguard/router/send.rs')
-rw-r--r--src/wireguard/router/send.rs95
1 files changed, 49 insertions, 46 deletions
diff --git a/src/wireguard/router/send.rs b/src/wireguard/router/send.rs
index 2bd4abd..8e41796 100644
--- a/src/wireguard/router/send.rs
+++ b/src/wireguard/router/send.rs
@@ -1,4 +1,4 @@
-use super::queue::{Job, Queue};
+use super::queue::{SequentialJob, ParallelJob, Queue};
use super::KeyPair;
use super::types::Callbacks;
use super::peer::Peer;
@@ -22,8 +22,14 @@ struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
peer: Peer<E, C, T, B>,
}
-pub struct SendJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
- inner: Arc<Inner<E, C, T, B>>,
+pub struct SendJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> (
+ Arc<Inner<E, C, T, B>>
+);
+
+impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Clone for SendJob<E, C, T, B> {
+ fn clone(&self) -> SendJob<E, C, T, B> {
+ SendJob(self.0.clone())
+ }
}
impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SendJob<E, C, T, B> {
@@ -32,32 +38,53 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SendJob<E, C
counter: u64,
keypair: Arc<KeyPair>,
peer: Peer<E, C, T, B>
- ) -> Option<SendJob<E, C, T, B>> {
- // create job
- let inner = Arc::new(Inner{
+ ) -> SendJob<E, C, T, B> {
+ SendJob(Arc::new(Inner{
buffer: Mutex::new(buffer),
counter,
keypair,
peer,
ready: AtomicBool::new(false)
- });
-
- // attempt to add to queue
- if peer.outbound.push(SendJob{ inner: inner.clone()}) {
- Some(SendJob{inner})
- } else {
- None
- }
+ }))
}
}
-impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for SendJob<E, C, T, B> {
- fn queue(&self) -> &Queue<Self> {
- &self.inner.peer.outbound
+impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob for SendJob<E, C, T, B> {
+
+ fn is_ready(&self) -> bool {
+ self.0.ready.load(Ordering::Acquire)
}
- fn is_ready(&self) -> bool {
- self.inner.ready.load(Ordering::Acquire)
+ fn sequential_work(self) {
+ debug_assert_eq!(
+ self.is_ready(),
+ true,
+ "doing sequential work
+ on an incomplete job"
+ );
+ log::trace!("processing sequential send job");
+
+ // send to peer
+ let job = &self.0;
+ let msg = job.buffer.lock();
+ let xmit = job.peer.send_raw(&msg[..]).is_ok();
+
+ // trigger callback (for timers)
+ C::send(
+ &job.peer.opaque,
+ msg.len(),
+ xmit,
+ &job.keypair,
+ job.counter,
+ );
+ }
+}
+
+
+impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob for SendJob<E, C, T, B> {
+
+ fn queue(&self) -> &Queue<Self> {
+ &self.0.peer.outbound
}
fn parallel_work(&self) {
@@ -71,7 +98,7 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for Send
// encrypt body
{
// make space for the tag
- let job = &*self.inner;
+ let job = &*self.0;
let mut msg = job.buffer.lock();
msg.extend([0u8; SIZE_TAG].iter());
@@ -111,30 +138,6 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Job for Send
}
// mark ready
- self.inner.ready.store(true, Ordering::Release);
- }
-
- fn sequential_work(self) {
- debug_assert_eq!(
- self.is_ready(),
- true,
- "doing sequential work
- on an incomplete job"
- );
- log::trace!("processing sequential send job");
-
- // send to peer
- let job = &self.inner;
- let msg = job.buffer.lock();
- let xmit = job.peer.send(&msg[..]).is_ok();
-
- // trigger callback (for timers)
- C::send(
- &job.peer.opaque,
- msg.len(),
- xmit,
- &job.keypair,
- job.counter,
- );
+ self.0.ready.store(true, Ordering::Release);
}
-}
+} \ No newline at end of file