summaryrefslogtreecommitdiffstats
path: root/src/mpq.h
diff options
context:
space:
mode:
authorMatt Dunwoodie <ncon@mail.noconroy.net>2019-07-29 01:54:20 +1000
committerMatt Dunwoodie <ncon@mail.noconroy.net>2019-08-08 21:37:38 +1000
commit1a776975b6d0b923cfe23934bae371c57f99c6f9 (patch)
treeccb501408cd63cbc99cfabd3d82ed43d81019a65 /src/mpq.h
parentDecrease coupling between if_wg.c and wireguard.c more (diff)
downloadwireguard-openbsd-1a776975b6d0b923cfe23934bae371c57f99c6f9.tar.xz
wireguard-openbsd-1a776975b6d0b923cfe23934bae371c57f99c6f9.zip
Remove struct wg_pkt
It was fun while it lasted, but will be nice just to rely on a mbuf_queue for queueing packets. That will require a bit of work for a functioning multithreaded mpq_*, but should be fun.
Diffstat (limited to 'src/mpq.h')
-rw-r--r--src/mpq.h269
1 files changed, 91 insertions, 178 deletions
diff --git a/src/mpq.h b/src/mpq.h
index 2c7de74d192..f9b761d66ee 100644
--- a/src/mpq.h
+++ b/src/mpq.h
@@ -1,186 +1,99 @@
#ifndef _MPQ_H_
#define _MPQ_H_
-#include <sys/kernel.h>
-#include <sys/rwlock.h>
-#include <sys/task.h>
-
-#define MPQ_HEAD(name, type) \
-struct name { \
- struct rwlock write_lock; \
- struct rwlock serial_lock; \
- struct taskq *taskq; \
- struct task parallel_task, serial_task; \
- void (*parallel)(type *); \
- void (*serial)(type *); \
- size_t parallel_head, serial_head, tail, size; \
- int barrier; \
- struct mpq_queue_item_##name { \
- enum mpq_item_state { \
- MPQ_ITEM_NONE = 0, \
- MPQ_ITEM_WAITING, \
- MPQ_ITEM_PROCESSING, \
- MPQ_ITEM_DONE, \
- } state; \
- type data; \
- } *items; \
-} \
-
-/*
- * H = Head, T = Tail, N = size
- * H T T-H N-(H-T)
- * 5 5 0 8
- * 5 6 1 9
- * 5 7 2 10
- * 5 0 -5 3
- * 5 1 -4 4
- * 5 2 -3 5
- * 5 3 -2 6
- * 5 4 -1 7
-*/
-
-#define INC_WRAP(v, max) (v) = ((v) >= (max - 1) ? 0 : (v) + 1)
-#define CIRC_LEN(h, t, n) ((t) >= (h) ? ((t) - (h)) : ((n) - ((h) - (t))))
-
-#define MPQ_GET_SIZE(q) (q)->size
-
-#define MPQ_SERIAL_LEN(q) CIRC_LEN((q)->serial_head, (q)->tail, (q)->size)
-#define MPQ_SERIAL_FULL(q) (MPQ_SERIAL_LEN(q) >= ((q)->size - 1))
-#define MPQ_SERIAL_EMPTY(q) (MPQ_SERIAL_LEN(q) == 0)
-
-#define MPQ_PARALLEL_LEN(q) CIRC_LEN((q)->parallel_head, (q)->tail, (q)->size)
-#define MPQ_PARALLEL_FULL(q) (MPQ_PARALLEL_LEN(q) >= ((q)->size - 1))
-#define MPQ_PARALLEL_EMPTY(q) (MPQ_PARALLEL_LEN(q) == 0)
-
-#define MPQ_GENERATE(name, type) \
-void \
-mpq_parallel_worker_##name(void *_q) \
-{ \
- struct name *q = _q; \
- struct mpq_queue_item_##name *i; \
- while (1) { \
- rw_enter_write(&q->write_lock); \
- if (MPQ_PARALLEL_EMPTY(q)) { \
- rw_exit_write(&q->write_lock); \
- break; \
- } \
- \
- i = &q->items[q->parallel_head]; \
- assert(i->state == MPQ_ITEM_WAITING); \
- \
- i->state = MPQ_ITEM_PROCESSING; \
- INC_WRAP(q->parallel_head, q->size); \
- rw_exit_write(&q->write_lock); \
- \
- q->parallel(&i->data); \
- \
- rw_enter_write(&q->write_lock); \
- i->state = MPQ_ITEM_DONE; \
- rw_exit_write(&q->write_lock); \
- } \
- if (rw_status(&q->serial_lock) == 0) \
- task_add(q->taskq, &q->serial_task); \
-} \
-\
-void \
-mpq_serial_worker_##name(void *_q) \
-{ \
- struct name *q = _q; \
- struct mpq_queue_item_##name *i; \
- rw_enter_write(&q->serial_lock); \
- while (1) { \
- rw_enter_write(&q->write_lock); \
- i = &q->items[q->serial_head]; \
- if (i->state != MPQ_ITEM_DONE) { \
- rw_exit_write(&q->write_lock); \
- break; \
- } \
- rw_exit_write(&q->write_lock); \
- \
- q->serial(&i->data); \
- \
- rw_enter_write(&q->write_lock); \
- i->state = MPQ_ITEM_NONE; \
- INC_WRAP(q->serial_head, q->size); \
- rw_exit_write(&q->write_lock); \
- } \
- rw_exit_write(&q->serial_lock); \
-} \
-\
-int \
-mpq_add_##name(struct name *q, type *d) \
-{ \
- rw_enter_write(&(q)->write_lock); \
- if (MPQ_SERIAL_FULL(q) || (q)->barrier) { \
- rw_exit_write(&(q)->write_lock); \
- return 1; \
- } \
- \
- (q)->items[(q)->tail].data = *d; \
- (q)->items[(q)->tail].state = MPQ_ITEM_WAITING; \
- INC_WRAP((q)->tail, (q)->size); \
- rw_exit_write(&(q)->write_lock); \
- task_add((q)->taskq, &(q)->parallel_task); \
- return 0; \
+#include <sys/param.h>
+#include <sys/kthread.h>
+#include <sys/mbuf.h>
+#include <sys/systm.h>
+
+struct mpq {
+ struct mbuf_queue mpq_mq;
+ struct mbuf *(*mpq_parallel)(struct mbuf *);
+ void (*mpq_serial)(struct mbuf *);
+};
+
+void mpq_init(struct mpq *, int, struct mbuf *(*)(struct mbuf *), void (*)(struct mbuf *));
+void mpq_barrier(struct mpq *);
+void mpq_destroy(struct mpq *);
+int mpq_enqueue(struct mpq *, struct mbuf *);
+int mpq_load(struct mpq *);
+
+#define MPQ_DEFAULTLEN 128
+
+/* Internal functions */
+void
+_mpq_thread(void *_mpq)
+{
+ struct mbuf *m;
+ struct mpq *mpq = _mpq;
+
+ while (1) {
+ if ((m = mq_dequeue(&mpq->mpq_mq)) == NULL) {
+ tsleep(&mpq->mpq_mq, PWAIT, "bored", 0);
+ continue;
+ }
+
+ if (m->m_pkthdr.ph_cookie == m) {
+ m->m_pkthdr.ph_cookie = NULL;
+ wakeup(m);
+ continue;
+ }
+
+ if (m->m_pkthdr.ph_cookie == m + 1) {
+ m_freem(m);
+ kthread_exit(0);
+ }
+
+ m = mpq->mpq_parallel(m);
+
+ if (m != NULL && mpq->mpq_serial != NULL)
+ mpq->mpq_serial(m);
+ }
}
-#define MPQ_ADD(name, q, d) mpq_add_##name(q, d)
-
-#define MPQ_INIT(name, q, taskq_p, parallel_fn, serial_fn) do { \
- rw_init(&(q)->write_lock, "mpq_write"); \
- rw_init(&(q)->serial_lock, "mpq_serial"); \
- (q)->parallel = parallel_fn; \
- (q)->serial = serial_fn; \
- (q)->parallel_head = 0; \
- (q)->serial_head = 0; \
- (q)->taskq = taskq_p; \
- (q)->tail = 0; \
- (q)->size = 0; \
- (q)->barrier = 0; \
- task_set(&(q)->parallel_task, mpq_parallel_worker_##name, (q)); \
- task_set(&(q)->serial_task, mpq_serial_worker_##name, (q)); \
-} while (0)
-
-#define MPQ_BARRIER_ENTER(q) do { \
- size_t i; \
- rw_enter_write(&(q)->write_lock); \
- (q)->barrier = 1; \
- rw_exit_write(&(q)->write_lock); \
- \
- rw_enter_read(&(q)->write_lock); \
- for(i = 0; i < (q)->size; i++) { \
- if ((q)->items[i].state != MPQ_ITEM_NONE) { \
- rw_exit_read(&(q)->write_lock); \
- tsleep((q), PZERO, "mpq_barrier", 1); \
- rw_enter_read(&(q)->write_lock); \
- } \
- } \
- rw_exit_read(&(q)->write_lock); \
-} while (0)
-
-#define MPQ_BARRIER_LEAVE(q) do { \
- rw_enter_write(&(q)->write_lock); \
- (q)->barrier = 0; \
- rw_exit_write(&(q)->write_lock); \
-} while (0)
-
-#define MPQ_BARRIER(q) do { \
- MPQ_BARRIER_ENTER(q); \
- MPQ_BARRIER_LEAVE(q); \
-} while (0)
-
-#define MPQ_DESTROY(q) do { \
- MPQ_BARRIER_ENTER(q); \
- free((q)->items, M_DEVBUF, 0); \
-} while (0)
-
-#define MPQ_SET_SIZE(name, q, size_v) do { \
- MPQ_DESTROY(q); \
- (q)->size = size_v; \
- (q)->items = mallocarray((q)->size, sizeof(struct mpq_queue_item_##name), \
- M_DEVBUF, M_WAIT | M_ZERO); \
- MPQ_BARRIER_LEAVE(q); \
-} while (0)
+/* External functions */
+void
+mpq_init(struct mpq *mpq, int ipl, struct mbuf *(*parallel)(struct mbuf *), void (*serial)(struct mbuf *))
+{
+ mq_init(&mpq->mpq_mq, MPQ_DEFAULTLEN, ipl);
+ mpq->mpq_parallel = parallel;
+ mpq->mpq_serial = serial;
+ kthread_create(_mpq_thread, mpq, NULL, "wireguard");
+}
+
+void
+mpq_barrier(struct mpq *mpq)
+{
+ struct mbuf *m = m_gethdr(M_WAITOK, MT_CONTROL);
+ m->m_pkthdr.ph_cookie = m;
+ mpq_enqueue(mpq, m);
+ while (m->m_pkthdr.ph_cookie == m)
+ tsleep(m, PWAIT, "wireguard_barrier", 0);
+ m_freem(m);
+}
+
+void
+mpq_destroy(struct mpq *mpq)
+{
+ struct mbuf *m = m_gethdr(M_WAITOK, MT_CONTROL);
+ m->m_pkthdr.ph_cookie = m + 1;
+ mpq_enqueue(mpq, m);
+}
+
+int
+mpq_enqueue(struct mpq *mpq, struct mbuf *m)
+{
+ int ret;
+ m->m_flags &= ~M_PROTO1;
+ ret = mq_enqueue(&mpq->mpq_mq, m);
+ wakeup(&mpq->mpq_mq);
+ return ret;
+}
+int
+mpq_load(struct mpq *mpq)
+{
+ return 0;
+}
#endif /* _MPQ_H_ */