summaryrefslogtreecommitdiffstats
path: root/src/mpq.h
diff options
context:
space:
mode:
authorMatt Dunwoodie <ncon@mail.noconroy.net>2019-08-11 11:45:58 +1000
committerMatt Dunwoodie <ncon@mail.noconroy.net>2019-08-11 11:45:58 +1000
commite95417532c0b8cd84b948853a56972b000d3de69 (patch)
tree8f72f2febdefdc262175c350e2fc25b9d2204a8c /src/mpq.h
parentAdd INET6 conditionals (diff)
downloadwireguard-openbsd-e95417532c0b8cd84b948853a56972b000d3de69.tar.xz
wireguard-openbsd-e95417532c0b8cd84b948853a56972b000d3de69.zip
Update mpq.h
Diffstat (limited to 'src/mpq.h')
-rw-r--r--src/mpq.h66
1 files changed, 53 insertions, 13 deletions
diff --git a/src/mpq.h b/src/mpq.h
index d90d082efd4..7b7549cfdb3 100644
--- a/src/mpq.h
+++ b/src/mpq.h
@@ -23,6 +23,7 @@
#include <sys/systm.h>
struct mpq {
+ uint mpq_sactive;
size_t mpq_maxlen;
size_t mpq_nthreads;
struct mutex mpq_mtx;
@@ -47,8 +48,19 @@ _mpq_thread(struct mpq *mpq)
struct mbuf *mc;
while (1) {
+ /* dequeue */
mtx_enter(&mpq->mpq_mtx);
if ((mc = mpq->mpq_cursor) == NULL) {
+
+ /* check if mpq is dead */
+ if (mpq->mpq_parallel_fn == NULL) {
+ mpq->mpq_nthreads--;
+ mtx_leave(&mpq->mpq_mtx);
+ wakeup(&mpq->mpq_nthreads);
+ kthread_exit(0);
+ }
+
+ /* sleep */
mtx_leave(&mpq->mpq_mtx);
tsleep(&mpq->mpq_cursor, PSOCK, "bored", 0);
continue;
@@ -56,10 +68,19 @@ _mpq_thread(struct mpq *mpq)
mpq->mpq_cursor = mc->m_nextpkt;
mtx_leave(&mpq->mpq_mtx);
- mc->m_next = mpq->mpq_parallel_fn(mc->m_next);
+ /* run time consuming function in parallel */
+ if (mpq->mpq_parallel_fn != NULL && mc->m_next != NULL)
+ mc->m_next = mpq->mpq_parallel_fn(mc->m_next);
+
+ /* signal packet processed */
mc->m_flags |= M_PROTO1;
+ if (atomic_cas_uint(&mpq->mpq_sactive, 0, 1) == 1)
+ continue;
+
+ /* run function in queue order */
while (1) {
+ /* check if head is ready to process */
mtx_enter(&mpq->mpq_mtx);
mc = mpq->mpq_list.ml_head;
if (mc == NULL || ~mc->m_flags & M_PROTO1) {
@@ -69,6 +90,7 @@ _mpq_thread(struct mpq *mpq)
mc = ml_dequeue(&mpq->mpq_list);
mtx_leave(&mpq->mpq_mtx);
+ /* process */
if (mpq->mpq_serial_fn != NULL && mc->m_next != NULL) {
mpq->mpq_serial_fn(mc->m_next);
mc->m_next = NULL;
@@ -76,6 +98,8 @@ _mpq_thread(struct mpq *mpq)
m_freem(mc);
}
+
+ atomic_swap_uint(&mpq->mpq_sactive, 0);
}
}
@@ -83,6 +107,9 @@ _mpq_thread(struct mpq *mpq)
void
mpq_init(struct mpq *mpq, int ipl, struct mbuf *(*parallel)(struct mbuf *), void (*serial)(struct mbuf *))
{
+ assert(parallel != NULL);
+
+ mpq->mpq_sactive = 0;
mpq->mpq_maxlen = MPQ_DEFAULTLEN;
mtx_init(&mpq->mpq_mtx, ipl);
ml_init(&mpq->mpq_list);
@@ -91,33 +118,46 @@ mpq_init(struct mpq *mpq, int ipl, struct mbuf *(*parallel)(struct mbuf *), void
mpq->mpq_serial_fn = serial;
for(mpq->mpq_nthreads = 0; mpq->mpq_nthreads < ncpus; mpq->mpq_nthreads++)
- kthread_create((void (*)(void *))_mpq_thread, mpq, NULL, "wireguard");
+ kthread_create((void (*)(void *))_mpq_thread, mpq, NULL, "wg");
}
void
mpq_barrier(struct mpq *mpq)
{
- struct mbuf *m = m_gethdr(M_WAITOK, MT_CONTROL);
- m->m_pkthdr.ph_cookie = m;
- mpq_enqueue(mpq, m);
- while (m->m_pkthdr.ph_cookie == m)
- tsleep(m, PWAIT, "wireguard_barrier", 0);
- m_freem(m);
+ size_t maxlen, len;
+ mtx_enter(&mpq->mpq_mtx);
+ maxlen = mpq->mpq_maxlen;
+ mpq->mpq_maxlen = 0;
+ mtx_leave(&mpq->mpq_mtx);
+
+ do {
+ mtx_enter(&mpq->mpq_mtx);
+ len = ml_len(&mpq->mpq_list);
+ mtx_leave(&mpq->mpq_mtx);
+ tsleep(&mpq->mpq_list, PWAIT, "wg_barrier", 1);
+ } while (len > 0);
+
+ mtx_enter(&mpq->mpq_mtx);
+ mpq->mpq_maxlen = maxlen;
+ mtx_leave(&mpq->mpq_mtx);
}
void
mpq_destroy(struct mpq *mpq)
{
- struct mbuf *m = m_gethdr(M_WAITOK, MT_CONTROL);
- m->m_pkthdr.ph_cookie = m + 1;
- mpq_enqueue(mpq, m);
+ mpq->mpq_serial_fn = NULL;
+ mpq->mpq_parallel_fn = NULL;
+ wakeup(&mpq->mpq_cursor);
+ /* TODO timo */
+ while (mpq->mpq_nthreads > 0)
+ tsleep(&mpq->mpq_nthreads, PWAIT, "wg_destroy", 0);
}
int
mpq_enqueue(struct mpq *mpq, struct mbuf *m)
{
int dropped = 0;
- struct mbuf *mc = m_gethdr(M_WAITOK, MT_CONTROL);
+ struct mbuf *mc = m_get(M_WAITOK, MT_CONTROL);
mc->m_next = m;
mtx_enter(&mpq->mpq_mtx);
@@ -141,7 +181,7 @@ mpq_enqueue(struct mpq *mpq, struct mbuf *m)
int
mpq_load(struct mpq *mpq)
{
- return 0;
+ return 100 * ml_len(&mpq->mpq_list) / mpq->mpq_maxlen;
}
#endif /* _MPQ_H_ */