summaryrefslogtreecommitdiffstats
path: root/src/mpq.h
diff options
context:
space:
mode:
authorMatt Dunwoodie <ncon@mail.noconroy.net>2019-06-24 00:32:15 +1000
committerMatt Dunwoodie <ncon@mail.noconroy.net>2019-06-24 00:32:15 +1000
commitec6d4dbf0a0a095987395449198adb791d3c9863 (patch)
treecc513ed5dc8e15b5e6a3d4c7e1187941c9910473 /src/mpq.h
parentstaging16 (diff)
downloadwireguard-openbsd-ec6d4dbf0a0a095987395449198adb791d3c9863.tar.xz
wireguard-openbsd-ec6d4dbf0a0a095987395449198adb791d3c9863.zip
staging17
Diffstat (limited to 'src/mpq.h')
-rw-r--r--src/mpq.h305
1 files changed, 154 insertions, 151 deletions
diff --git a/src/mpq.h b/src/mpq.h
index f47732b66fe..2c7de74d192 100644
--- a/src/mpq.h
+++ b/src/mpq.h
@@ -5,33 +5,29 @@
#include <sys/rwlock.h>
#include <sys/task.h>
-struct mpq_item {
- struct wg_pkt pkt;
-};
-
-struct mpq {
- struct rwlock write_lock;
- struct rwlock serial_lock;
- struct taskq *taskq;
- struct task parallel_task, serial_task;
-
- void (*parallel)(struct mpq_item *);
- void (*serial)(struct mpq_item *);
- size_t parallel_head, serial_head, tail, len;
-
- struct mpq_queue_item {
- enum mpq_item_state {
- MPQ_ITEM_NONE = 0,
- MPQ_ITEM_WAITING,
- MPQ_ITEM_PROCESSING,
- MPQ_ITEM_DONE,
- } state;
- struct mpq_item data;
- } *items;
-};
+#define MPQ_HEAD(name, type) \
+struct name { \
+ struct rwlock write_lock; \
+ struct rwlock serial_lock; \
+ struct taskq *taskq; \
+ struct task parallel_task, serial_task; \
+ void (*parallel)(type *); \
+ void (*serial)(type *); \
+ size_t parallel_head, serial_head, tail, size; \
+ int barrier; \
+ struct mpq_queue_item_##name { \
+ enum mpq_item_state { \
+ MPQ_ITEM_NONE = 0, \
+ MPQ_ITEM_WAITING, \
+ MPQ_ITEM_PROCESSING, \
+ MPQ_ITEM_DONE, \
+ } state; \
+ type data; \
+ } *items; \
+} \
/*
- * H = Head, T = Tail, N = len
+ * H = Head, T = Tail, N = size
* H T T-H N-(H-T)
* 5 5 0 8
* 5 6 1 9
@@ -46,138 +42,145 @@ struct mpq {
#define INC_WRAP(v, max) (v) = ((v) >= (max - 1) ? 0 : (v) + 1)
#define CIRC_LEN(h, t, n) ((t) >= (h) ? ((t) - (h)) : ((n) - ((h) - (t))))
-#define MPQ_SERIAL_LEN(q) CIRC_LEN((q)->serial_head, (q)->tail, (q)->len)
-#define MPQ_SERIAL_FULL(q) (MPQ_SERIAL_LEN(q) >= ((q)->len - 1))
+#define MPQ_GET_SIZE(q) (q)->size
+
+#define MPQ_SERIAL_LEN(q) CIRC_LEN((q)->serial_head, (q)->tail, (q)->size)
+#define MPQ_SERIAL_FULL(q) (MPQ_SERIAL_LEN(q) >= ((q)->size - 1))
#define MPQ_SERIAL_EMPTY(q) (MPQ_SERIAL_LEN(q) == 0)
-#define MPQ_PARALLEL_LEN(q) CIRC_LEN((q)->parallel_head, (q)->tail, (q)->len)
-#define MPQ_PARALLEL_FULL(q) (MPQ_PARALLEL_LEN(q) >= ((q)->len - 1))
+#define MPQ_PARALLEL_LEN(q) CIRC_LEN((q)->parallel_head, (q)->tail, (q)->size)
+#define MPQ_PARALLEL_FULL(q) (MPQ_PARALLEL_LEN(q) >= ((q)->size - 1))
#define MPQ_PARALLEL_EMPTY(q) (MPQ_PARALLEL_LEN(q) == 0)
-
-void mpq_init(struct mpq *, struct taskq *, uint8_t, void (*)(struct mpq_item *),
- void (*)(struct mpq_item *));
-int mpq_resize(struct mpq *, size_t);
-int mpq_add(struct mpq *, struct mpq_item *);
-void mpq_destroy(struct mpq *);
-
-void
-mpq_parallel_worker(void *_q)
-{
- struct mpq *q = _q;
- struct mpq_queue_item *i;
- while (1) {
- rw_enter_write(&q->write_lock);
- if (MPQ_PARALLEL_EMPTY(q)) {
- rw_exit_write(&q->write_lock);
- /*
- if (tsleep(&q->parallel_head, PZERO, "wait", hz/100) == EWOULDBLOCK)
- break;
- else
- continue;
- */
- break;
- }
-
- i = &q->items[q->parallel_head];
- assert(i->state == MPQ_ITEM_WAITING);
-
- i->state = MPQ_ITEM_PROCESSING;
- INC_WRAP(q->parallel_head, q->len);
- rw_exit_write(&q->write_lock);
-
- q->parallel(&i->data);
-
- rw_enter_write(&q->write_lock);
- i->state = MPQ_ITEM_DONE;
- rw_exit_write(&q->write_lock);
- }
- task_add(q->taskq, &q->serial_task);
- //wakeup_one(&q->serial_head);
-}
-
-void
-mpq_serial_worker(void *_q)
-{
- struct mpq *q = _q;
- struct mpq_queue_item *i;
- rw_enter_write(&q->serial_lock);
- while (1) {
-
- rw_enter_write(&q->write_lock);
- i = &q->items[q->serial_head];
- if (i->state != MPQ_ITEM_DONE) {
- rw_exit_write(&q->write_lock);
- /*
- if (tsleep(&q->serial_head, PZERO, "wait", hz/100) == EWOULDBLOCK)
- break;
- else
- continue;
- */
- break;
- }
- rw_exit_write(&q->write_lock);
-
- q->serial(&i->data);
-
- rw_enter_write(&q->write_lock);
- i->state = MPQ_ITEM_NONE;
- INC_WRAP(q->serial_head, q->len);
- rw_exit_write(&q->write_lock);
- }
- rw_exit_write(&q->serial_lock);
-}
-
-int
-mpq_add(struct mpq *q, struct mpq_item *i)
-{
- rw_enter_write(&q->write_lock);
- if (MPQ_SERIAL_FULL(q)) {
- rw_exit_write(&q->write_lock);
- return 1;
- }
-
- q->items[q->tail].data = *i;
- q->items[q->tail].state = MPQ_ITEM_WAITING;
- INC_WRAP(q->tail, q->len);
- rw_exit_write(&q->write_lock);
- //wakeup_one(&q->parallel_head);
- task_add(q->taskq, &q->parallel_task);
- /*if (MPQ_SERIAL_LEN(q) != MPQ_PARALLEL_LEN(q))
- printf("%p - %zu %zu\n", q, MPQ_SERIAL_LEN(q), MPQ_PARALLEL_LEN(q));*/
- return 0;
+#define MPQ_GENERATE(name, type) \
+void \
+mpq_parallel_worker_##name(void *_q) \
+{ \
+ struct name *q = _q; \
+ struct mpq_queue_item_##name *i; \
+ while (1) { \
+ rw_enter_write(&q->write_lock); \
+ if (MPQ_PARALLEL_EMPTY(q)) { \
+ rw_exit_write(&q->write_lock); \
+ break; \
+ } \
+ \
+ i = &q->items[q->parallel_head]; \
+ assert(i->state == MPQ_ITEM_WAITING); \
+ \
+ i->state = MPQ_ITEM_PROCESSING; \
+ INC_WRAP(q->parallel_head, q->size); \
+ rw_exit_write(&q->write_lock); \
+ \
+ q->parallel(&i->data); \
+ \
+ rw_enter_write(&q->write_lock); \
+ i->state = MPQ_ITEM_DONE; \
+ rw_exit_write(&q->write_lock); \
+ } \
+ if (rw_status(&q->serial_lock) == 0) \
+ task_add(q->taskq, &q->serial_task); \
+} \
+\
+void \
+mpq_serial_worker_##name(void *_q) \
+{ \
+ struct name *q = _q; \
+ struct mpq_queue_item_##name *i; \
+ rw_enter_write(&q->serial_lock); \
+ while (1) { \
+ rw_enter_write(&q->write_lock); \
+ i = &q->items[q->serial_head]; \
+ if (i->state != MPQ_ITEM_DONE) { \
+ rw_exit_write(&q->write_lock); \
+ break; \
+ } \
+ rw_exit_write(&q->write_lock); \
+ \
+ q->serial(&i->data); \
+ \
+ rw_enter_write(&q->write_lock); \
+ i->state = MPQ_ITEM_NONE; \
+ INC_WRAP(q->serial_head, q->size); \
+ rw_exit_write(&q->write_lock); \
+ } \
+ rw_exit_write(&q->serial_lock); \
+} \
+\
+int \
+mpq_add_##name(struct name *q, type *d) \
+{ \
+ rw_enter_write(&(q)->write_lock); \
+ if (MPQ_SERIAL_FULL(q) || (q)->barrier) { \
+ rw_exit_write(&(q)->write_lock); \
+ return 1; \
+ } \
+ \
+ (q)->items[(q)->tail].data = *d; \
+ (q)->items[(q)->tail].state = MPQ_ITEM_WAITING; \
+ INC_WRAP((q)->tail, (q)->size); \
+ rw_exit_write(&(q)->write_lock); \
+ task_add((q)->taskq, &(q)->parallel_task); \
+ return 0; \
}
-void
-mpq_init(struct mpq *q, struct taskq *taskq, uint8_t nthreads, void (*parallel)(struct mpq_item *), void(*serial)(struct mpq_item *))
-{
- rw_init(&q->write_lock, "mpq_write");
- rw_init(&q->serial_lock, "mpq_serial");
- q->parallel = parallel;
- q->serial = serial;
- q->parallel_head = 0;
- q->serial_head = 0;
- q->taskq = taskq;
- q->tail = 0;
- q->len = 0;
- task_set(&q->parallel_task, mpq_parallel_worker, q);
- task_set(&q->serial_task, mpq_serial_worker, q);
-}
-
-int
-mpq_resize(struct mpq *q, size_t len)
-{
- q->len = len;
- q->items = mallocarray(q->len, sizeof(struct mpq_queue_item), M_DEVBUF, M_WAIT | M_ZERO);
- return 0;
-}
+#define MPQ_ADD(name, q, d) mpq_add_##name(q, d)
+
+#define MPQ_INIT(name, q, taskq_p, parallel_fn, serial_fn) do { \
+ rw_init(&(q)->write_lock, "mpq_write"); \
+ rw_init(&(q)->serial_lock, "mpq_serial"); \
+ (q)->parallel = parallel_fn; \
+ (q)->serial = serial_fn; \
+ (q)->parallel_head = 0; \
+ (q)->serial_head = 0; \
+ (q)->taskq = taskq_p; \
+ (q)->tail = 0; \
+ (q)->size = 0; \
+ (q)->barrier = 0; \
+ task_set(&(q)->parallel_task, mpq_parallel_worker_##name, (q)); \
+ task_set(&(q)->serial_task, mpq_serial_worker_##name, (q)); \
+} while (0)
+
+#define MPQ_BARRIER_ENTER(q) do { \
+ size_t i; \
+ rw_enter_write(&(q)->write_lock); \
+ (q)->barrier = 1; \
+ rw_exit_write(&(q)->write_lock); \
+ \
+ rw_enter_read(&(q)->write_lock); \
+ for(i = 0; i < (q)->size; i++) { \
+ if ((q)->items[i].state != MPQ_ITEM_NONE) { \
+ rw_exit_read(&(q)->write_lock); \
+ tsleep((q), PZERO, "mpq_barrier", 1); \
+ rw_enter_read(&(q)->write_lock); \
+ } \
+ } \
+ rw_exit_read(&(q)->write_lock); \
+} while (0)
+
+#define MPQ_BARRIER_LEAVE(q) do { \
+ rw_enter_write(&(q)->write_lock); \
+ (q)->barrier = 0; \
+ rw_exit_write(&(q)->write_lock); \
+} while (0)
+
+#define MPQ_BARRIER(q) do { \
+ MPQ_BARRIER_ENTER(q); \
+ MPQ_BARRIER_LEAVE(q); \
+} while (0)
+
+#define MPQ_DESTROY(q) do { \
+ MPQ_BARRIER_ENTER(q); \
+ free((q)->items, M_DEVBUF, 0); \
+} while (0)
+
+#define MPQ_SET_SIZE(name, q, size_v) do { \
+ MPQ_DESTROY(q); \
+ (q)->size = size_v; \
+ (q)->items = mallocarray((q)->size, sizeof(struct mpq_queue_item_##name), \
+ M_DEVBUF, M_WAIT | M_ZERO); \
+ MPQ_BARRIER_LEAVE(q); \
+} while (0)
-/*
-void
-mpq_destroy(struct mpq *q)
-{
- q->destroy = 1;
-}
-*/
#endif /* _MPQ_H_ */