summaryrefslogtreecommitdiffstats
path: root/src/mpq.h
diff options
context:
space:
mode:
authorMatt Dunwoodie <ncon@mail.noconroy.net>2019-06-22 18:34:47 +1000
committerMatt Dunwoodie <ncon@mail.noconroy.net>2019-06-22 18:34:47 +1000
commit73ad930cac82873cb5eb281bbdfad9ef1d7380c7 (patch)
tree263c0c11ccd755287e667e9cffdf72bd2fc87bb3 /src/mpq.h
parentstaging15 (diff)
downloadwireguard-openbsd-73ad930cac82873cb5eb281bbdfad9ef1d7380c7.tar.xz
wireguard-openbsd-73ad930cac82873cb5eb281bbdfad9ef1d7380c7.zip
staging16
Diffstat (limited to 'src/mpq.h')
-rw-r--r--src/mpq.h120
1 files changed, 71 insertions, 49 deletions
diff --git a/src/mpq.h b/src/mpq.h
index 106e7f18604..f47732b66fe 100644
--- a/src/mpq.h
+++ b/src/mpq.h
@@ -1,19 +1,24 @@
#ifndef _MPQ_H_
#define _MPQ_H_
-#include <sys/mutex.h>
-#include <sys/kthread.h>
+#include <sys/kernel.h>
+#include <sys/rwlock.h>
+#include <sys/task.h>
struct mpq_item {
struct wg_pkt pkt;
};
struct mpq {
- struct rwlock lock;
- int destroy;
+ struct rwlock write_lock;
+ struct rwlock serial_lock;
+ struct taskq *taskq;
+ struct task parallel_task, serial_task;
+
void (*parallel)(struct mpq_item *);
void (*serial)(struct mpq_item *);
size_t parallel_head, serial_head, tail, len;
+
struct mpq_queue_item {
enum mpq_item_state {
MPQ_ITEM_NONE = 0,
@@ -50,7 +55,7 @@ struct mpq {
#define MPQ_PARALLEL_EMPTY(q) (MPQ_PARALLEL_LEN(q) == 0)
-void mpq_init(struct mpq *, int ipl, uint8_t, void (*)(struct mpq_item *),
+void mpq_init(struct mpq *, struct taskq *, uint8_t, void (*)(struct mpq_item *),
void (*)(struct mpq_item *));
int mpq_resize(struct mpq *, size_t);
int mpq_add(struct mpq *, struct mpq_item *);
@@ -62,27 +67,33 @@ mpq_parallel_worker(void *_q)
struct mpq *q = _q;
struct mpq_queue_item *i;
while (1) {
+ rw_enter_write(&q->write_lock);
+ if (MPQ_PARALLEL_EMPTY(q)) {
+ rw_exit_write(&q->write_lock);
+ /*
+ if (tsleep(&q->parallel_head, PZERO, "wait", hz/100) == EWOULDBLOCK)
+ break;
+ else
+ continue;
+ */
+ break;
+ }
- rw_enter_write(&q->lock);
i = &q->items[q->parallel_head];
- if (i->state == MPQ_ITEM_WAITING) {
- i->state = MPQ_ITEM_PROCESSING;
- INC_WRAP(q->parallel_head, q->len);
- rw_exit_write(&q->lock);
-
- q->parallel(&i->data);
-
- rw_enter_write(&q->lock);
- i->state = MPQ_ITEM_DONE;
- rw_exit_write(&q->lock);
-
- wakeup(&q->parallel_head);
- } else {
- rw_exit_write(&q->lock);
- if (MPQ_PARALLEL_EMPTY(q))
- tsleep(&q->tail, PZERO, "bored", 0);
- }
+ assert(i->state == MPQ_ITEM_WAITING);
+
+ i->state = MPQ_ITEM_PROCESSING;
+ INC_WRAP(q->parallel_head, q->len);
+ rw_exit_write(&q->write_lock);
+
+ q->parallel(&i->data);
+
+ rw_enter_write(&q->write_lock);
+ i->state = MPQ_ITEM_DONE;
+ rw_exit_write(&q->write_lock);
}
+ task_add(q->taskq, &q->serial_task);
+ //wakeup_one(&q->serial_head);
}
void
@@ -90,58 +101,67 @@ mpq_serial_worker(void *_q)
{
struct mpq *q = _q;
struct mpq_queue_item *i;
+ rw_enter_write(&q->serial_lock);
while (1) {
- rw_enter_write(&q->lock);
- if (q->items[q->serial_head].state == MPQ_ITEM_DONE) {
- i = &q->items[q->serial_head];
- rw_exit_write(&q->lock);
-
- q->serial(&i->data);
-
- rw_enter_write(&q->lock);
- i->state = MPQ_ITEM_NONE;
- INC_WRAP(q->serial_head, q->len);
- rw_exit_write(&q->lock);
- } else {
- rw_exit_write(&q->lock);
- if (MPQ_SERIAL_EMPTY(q))
- tsleep(&q->parallel_head, PZERO, "bored", 0);
+
+ rw_enter_write(&q->write_lock);
+ i = &q->items[q->serial_head];
+ if (i->state != MPQ_ITEM_DONE) {
+ rw_exit_write(&q->write_lock);
+ /*
+ if (tsleep(&q->serial_head, PZERO, "wait", hz/100) == EWOULDBLOCK)
+ break;
+ else
+ continue;
+ */
+ break;
}
+ rw_exit_write(&q->write_lock);
+
+ q->serial(&i->data);
+
+ rw_enter_write(&q->write_lock);
+ i->state = MPQ_ITEM_NONE;
+ INC_WRAP(q->serial_head, q->len);
+ rw_exit_write(&q->write_lock);
}
+ rw_exit_write(&q->serial_lock);
}
int
mpq_add(struct mpq *q, struct mpq_item *i)
{
- rw_enter_write(&q->lock);
+ rw_enter_write(&q->write_lock);
if (MPQ_SERIAL_FULL(q)) {
- rw_exit_write(&q->lock);
+ rw_exit_write(&q->write_lock);
return 1;
}
q->items[q->tail].data = *i;
q->items[q->tail].state = MPQ_ITEM_WAITING;
INC_WRAP(q->tail, q->len);
- rw_exit_write(&q->lock);
- wakeup(&q->tail);
+ rw_exit_write(&q->write_lock);
+ //wakeup_one(&q->parallel_head);
+ task_add(q->taskq, &q->parallel_task);
+ /*if (MPQ_SERIAL_LEN(q) != MPQ_PARALLEL_LEN(q))
+ printf("%p - %zu %zu\n", q, MPQ_SERIAL_LEN(q), MPQ_PARALLEL_LEN(q));*/
return 0;
}
void
-mpq_init(struct mpq *q, struct taskq, uint8_t nthreads, void (*parallel)(struct mpq_item *), void(*serial)(struct mpq_item *))
+mpq_init(struct mpq *q, struct taskq *taskq, uint8_t nthreads, void (*parallel)(struct mpq_item *), void(*serial)(struct mpq_item *))
{
- rw_init(&q->lock, "test");
+ rw_init(&q->write_lock, "mpq_write");
+ rw_init(&q->serial_lock, "mpq_serial");
q->parallel = parallel;
q->serial = serial;
- q->destroy = 0;
q->parallel_head = 0;
q->serial_head = 0;
+ q->taskq = taskq;
q->tail = 0;
q->len = 0;
-
- kthread_create(mpq_serial_worker, q, NULL, "mpq_serial");
- while(nthreads--)
- kthread_create(mpq_parallel_worker, q, NULL, "mpq_parallel");
+ task_set(&q->parallel_task, mpq_parallel_worker, q);
+ task_set(&q->serial_task, mpq_serial_worker, q);
}
int
@@ -152,10 +172,12 @@ mpq_resize(struct mpq *q, size_t len)
return 0;
}
+/*
void
mpq_destroy(struct mpq *q)
{
q->destroy = 1;
}
+*/
#endif /* _MPQ_H_ */