From b62eeb89ab8a90f005dd48776e38dd33f0f3fb9e Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sun, 16 Feb 2020 15:50:32 +0100 Subject: Work on reducing context switches --- src/wireguard/router/queue.rs | 76 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 src/wireguard/router/queue.rs (limited to 'src/wireguard/router/queue.rs') diff --git a/src/wireguard/router/queue.rs b/src/wireguard/router/queue.rs new file mode 100644 index 0000000..045fd51 --- /dev/null +++ b/src/wireguard/router/queue.rs @@ -0,0 +1,76 @@ +use arraydeque::ArrayDeque; +use spin::Mutex; + +use std::mem; +use std::sync::atomic::{AtomicUsize, Ordering}; + +const QUEUE_SIZE: usize = 1024; + +pub trait Job: Sized { + fn queue(&self) -> &Queue; + + fn is_ready(&self) -> bool; + + fn parallel_work(&self); + + fn sequential_work(self); +} + + +pub struct Queue { + contenders: AtomicUsize, + queue: Mutex>, +} + +impl Queue { + pub fn new() -> Queue { + Queue { + contenders: AtomicUsize::new(0), + queue: Mutex::new(ArrayDeque::new()), + } + } + + pub fn push(&self, job: J) -> bool { + self.queue.lock().push_back(job).is_ok() + } + + pub fn consume(&self) { + // check if we are the first contender + let pos = self.contenders.fetch_add(1, Ordering::Acquire); + if pos > 0 { + assert!(pos < usize::max_value(), "contenders overflow"); + } + + // enter the critical section + let mut contenders = 1; // myself + while contenders > 0 { + // handle every ready element + loop { + let mut queue = self.queue.lock(); + + // check if front job is ready + match queue.front() { + None => break, + Some(job) => { + if job.is_ready() { + () + } else { + break; + } + } + }; + + // take the job out of the queue + let job = queue.pop_front().unwrap(); + debug_assert!(job.is_ready()); + mem::drop(queue); + + // process element + job.sequential_work(); + } + + // decrease contenders + contenders = self.contenders.fetch_sub(contenders, Ordering::Acquire) - contenders; + } + } +} -- cgit v1.2.3-59-g8ed1b