aboutsummaryrefslogtreecommitdiffstats
path: root/src/wireguard/queue.rs
blob: f4843203d584fc7699057d1cc8fbcfa32a90a97f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::{Receiver, SyncSender};
use std::sync::Mutex;

/// A simple parallel queue used to pass work to a worker pool.
///
/// Items are handed out round-robin to a fixed set of bounded channels,
/// one channel per reader.
///
/// Unlike e.g. the crossbeam multi-producer multi-consumer queue
/// the ParallelQueue offers fewer features and instead improves speed:
///
/// The crossbeam channel ensures that elements are consumed
/// even if not every Receiver is being read from.
/// This is not ensured by ParallelQueue.
pub struct ParallelQueue<T> {
    next: AtomicUsize,                         // next round-robin index
    queues: Vec<Mutex<Option<SyncSender<T>>>>, // work queues (1 per thread), `None` once closed
}

impl<T> ParallelQueue<T> {
    /// Create a new ParallelQueue instance
    ///
    /// # Arguments
    ///
    /// - `queues`: number of readers
    /// - `capacity`: capacity of each internal queue
    ///
    /// # Returns
    ///
    /// The queue together with one `Receiver` per internal queue,
    /// in the same order in which `send` cycles through them.
    ///
    /// If `queues` is 0 no receivers are returned and every item
    /// passed to `send` is dropped.
    pub fn new(queues: usize, capacity: usize) -> (Self, Vec<Receiver<T>>) {
        let mut rxs = Vec::with_capacity(queues);
        let mut txs = Vec::with_capacity(queues);

        for _ in 0..queues {
            let (tx, rx) = sync_channel(capacity);
            txs.push(Mutex::new(Some(tx)));
            rxs.push(rx);
        }

        (
            ParallelQueue {
                next: AtomicUsize::new(0),
                queues: txs,
            },
            rxs,
        )
    }

    /// Pass `v` to the next internal queue in round-robin order.
    ///
    /// The item is silently dropped if the selected queue has been closed,
    /// if its receiver is gone, or if the ParallelQueue has no queues at all.
    ///
    /// The underlying channel is bounded, so this call blocks while the
    /// selected queue is full. The lock of that queue is held for the
    /// duration of the send, hence a concurrent `close` waits for it.
    pub fn send(&self, v: T) {
        let len = self.queues.len();
        if len == 0 {
            // no readers exist: drop `v` instead of computing `idx % 0` (which panics)
            return;
        }

        let idx = self.next.fetch_add(1, Ordering::SeqCst);
        let queue = self.queues[idx % len].lock().unwrap();
        if let Some(que) = queue.as_ref() {
            // TODO: consider best way to propagate Result
            let _ = que.send(v);
        }
    }

    /// Close every internal queue by dropping the sender stored for it.
    ///
    /// Subsequent calls to `send` drop their item. Calling `close` more
    /// than once has no further effect.
    pub fn close(&self) {
        for queue in &self.queues {
            *queue.lock().unwrap() = None;
        }
    }
}