summaryrefslogtreecommitdiffstats
path: root/src/mpq.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mpq.h')
-rw-r--r--src/mpq.h171
1 files changed, 144 insertions, 27 deletions
diff --git a/src/mpq.h b/src/mpq.h
index ff401b0c266..5a861ca08b5 100644
--- a/src/mpq.h
+++ b/src/mpq.h
@@ -2,42 +2,159 @@
#define _MPQ_H_
#include <sys/mutex.h>
-#include <sys/task.h>
-#include <sys/queue.h>
+#include <sys/kthread.h>
struct mpq_item {
- TAILQ_ENTRY(mpq_item) i_worker;
- TAILQ_ENTRY(mpq_item) i_serial;
- struct cond i_cond;
- enum mpq_istate {
- MPQ_ITEM_WAITING,
- MPQ_ITEM_PROCESSED,
- MPQ_ITEM_BARRIER,
- } i_state;
- void *i_val;
+ struct wg_pkt pkt;
};
struct mpq {
- struct mutex q_mtx;
- struct taskq *q_taskq;
- struct rwlock q_serial_lock;
+ struct mutex mtx;
+ int destroy;
+ void (*parallel)(struct mpq_item *);
+ void (*serial)(struct mpq_item *);
+ size_t parallel_head, serial_head, tail, len;
+ struct mpq_queue_item {
+ enum mpq_item_state {
+ MPQ_ITEM_NONE = 0,
+ MPQ_ITEM_WAITING,
+ MPQ_ITEM_PROCESSING,
+ MPQ_ITEM_DONE,
+ } state;
+ struct mpq_item data;
+ } *items;
+};
- size_t q_max;
- size_t q_len;
+/*
+ * H = Head, T = Tail, N = len
+ * H T T-H N-(H-T)
+ * 5 5 0 8
+ * 5 6 1 9
+ * 5 7 2 10
+ * 5 0 -5 3
+ * 5 1 -4 4
+ * 5 2 -3 5
+ * 5 3 -2 6
+ * 5 4 -1 7
+*/
- struct task q_worker_task;
- struct task q_serial_task;
+#define INC_WRAP(v, max) (v) = ((v) >= (max - 1) ? 0 : (v) + 1)
+#define CIRC_LEN(h, t, n) ((t) >= (h) ? ((t) - (h)) : ((n) - ((h) - (t))))
- void (*q_worker_fn)(void *);
- void (*q_serial_fn)(void *);
+#define MPQ_SERIAL_LEN(q) CIRC_LEN((q)->serial_head, (q)->tail, (q)->len)
+#define MPQ_SERIAL_FULL(q) (MPQ_SERIAL_LEN(q) >= ((q)->len - 1))
+#define MPQ_SERIAL_EMPTY(q) (MPQ_SERIAL_LEN(q) == 0)
- TAILQ_HEAD(, mpq_item) q_worker_items;
- TAILQ_HEAD(, mpq_item) q_serial_items;
-};
+#define MPQ_PARALLEL_LEN(q) CIRC_LEN((q)->parallel_head, (q)->tail, (q)->len)
+#define MPQ_PARALLEL_FULL(q) (MPQ_PARALLEL_LEN(q) >= ((q)->len - 1))
+#define MPQ_PARALLEL_EMPTY(q) (MPQ_PARALLEL_LEN(q) == 0)
+
+
+void mpq_init(struct mpq *, int ipl, uint8_t, void (*)(struct mpq_item *),
+ void (*)(struct mpq_item *));
+int mpq_resize(struct mpq *, size_t);
+int mpq_add(struct mpq *, struct mpq_item *);
+void mpq_destroy(struct mpq *);
+
+void
+mpq_parallel_worker(void *_q)
+{
+ struct mpq *q = _q;
+ struct mpq_queue_item *i;
+ while (1) {
+ while (MPQ_PARALLEL_EMPTY(q)) {
+ tsleep(&q->tail, PZERO, "bored", 0);
+ }
+
+ mtx_enter(&q->mtx);
+ i = &q->items[q->parallel_head];
+ if (i->state != MPQ_ITEM_WAITING) {
+ mtx_leave(&q->mtx);
+ continue;
+ }
+
+ INC_WRAP(q->parallel_head, q->len);
+ i->state = MPQ_ITEM_PROCESSING;
+ mtx_leave(&q->mtx);
+
+ q->parallel(&i->data);
+
+ mtx_enter(&q->mtx);
+ i->state = MPQ_ITEM_DONE;
+ mtx_leave(&q->mtx);
+
+ wakeup_one(&q->parallel_head);
+ }
+}
+
+void
+mpq_serial_worker(void *_q)
+{
+ struct mpq *q = _q;
+ struct mpq_queue_item *i;
+ while (1) {
+ while (q->items[q->serial_head].state != MPQ_ITEM_DONE) {
+ tsleep(&q->parallel_head, PZERO, "bored", 0);
+ }
+
+ i = &q->items[q->serial_head];
+
+ q->serial(&i->data);
+
+ INC_WRAP(q->serial_head, q->len);
+
+ mtx_enter(&q->mtx);
+ i->state = MPQ_ITEM_NONE;
+ mtx_leave(&q->mtx);
+ }
+}
+
+int
+mpq_add(struct mpq *q, struct mpq_item *i)
+{
+ mtx_enter(&q->mtx);
+ if (MPQ_SERIAL_FULL(q)) {
+ mtx_leave(&q->mtx);
+ return 1;
+ }
+
+ q->items[q->tail].data = *i;
+ q->items[q->tail].state = MPQ_ITEM_WAITING;
+ INC_WRAP(q->tail, q->len);
+ mtx_leave(&q->mtx);
+ wakeup_one(&q->tail);
+ return 0;
+}
+
+void
+mpq_init(struct mpq *q, int ipl, uint8_t nthreads, void (*parallel)(struct mpq_item *), void(*serial)(struct mpq_item *))
+{
+ mtx_init(&q->mtx, ipl);
+ q->parallel = parallel;
+ q->serial = serial;
+ q->destroy = 0;
+ q->parallel_head = 0;
+ q->serial_head = 0;
+ q->tail = 0;
+ q->len = 0;
+
+ kthread_create(mpq_serial_worker, q, NULL, "mpq_serial");
+ while(nthreads--)
+ kthread_create(mpq_parallel_worker, q, NULL, "mpq_parallel");
+}
+
+int
+mpq_resize(struct mpq *q, size_t len)
+{
+ q->len = len;
+ q->items = mallocarray(q->len, sizeof(struct mpq_queue_item), M_DEVBUF, M_WAIT | M_ZERO);
+ return 0;
+}
-void mpq_init(struct mpq *, struct taskq *, int ipl, void (*)(void *), void (*)(void *), size_t);
-int mpq_add(struct mpq *, void *);
-int mpq_active(struct mpq *);
-size_t mpq_len(struct mpq *);
+void
+mpq_destroy(struct mpq *q)
+{
+ q->destroy = 1;
+}
#endif /* _MPQ_H_ */