summaryrefslogtreecommitdiffstats
path: root/src/mpq.h
diff options
context:
space:
mode:
authorMatt Dunwoodie <ncon@mail.noconroy.net>2019-09-08 18:08:34 +1000
committerMatt Dunwoodie <ncon@mail.noconroy.net>2019-09-08 23:01:43 +1000
commit894e458e20c3443f911463842242ed5126f459c5 (patch)
tree06cef4773ab65ec44fe69c80bdeb3fec9849dd4f /src/mpq.h
parentUppercase macros in fixedmap.h (diff)
downloadwireguard-openbsd-894e458e20c3443f911463842242ed5126f459c5.tar.xz
wireguard-openbsd-894e458e20c3443f911463842242ed5126f459c5.zip
Make mpq.h generic, not specific to mbuf
Diffstat (limited to 'src/mpq.h')
-rw-r--r--src/mpq.h246
1 files changed, 150 insertions, 96 deletions
diff --git a/src/mpq.h b/src/mpq.h
index f32bf7571fb..de9a386230d 100644
--- a/src/mpq.h
+++ b/src/mpq.h
@@ -19,169 +19,223 @@
#include <sys/param.h>
#include <sys/kthread.h>
-#include <sys/mbuf.h>
+#include <sys/mutex.h>
#include <sys/systm.h>
struct mpq {
- uint mpq_sactive;
- size_t mpq_maxlen;
- size_t mpq_nthreads;
struct mutex mpq_mtx;
- struct mbuf *mpq_cursor;
- struct mbuf_list mpq_list;
- struct mbuf *(*mpq_parallel_fn)(struct mbuf *);
- void (*mpq_serial_fn)(struct mbuf *);
+ size_t mpq_size, mpq_itemssize, mpq_phead, mpq_shead, mpq_tail;
+ size_t mpq_nthreads, mpq_wantthreads;
+ void *(*mpq_parallel_fn)(void *);
+ void (*mpq_serial_fn)(void *);
+ struct mpi {
+ enum mpi_state {
+ MPI_EMPTY = 0,
+ MPI_BARRIER,
+ MPI_WAITING,
+ MPI_DEAD,
+ MPI_OK,
+ } mpi_state;
+ void *mpi_data;
+ } *mpq_items;
};
-void mpq_init(struct mpq *, int, int, struct mbuf *(*)(struct mbuf *), void (*)(struct mbuf *));
+void mpq_init(struct mpq *, int, void *(*)(void *), void (*)(void *));
+size_t mpq_size(struct mpq *, size_t);
+void mpq_nthreads(struct mpq *, size_t);
void mpq_barrier(struct mpq *);
void mpq_destroy(struct mpq *);
-int mpq_enqueue(struct mpq *, struct mbuf *);
-int mpq_load(struct mpq *);
+int mpq_enqueue(struct mpq *, void *);
+
+#define MPQ_DEFAULT_SIZE 256
+#define MPQ_DEFAULT_THREADS ncpus
+
+#define MPQ_ITEM(mpq, item) (&(mpq)->mpq_items[(mpq)->item])
+#define MPQ_INC(mpq, item) ((mpq)->item = ((mpq)->item + 1) % (mpq)->mpq_itemssize)
+#define MPQ_LOAD(mpq, item) ((mpq)->item <= (mpq)->mpq_tail ? \
+ (mpq)->mpq_tail - (mpq)->item : (mpq)->mpq_itemssize - (mpq)->item + (mpq)->mpq_tail)
-#define MPQ_DEFAULTLEN 1024
/* Internal functions */
void
_mpq_thread(struct mpq *mpq)
{
- struct mbuf *mc;
+ void *data;
+ size_t tid;
+ struct mpi *mpi;
+ void (*serial_fn)(void *);
+ mtx_enter(&mpq->mpq_mtx);
+ tid = mpq->mpq_nthreads++;
while (1) {
- /* dequeue */
- mtx_enter(&mpq->mpq_mtx);
- if ((mc = mpq->mpq_cursor) == NULL) {
-
- /* check if mpq is dead */
- if (mpq->mpq_parallel_fn == NULL) {
- mpq->mpq_nthreads--;
- mtx_leave(&mpq->mpq_mtx);
- wakeup(&mpq->mpq_nthreads);
- kthread_exit(0);
- }
-
- /* sleep */
+ if (tid >= mpq->mpq_wantthreads) {
mtx_leave(&mpq->mpq_mtx);
- tsleep(&mpq->mpq_cursor, PSOCK, "bored", 0);
- continue;
+ kthread_exit(0);
}
- mpq->mpq_cursor = mc->m_nextpkt;
- mtx_leave(&mpq->mpq_mtx);
- /* run time consuming function in parallel */
- if (mpq->mpq_parallel_fn != NULL && mc->m_next != NULL)
- mc->m_next = mpq->mpq_parallel_fn(mc->m_next);
+ while (MPQ_LOAD(mpq, mpq_phead) > 0) {
- /* signal packet processed */
- mc->m_flags |= M_PROTO1;
+ mpi = MPQ_ITEM(mpq, mpq_phead);
+ MPQ_INC(mpq, mpq_phead);
- if (atomic_cas_uint(&mpq->mpq_sactive, 0, 1) == 1)
- continue;
+ if (mpi->mpi_state == MPI_BARRIER)
+ continue;
- /* run function in queue order */
- while (1) {
- /* check if head is ready to process */
+ if (mpi->mpi_state != MPI_WAITING)
+ panic("invalid state: %d", mpi->mpi_state);
+
+ mtx_leave(&mpq->mpq_mtx);
+ data = mpq->mpq_parallel_fn(mpi->mpi_data);
mtx_enter(&mpq->mpq_mtx);
- mc = mpq->mpq_list.ml_head;
- if (mc == NULL || ~mc->m_flags & M_PROTO1) {
- mtx_leave(&mpq->mpq_mtx);
+ mpi->mpi_data = data;
+ mpi->mpi_state = (data == NULL) ? MPI_DEAD : MPI_OK;
+ }
+
+ while (MPQ_LOAD(mpq, mpq_shead) > 0) {
+
+ mpi = MPQ_ITEM(mpq, mpq_shead);
+ if (mpi->mpi_state == MPI_WAITING)
break;
- }
- mc = ml_dequeue(&mpq->mpq_list);
- mtx_leave(&mpq->mpq_mtx);
- /* process */
- if (mpq->mpq_serial_fn != NULL && mc->m_next != NULL) {
- mpq->mpq_serial_fn(mc->m_next);
- mc->m_next = NULL;
+ if (mpi->mpi_state == MPI_OK) {
+ if (mpq->mpq_serial_fn == NULL)
+ break;
+ serial_fn = mpq->mpq_serial_fn;
+ mpq->mpq_serial_fn = NULL;
+
+ mtx_leave(&mpq->mpq_mtx);
+ serial_fn(mpi->mpi_data);
+ mtx_enter(&mpq->mpq_mtx);
+
+ mpq->mpq_serial_fn = serial_fn;
+ } else if (mpi->mpi_state == MPI_BARRIER) {
+ *(int *) mpi->mpi_data = 0;
+ wakeup(mpi->mpi_data);
+ } else if (mpi->mpi_state == MPI_DEAD) {
+ /* pass */
+ } else {
+ panic("invalid state: %d", mpi->mpi_state);
}
- m_freem(mc);
+ mpi->mpi_state = MPI_EMPTY;
+ MPQ_INC(mpq, mpq_shead);
}
- atomic_swap_uint(&mpq->mpq_sactive, 0);
+ msleep(&mpq->mpq_phead, &mpq->mpq_mtx, PWAIT, "bored", 0);
}
}
/* External functions */
void
-mpq_init(struct mpq *mpq, int ipl, int maxlen, struct mbuf *(*parallel)(struct mbuf *), void (*serial)(struct mbuf *))
+mpq_init(struct mpq *mpq, int ipl, void *(*parallel)(void *), void (*serial)(void *))
{
- assert(parallel != NULL);
-
- mpq->mpq_sactive = 0;
- mpq->mpq_maxlen = maxlen;
+ bzero(mpq, sizeof(*mpq));
mtx_init(&mpq->mpq_mtx, ipl);
- ml_init(&mpq->mpq_list);
- mpq->mpq_cursor = NULL;
mpq->mpq_parallel_fn = parallel;
mpq->mpq_serial_fn = serial;
- for(mpq->mpq_nthreads = 0; mpq->mpq_nthreads < ncpus; mpq->mpq_nthreads++)
- kthread_create((void (*)(void *))_mpq_thread, mpq, NULL, "wg");
+ mpq_size(mpq, MPQ_DEFAULT_SIZE);
+ mpq_nthreads(mpq, MPQ_DEFAULT_THREADS);
}
-void
-mpq_barrier(struct mpq *mpq)
+size_t
+mpq_size(struct mpq *mpq, size_t size)
{
- size_t maxlen, len;
+ size_t oldsize;
+ struct mpi *mpi, *tmp;
+
+ mpi = mallocarray(size, sizeof(struct mpi), M_DEVBUF, M_WAITOK);
+
mtx_enter(&mpq->mpq_mtx);
- maxlen = mpq->mpq_maxlen;
- mpq->mpq_maxlen = 0;
+
+ if (size > mpq->mpq_itemssize) {
+
+ bzero(mpi, size * sizeof(struct mpi));
+
+ if (mpq->mpq_items != NULL) {
+ memcpy(mpi, mpq->mpq_items, mpq->mpq_itemssize * sizeof(struct mpi));
+ free(mpq->mpq_items, M_DEVBUF, 0);
+ }
+
+ mpq->mpq_itemssize = size;
+ tmp = mpq->mpq_items;
+ mpq->mpq_items = mpi;
+ mpi = tmp;
+ }
+
+ oldsize = mpq->mpq_size;
+ mpq->mpq_size = size;
+
mtx_leave(&mpq->mpq_mtx);
- do {
- mtx_enter(&mpq->mpq_mtx);
- len = ml_len(&mpq->mpq_list);
- mtx_leave(&mpq->mpq_mtx);
- tsleep(&mpq->mpq_list, PWAIT, "wg_barrier", 1);
- } while (len > 0);
+ free(mpi, M_DEVBUF, 0);
+
+ return oldsize;
+}
+void
+mpq_nthreads(struct mpq *mpq, size_t nthreads)
+{
+ size_t old_nthreads;
mtx_enter(&mpq->mpq_mtx);
- mpq->mpq_maxlen = maxlen;
+ old_nthreads = mpq->mpq_wantthreads;
+ mpq->mpq_wantthreads = nthreads;
mtx_leave(&mpq->mpq_mtx);
+ while (old_nthreads++ < nthreads)
+ kthread_create((void (*)(void *))_mpq_thread, mpq, NULL, "wg");
+}
+
+void
+mpq_barrier(struct mpq *mpq)
+{
+ int b = 1;
+ mtx_enter(&mpq->mpq_mtx);
+ while (1) {
+ if (MPQ_ITEM(mpq, mpq_tail)->mpi_state == MPI_EMPTY) {
+ MPQ_ITEM(mpq, mpq_tail)->mpi_data = &b;
+ MPQ_ITEM(mpq, mpq_tail)->mpi_state = MPI_BARRIER;
+ MPQ_INC(mpq, mpq_tail);
+ break;
+ }
+ msleep(&b, &mpq->mpq_mtx, PWAIT, "mpq_barrier", 1);
+ }
+ mtx_leave(&mpq->mpq_mtx);
+ wakeup_one(&mpq->mpq_phead);
+
+ while (b)
+ tsleep(&b, PWAIT, "mpq_barrier", 1);
}
void
mpq_destroy(struct mpq *mpq)
{
- mpq->mpq_serial_fn = NULL;
- mpq->mpq_parallel_fn = NULL;
- wakeup(&mpq->mpq_cursor);
- /* TODO timo */
- while (mpq->mpq_nthreads > 0)
- tsleep(&mpq->mpq_nthreads, PWAIT, "wg_destroy", 0);
+ mpq_size(mpq, 0);
+ mpq_barrier(mpq);
+ mpq_nthreads(mpq, 0);
}
int
-mpq_enqueue(struct mpq *mpq, struct mbuf *m)
+mpq_enqueue(struct mpq *mpq, void *data)
{
int dropped = 0;
- struct mbuf *mc = m_get(M_WAITOK, MT_CONTROL);
- mc->m_next = m;
mtx_enter(&mpq->mpq_mtx);
- if (ml_len(&mpq->mpq_list) < mpq->mpq_maxlen) {
- ml_enqueue(&mpq->mpq_list, mc);
- if (mpq->mpq_cursor == NULL)
- mpq->mpq_cursor = mc;
+ if (MPQ_LOAD(mpq, mpq_shead) < mpq->mpq_size - 1) {
+ if (MPQ_ITEM(mpq, mpq_tail)->mpi_state != MPI_EMPTY)
+ panic("invalid state: %d", MPQ_ITEM(mpq, mpq_tail)->mpi_state);
+ assert(MPQ_ITEM(mpq, mpq_tail)->mpi_state == MPI_EMPTY);
+ MPQ_ITEM(mpq, mpq_tail)->mpi_data = data;
+ MPQ_ITEM(mpq, mpq_tail)->mpi_state = MPI_WAITING;
+ MPQ_INC(mpq, mpq_tail);
} else {
dropped = 1;
}
mtx_leave(&mpq->mpq_mtx);
- if (dropped)
- m_freem(mc);
- else
- wakeup_one(&mpq->mpq_cursor);
+ if (!dropped)
+ wakeup_one(&mpq->mpq_phead);
return dropped;
}
-int
-mpq_load(struct mpq *mpq)
-{
- return 100 * ml_len(&mpq->mpq_list) / mpq->mpq_maxlen;
-}
-
#endif /* _MPQ_H_ */