summaryrefslogtreecommitdiffstats
path: root/src/mpq.h
diff options
context:
space:
mode:
authorMatt Dunwoodie <ncon@mail.noconroy.net>2019-06-21 03:44:52 +1000
committerMatt Dunwoodie <ncon@mail.noconroy.net>2019-06-21 03:44:52 +1000
commit873a2ffea7575b548b43b6d2df3f7a5d25202a7f (patch)
tree480af6c20cd8b4a8e8f863e94d9a297efc1a8e27 /src/mpq.h
parentstaging14 (diff)
downloadwireguard-openbsd-873a2ffea7575b548b43b6d2df3f7a5d25202a7f.tar.xz
wireguard-openbsd-873a2ffea7575b548b43b6d2df3f7a5d25202a7f.zip
staging15
Diffstat (limited to 'src/mpq.h')
-rw-r--r--src/mpq.h77
1 files changed, 39 insertions, 38 deletions
diff --git a/src/mpq.h b/src/mpq.h
index 5a861ca08b5..106e7f18604 100644
--- a/src/mpq.h
+++ b/src/mpq.h
@@ -9,7 +9,7 @@ struct mpq_item {
};
struct mpq {
- struct mutex mtx;
+ struct rwlock lock;
int destroy;
void (*parallel)(struct mpq_item *);
void (*serial)(struct mpq_item *);
@@ -62,28 +62,26 @@ mpq_parallel_worker(void *_q)
struct mpq *q = _q;
struct mpq_queue_item *i;
while (1) {
- while (MPQ_PARALLEL_EMPTY(q)) {
- tsleep(&q->tail, PZERO, "bored", 0);
- }
- mtx_enter(&q->mtx);
+ rw_enter_write(&q->lock);
i = &q->items[q->parallel_head];
- if (i->state != MPQ_ITEM_WAITING) {
- mtx_leave(&q->mtx);
- continue;
+ if (i->state == MPQ_ITEM_WAITING) {
+ i->state = MPQ_ITEM_PROCESSING;
+ INC_WRAP(q->parallel_head, q->len);
+ rw_exit_write(&q->lock);
+
+ q->parallel(&i->data);
+
+ rw_enter_write(&q->lock);
+ i->state = MPQ_ITEM_DONE;
+ rw_exit_write(&q->lock);
+
+ wakeup(&q->parallel_head);
+ } else {
+ rw_exit_write(&q->lock);
+ if (MPQ_PARALLEL_EMPTY(q))
+ tsleep(&q->tail, PZERO, "bored", 0);
}
-
- INC_WRAP(q->parallel_head, q->len);
- i->state = MPQ_ITEM_PROCESSING;
- mtx_leave(&q->mtx);
-
- q->parallel(&i->data);
-
- mtx_enter(&q->mtx);
- i->state = MPQ_ITEM_DONE;
- mtx_leave(&q->mtx);
-
- wakeup_one(&q->parallel_head);
}
}
@@ -93,43 +91,46 @@ mpq_serial_worker(void *_q)
struct mpq *q = _q;
struct mpq_queue_item *i;
while (1) {
- while (q->items[q->serial_head].state != MPQ_ITEM_DONE) {
- tsleep(&q->parallel_head, PZERO, "bored", 0);
+ rw_enter_write(&q->lock);
+ if (q->items[q->serial_head].state == MPQ_ITEM_DONE) {
+ i = &q->items[q->serial_head];
+ rw_exit_write(&q->lock);
+
+ q->serial(&i->data);
+
+ rw_enter_write(&q->lock);
+ i->state = MPQ_ITEM_NONE;
+ INC_WRAP(q->serial_head, q->len);
+ rw_exit_write(&q->lock);
+ } else {
+ rw_exit_write(&q->lock);
+ if (MPQ_SERIAL_EMPTY(q))
+ tsleep(&q->parallel_head, PZERO, "bored", 0);
}
-
- i = &q->items[q->serial_head];
-
- q->serial(&i->data);
-
- INC_WRAP(q->serial_head, q->len);
-
- mtx_enter(&q->mtx);
- i->state = MPQ_ITEM_NONE;
- mtx_leave(&q->mtx);
}
}
int
mpq_add(struct mpq *q, struct mpq_item *i)
{
- mtx_enter(&q->mtx);
+ rw_enter_write(&q->lock);
if (MPQ_SERIAL_FULL(q)) {
- mtx_leave(&q->mtx);
+ rw_exit_write(&q->lock);
return 1;
}
q->items[q->tail].data = *i;
q->items[q->tail].state = MPQ_ITEM_WAITING;
INC_WRAP(q->tail, q->len);
- mtx_leave(&q->mtx);
- wakeup_one(&q->tail);
+ rw_exit_write(&q->lock);
+ wakeup(&q->tail);
return 0;
}
void
-mpq_init(struct mpq *q, int ipl, uint8_t nthreads, void (*parallel)(struct mpq_item *), void(*serial)(struct mpq_item *))
+mpq_init(struct mpq *q, struct taskq, uint8_t nthreads, void (*parallel)(struct mpq_item *), void(*serial)(struct mpq_item *))
{
- mtx_init(&q->mtx, ipl);
+ rw_init(&q->lock, "test");
q->parallel = parallel;
q->serial = serial;
q->destroy = 0;