summaryrefslogtreecommitdiffstats
path: root/src/mpq.h
diff options
context:
space:
mode:
authorMatt Dunwoodie <ncon@mail.noconroy.net>2019-08-10 18:46:42 +1000
committerMatt Dunwoodie <ncon@mail.noconroy.net>2019-08-10 18:46:42 +1000
commit106d726e5c0c8ba7b02d379ce115b61dd59cf53f (patch)
treeb81c9f578c575473e1fdc3edcc346048d1291721 /src/mpq.h
parentReturn fake values for private keys (diff)
downloadwireguard-openbsd-106d726e5c0c8ba7b02d379ce115b61dd59cf53f.tar.xz
wireguard-openbsd-106d726e5c0c8ba7b02d379ce115b61dd59cf53f.zip
Update mpq.h for multithreading
Diffstat (limited to 'src/mpq.h')
-rw-r--r--src/mpq.h98
1 files changed, 65 insertions, 33 deletions
diff --git a/src/mpq.h b/src/mpq.h
index 4aabf351b29..d90d082efd4 100644
--- a/src/mpq.h
+++ b/src/mpq.h
@@ -23,9 +23,13 @@
#include <sys/systm.h>
struct mpq {
- struct mbuf_queue mpq_mq;
- struct mbuf *(*mpq_parallel)(struct mbuf *);
- void (*mpq_serial)(struct mbuf *);
+ size_t mpq_maxlen;
+ size_t mpq_nthreads;
+ struct mutex mpq_mtx;
+ struct mbuf *mpq_cursor;
+ struct mbuf_list mpq_list;
+ struct mbuf *(*mpq_parallel_fn)(struct mbuf *);
+ void (*mpq_serial_fn)(struct mbuf *);
};
void mpq_init(struct mpq *, int, struct mbuf *(*)(struct mbuf *), void (*)(struct mbuf *));
@@ -34,36 +38,44 @@ void mpq_destroy(struct mpq *);
int mpq_enqueue(struct mpq *, struct mbuf *);
int mpq_load(struct mpq *);
-#define MPQ_DEFAULTLEN 128
+#define MPQ_DEFAULTLEN 1024
/* Internal functions */
void
-_mpq_thread(void *_mpq)
+_mpq_thread(struct mpq *mpq)
{
- struct mbuf *m;
- struct mpq *mpq = _mpq;
+ struct mbuf *mc;
while (1) {
- if ((m = mq_dequeue(&mpq->mpq_mq)) == NULL) {
- tsleep(&mpq->mpq_mq, PWAIT, "bored", 0);
+ mtx_enter(&mpq->mpq_mtx);
+ if ((mc = mpq->mpq_cursor) == NULL) {
+ mtx_leave(&mpq->mpq_mtx);
+ tsleep(&mpq->mpq_cursor, PSOCK, "bored", 0);
continue;
}
-
- if (m->m_pkthdr.ph_cookie == m) {
- m->m_pkthdr.ph_cookie = NULL;
- wakeup(m);
- continue;
+ mpq->mpq_cursor = mc->m_nextpkt;
+ mtx_leave(&mpq->mpq_mtx);
+
+ mc->m_next = mpq->mpq_parallel_fn(mc->m_next);
+ mc->m_flags |= M_PROTO1;
+
+ while (1) {
+ mtx_enter(&mpq->mpq_mtx);
+ mc = mpq->mpq_list.ml_head;
+ if (mc == NULL || ~mc->m_flags & M_PROTO1) {
+ mtx_leave(&mpq->mpq_mtx);
+ break;
+ }
+ mc = ml_dequeue(&mpq->mpq_list);
+ mtx_leave(&mpq->mpq_mtx);
+
+ if (mpq->mpq_serial_fn != NULL && mc->m_next != NULL) {
+ mpq->mpq_serial_fn(mc->m_next);
+ mc->m_next = NULL;
+ }
+
+ m_freem(mc);
}
-
- if (m->m_pkthdr.ph_cookie == m + 1) {
- m_freem(m);
- kthread_exit(0);
- }
-
- m = mpq->mpq_parallel(m);
-
- if (m != NULL && mpq->mpq_serial != NULL)
- mpq->mpq_serial(m);
}
}
@@ -71,10 +83,15 @@ _mpq_thread(void *_mpq)
void
mpq_init(struct mpq *mpq, int ipl, struct mbuf *(*parallel)(struct mbuf *), void (*serial)(struct mbuf *))
{
- mq_init(&mpq->mpq_mq, MPQ_DEFAULTLEN, ipl);
- mpq->mpq_parallel = parallel;
- mpq->mpq_serial = serial;
- kthread_create(_mpq_thread, mpq, NULL, "wireguard");
+ mpq->mpq_maxlen = MPQ_DEFAULTLEN;
+ mtx_init(&mpq->mpq_mtx, ipl);
+ ml_init(&mpq->mpq_list);
+ mpq->mpq_cursor = NULL;
+ mpq->mpq_parallel_fn = parallel;
+ mpq->mpq_serial_fn = serial;
+
+ for(mpq->mpq_nthreads = 0; mpq->mpq_nthreads < ncpus; mpq->mpq_nthreads++)
+ kthread_create((void (*)(void *))_mpq_thread, mpq, NULL, "wireguard");
}
void
@@ -99,11 +116,26 @@ mpq_destroy(struct mpq *mpq)
int
mpq_enqueue(struct mpq *mpq, struct mbuf *m)
{
- int ret;
- m->m_flags &= ~M_PROTO1;
- ret = mq_enqueue(&mpq->mpq_mq, m);
- wakeup(&mpq->mpq_mq);
- return ret;
+ int dropped = 0;
+ struct mbuf *mc = m_gethdr(M_WAITOK, MT_CONTROL);
+ mc->m_next = m;
+
+ mtx_enter(&mpq->mpq_mtx);
+ if (ml_len(&mpq->mpq_list) < mpq->mpq_maxlen) {
+ ml_enqueue(&mpq->mpq_list, mc);
+ if (mpq->mpq_cursor == NULL)
+ mpq->mpq_cursor = mc;
+ } else {
+ dropped = 1;
+ }
+ mtx_leave(&mpq->mpq_mtx);
+
+ if (dropped)
+ m_freem(mc);
+ else
+ wakeup_one(&mpq->mpq_cursor);
+
+ return dropped;
}
int