author    Jonathan Neuschäfer <j.neuschaefer@gmx.net>  2018-07-01 05:28:37 +0200
committer Thomas Gschwantner <tharre3@gmail.com>  2018-07-01 05:28:37 +0200
commit    f55eb4ec4151b5b94fac3a1eb756250231db6aed (patch)
tree      b3cadb28a397903eb06546b123a8477483082e99
parent    version: bump snapshot (diff)
[WIP] implement a lock-free MPMC ring buffer
TODO: actually use the right memory barriers in the right places
TODO: eliminate false sharing between mpmc_ptr_ring members
TODO: reconsider atomic_long_t vs. atomic_t, vs. the type of size in _init()
TODO: sprinkle likely/unlikely on some branches
FIXME: it still crashes

Signed-off-by: Jonathan Neuschäfer <j.neuschaefer@gmx.net>
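A note on the false-sharing TODO: one plausible direction (a sketch only, not
part of this patch; the struct name is hypothetical) pads the indices onto
separate cache lines with ____cacheline_aligned_in_smp from <linux/cache.h>,
so enqueueing and dequeueing CPUs stop bouncing the same line:

#include <linux/atomic.h>
#include <linux/cache.h>
#include <linux/types.h>

/*
 * Hypothetical padded layout: the consumer index and the producer
 * indices each get their own cache line, away from the read-mostly
 * queue/size fields.
 */
struct mpmc_ptr_ring_padded {
	/* Read-mostly data */
	void **queue;
	size_t size;

	/* Written by consumers, read by producers */
	atomic_long_t consumer_head ____cacheline_aligned_in_smp;

	/* Written by producers; producer_tail is also polled by consumers */
	atomic_long_t producer_head ____cacheline_aligned_in_smp;
	atomic_long_t producer_tail;
};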
 src/device.h        |   4 ++--
 src/mpmc_ptr_ring.h | 224 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/queueing.c      |   6 +++---
 src/queueing.h      |   4 ++--
 src/receive.c       |   6 +++---
 src/send.c          |   6 +++---
 6 files changed, 237 insertions(+), 13 deletions(-)
diff --git a/src/device.h b/src/device.h
index 2a0e2c7..e2fd205 100644
--- a/src/device.h
+++ b/src/device.h
@@ -10,13 +10,13 @@
#include "allowedips.h"
#include "hashtables.h"
#include "cookie.h"
+#include "mpmc_ptr_ring.h"
#include <linux/types.h>
#include <linux/netdevice.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/net.h>
-#include <linux/ptr_ring.h>
struct wireguard_device;
@@ -26,7 +26,7 @@ struct multicore_worker {
};
struct crypt_queue {
- struct ptr_ring ring;
+ struct mpmc_ptr_ring ring;
union {
struct {
struct multicore_worker __percpu *worker;
diff --git a/src/mpmc_ptr_ring.h b/src/mpmc_ptr_ring.h
new file mode 100644
index 0000000..7145a4e
--- /dev/null
+++ b/src/mpmc_ptr_ring.h
@@ -0,0 +1,224 @@
+/* SPDX-License-Identifier: GPL-2.0
+ *
+ * Copyright (C) 2018 Jonathan Neuschäfer
+ */
+
+#ifndef _WG_MPMC_PTR_RING_H
+#define _WG_MPMC_PTR_RING_H
+
+#include <linux/atomic.h>
+#include <linux/bug.h>
+#include <linux/compiler.h>
+#include <linux/errno.h>
+#include <linux/log2.h>
+#include <linux/processor.h>
+#include <linux/slab.h>
+
+/*
+ * This is an implementation of a Multi-Producer/Multi-Consumer (MPMC) queue,
+ * strongly inspired by ConcurrencyKit[1] and Linux's own ptr_ring.h.
+ *
+ *       +-----------------------------------------------+
+ * index | 0| 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15|
+ * state |--|--|--|**|**|**|**|**|**|**|++|++|++|--|--|--|
+ *       +-----------------------------------------------+
+ *                 ^                    ^        ^
+ *           consumer head              |  producer head
+ *                                producer tail
+ *
+ * Possible states:
+ *
+ * -- : unoccupied
+ * ++ : being written
+ * ** : occupied
+ *
+ * Differences between ptr_ring.h and this implementation:
+ * - An additional producer tail pointer, which allows multiple enqueue
+ * operations to be in progress at the same time.
+ * - No consumer tail pointer, for simplicity (although I expect it can be
+ * added later)
+ * - Most importantly: No spinlocks.
+ * - The head/tail pointers (or rather: indices) are stored untrimmed, i.e.
+ * without the bit mask (size - 1) applied, because that's how ConcurrencyKit
+ * does it.
+ *
+ * [1]: https://github.com/concurrencykit/ck/blob/master/include/ck_ring.h
+ */
+
+struct mpmc_ptr_ring {
+ /* Read-mostly data */
+ void **queue;
+ size_t size;
+
+ /* consumer_head: updated in dequeue; read in enqueue */
+ atomic_long_t consumer_head;
+
+ /* producer_head: read and updated in enqueue */
+ atomic_long_t producer_head;
+
+ /* producer_tail: updated in enqueue, read in dequeue */
+ atomic_long_t producer_tail;
+};
+
+static inline bool mpmc_ptr_ring_empty(struct mpmc_ptr_ring *r)
+{
+ size_t ptail, chead;
+
+ mb(); /* TODO: think about barriers */
+
+ ptail = atomic_long_read(&r->producer_tail);
+ chead = atomic_long_read(&r->consumer_head);
+
+ mb(); /* TODO: think about barriers */
+
+ return chead == ptail;
+}
+
+static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr)
+{
+ size_t p, c;
+ size_t mask = r->size - 1;
+
+ p = atomic_long_read(&r->producer_head);
+
+ for (;;) {
+ rmb(); /* TODO */
+ c = atomic_long_read(&r->consumer_head);
+ mb();
+
+ if ((p - c) < mask) { /* fast path */
+ if (atomic_long_cmpxchg(&r->producer_head, p, p + 1) == p)
+ break;
+ } else {
+ size_t new_p;
+
+ mb();
+ new_p = atomic_long_read(&r->producer_head);
+ mb();
+
+			if (new_p == p) /* no producer progressed: really full */
+				return -ENOSPC;
+
+ p = new_p;
+ }
+ }
+
+ mb(); /* barriers? */
+ WRITE_ONCE(r->queue[p & mask], ptr);
+ mb(); /* barriers? */
+
+	/* Wait until it's our turn to update the producer tail pointer */
+	while (atomic_long_read(&r->producer_tail) != p)
+ cpu_relax();
+
+ smp_mb__before_atomic();
+ atomic_long_set(&r->producer_tail, p + 1);
+ smp_mb__after_atomic();
+
+ return 0;
+}
+
+static inline void *mpmc_ptr_ring_consume(struct mpmc_ptr_ring *r)
+{
+ size_t c, p, old_c;
+ void *element;
+ size_t mask = r->size - 1;
+
+	for (;;) {
+		mb(); /* TODO: check */
+		p = atomic_long_read(&r->producer_tail);
+		mb(); /* TODO: check */
+		c = atomic_long_read(&r->consumer_head);
+		mb(); /* TODO: check */
+
+		/* Is the ring empty? */
+		if (p == c)
+			return NULL;
+
+		element = READ_ONCE(r->queue[c & mask]);
+
+		mb(); /* TODO: check */
+
+		/* Claim element c; retry if another consumer got it first */
+		old_c = atomic_long_cmpxchg(&r->consumer_head, c, c + 1);
+		if (old_c == c)
+			break;
+	}
+	mb(); /* TODO: check */
+
+ return element;
+}
+
+static inline int mpmc_ptr_ring_init(struct mpmc_ptr_ring *r, int size, gfp_t gfp)
+{
+ if (WARN_ONCE(!is_power_of_2(size), "size must be a power of two"))
+ return -EINVAL;
+
+ r->size = size;
+ atomic_long_set(&r->consumer_head, 0);
+ atomic_long_set(&r->producer_head, 0);
+ atomic_long_set(&r->producer_tail, 0);
+
+ r->queue = kcalloc(size, sizeof(r->queue[0]), gfp);
+ if (!r->queue)
+ return -ENOMEM;
+
+ mb(); /* TODO: check */
+
+ return 0;
+}
+
+static inline void mpmc_ptr_ring_cleanup(struct mpmc_ptr_ring *r, void (*destroy)(void *))
+{
+ void *ptr;
+
+ if (destroy)
+ while ((ptr = mpmc_ptr_ring_consume(r)))
+ destroy(ptr);
+ kfree(r->queue);
+}
+
+/**
+ * __mpmc_ptr_ring_peek - Read the first element in an MPMC ring buffer
+ *
+ * @r: The ring buffer
+ *
+ * Note that this function should only be called in single-consumer situations.
+ */
+static inline void *__mpmc_ptr_ring_peek(struct mpmc_ptr_ring *r)
+{
+ size_t c, p;
+ size_t mask = r->size - 1;
+ void *element;
+
+	mb(); /* TODO: check */
+	c = atomic_long_read(&r->consumer_head);
+	mb(); /* TODO: check */
+	p = atomic_long_read(&r->producer_tail);
+	mb(); /* TODO: check */
+
+	/* Is the ring empty? */
+	if (c == p)
+		return NULL;
+
+	mb(); /* TODO: check */
+	element = READ_ONCE(r->queue[c & mask]);
+	mb(); /* TODO: check */
+
+ return element;
+}
+
+/**
+ * __mpmc_ptr_ring_discard_one - Discard the first element in an MPMC ring buffer
+ *
+ * @r: The ring buffer
+ *
+ * Note that this function should only be called in single-consumer situations.
+ */
+static inline void __mpmc_ptr_ring_discard_one(struct mpmc_ptr_ring *r)
+{
+	mb(); /* TODO: check */
+	atomic_long_inc(&r->consumer_head);
+	mb(); /* TODO: check */
+}
+
+#endif /* _WG_MPMC_PTR_RING_H */
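On the memory-barrier TODO: the mb()/rmb() scatter above is explicitly
provisional. One conventional resolution (a sketch only, not part of this
patch; the _sketch function names are hypothetical, and it assumes the
atomic_long_read_acquire()/atomic_long_set_release() helpers from
<linux/atomic.h>) is a single release/acquire pair on producer_tail, since a
successful atomic_long_cmpxchg() already implies full ordering around the
index claims:

/* Producer side, after slot p was claimed via cmpxchg on producer_head: */
static inline void mpmc_ptr_ring_publish_sketch(struct mpmc_ptr_ring *r,
						size_t p, void *ptr)
{
	WRITE_ONCE(r->queue[p & (r->size - 1)], ptr);

	/* wait for our turn, then publish with release semantics: the
	 * element store above cannot pass the producer_tail update */
	while (atomic_long_read(&r->producer_tail) != p)
		cpu_relax();
	atomic_long_set_release(&r->producer_tail, p + 1);
}

static inline void *mpmc_ptr_ring_consume_sketch(struct mpmc_ptr_ring *r)
{
	size_t p, c, mask = r->size - 1;
	void *element;

	for (;;) {
		/* pairs with the release above: observing the new tail
		 * guarantees we also observe the published element */
		p = atomic_long_read_acquire(&r->producer_tail);
		c = atomic_long_read(&r->consumer_head);
		if (p == c)
			return NULL;

		element = READ_ONCE(r->queue[c & mask]);

		/* full ordering on success keeps the element load
		 * before the visible head update */
		if (atomic_long_cmpxchg(&r->consumer_head, c, c + 1) == c)
			return element;
	}
}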
diff --git a/src/queueing.c b/src/queueing.c
index f33395e..80048c9 100644
--- a/src/queueing.c
+++ b/src/queueing.c
@@ -25,7 +25,7 @@ int packet_queue_init(struct crypt_queue *queue, work_func_t function, bool mult
int ret;
memset(queue, 0, sizeof(*queue));
- ret = ptr_ring_init(&queue->ring, len, GFP_KERNEL);
+ ret = mpmc_ptr_ring_init(&queue->ring, len, GFP_KERNEL);
if (ret)
return ret;
if (multicore) {
@@ -41,6 +41,6 @@ void packet_queue_free(struct crypt_queue *queue, bool multicore)
{
if (multicore)
free_percpu(queue->worker);
- WARN_ON(!__ptr_ring_empty(&queue->ring));
- ptr_ring_cleanup(&queue->ring, NULL);
+ WARN_ON(!mpmc_ptr_ring_empty(&queue->ring));
+ mpmc_ptr_ring_cleanup(&queue->ring, NULL);
}
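One consequence of this swap worth noting: ptr_ring_init() accepts any size,
while mpmc_ptr_ring_init() insists on a power of two, because the indices are
reduced with "& (size - 1)". If a caller could ever pass a non-power-of-two
len, a guard along these lines might be wanted (hypothetical helper, not in
this patch; roundup_pow_of_two() and is_power_of_2() are from <linux/log2.h>):

#include <linux/log2.h>

/* Round a requested queue length up to the next power of two before
 * handing it to mpmc_ptr_ring_init(). */
static inline unsigned long queue_len_pow2(unsigned long len)
{
	return is_power_of_2(len) ? len : roundup_pow_of_two(len);
}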
diff --git a/src/queueing.h b/src/queueing.h
index 0057cfa..14d69df 100644
--- a/src/queueing.h
+++ b/src/queueing.h
@@ -120,10 +120,10 @@ static inline int queue_enqueue_per_device_and_peer(struct crypt_queue *device_q
int cpu;
atomic_set(&PACKET_CB(skb)->state, PACKET_STATE_UNCRYPTED);
- if (unlikely(ptr_ring_produce_bh(&peer_queue->ring, skb)))
+ if (unlikely(mpmc_ptr_ring_produce(&peer_queue->ring, skb)))
return -ENOSPC;
cpu = cpumask_next_online(next_cpu);
- if (unlikely(ptr_ring_produce_bh(&device_queue->ring, skb)))
+ if (unlikely(mpmc_ptr_ring_produce(&device_queue->ring, skb)))
return -EPIPE;
queue_work_on(cpu, wq, &per_cpu_ptr(device_queue->worker, cpu)->work);
return 0;
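The dropped _bh suffix is deliberate: ptr_ring serializes producers with a
spinlock, so its produce path wraps the insert in spin_lock_bh()/
spin_unlock_bh(), roughly as below (abridged from linux/ptr_ring.h). The
cmpxchg loop in mpmc_ptr_ring_produce() has no lock a softirq could deadlock
on, so it no longer needs bottom halves disabled for its own consistency;
whether the surrounding code still wants BHs disabled for other reasons is a
separate question.

/* Abridged from linux/ptr_ring.h: the locked variant being replaced.
 * Producers serialize on producer_lock, with bottom halves disabled so
 * a softirq on the same CPU cannot deadlock on the lock. */
static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}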
diff --git a/src/receive.c b/src/receive.c
index d150a0b..5cb56a4 100644
--- a/src/receive.c
+++ b/src/receive.c
@@ -379,8 +379,8 @@ void packet_rx_worker(struct work_struct *work)
bool free;
local_bh_disable();
- while ((skb = __ptr_ring_peek(&queue->ring)) != NULL && (state = atomic_read(&PACKET_CB(skb)->state)) != PACKET_STATE_UNCRYPTED) {
- __ptr_ring_discard_one(&queue->ring);
+ while ((skb = __mpmc_ptr_ring_peek(&queue->ring)) != NULL && (state = atomic_read(&PACKET_CB(skb)->state)) != PACKET_STATE_UNCRYPTED) {
+ __mpmc_ptr_ring_discard_one(&queue->ring);
peer = PACKET_PEER(skb);
keypair = PACKET_CB(skb)->keypair;
free = true;
@@ -415,7 +415,7 @@ void packet_decrypt_worker(struct work_struct *work)
struct sk_buff *skb;
bool have_simd = simd_get();
- while ((skb = ptr_ring_consume_bh(&queue->ring)) != NULL) {
+ while ((skb = mpmc_ptr_ring_consume(&queue->ring)) != NULL) {
enum packet_state state = likely(skb_decrypt(skb, &PACKET_CB(skb)->keypair->receiving, have_simd)) ? PACKET_STATE_CRYPTED : PACKET_STATE_DEAD;
queue_enqueue_per_peer(&PACKET_PEER(skb)->rx_queue, skb, state);
have_simd = simd_relax(have_simd);
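Both workers above lean on the documented single-consumer restriction of
__mpmc_ptr_ring_peek()/__mpmc_ptr_ring_discard_one(): each peer's rx_queue is
drained by one worker at a time, so peek-then-discard cannot race with another
consumer. The skeleton of that pattern, for reference (a generic sketch, not
code from this patch):

/* Single-consumer drain: peek leaves the element in the ring, so the
 * worker can stop early (e.g. element not ready yet) and resume at the
 * same element on its next invocation; only an element that will
 * definitely be handled is discarded. */
static void drain_ready(struct mpmc_ptr_ring *r, bool (*ready)(void *),
			void (*process)(void *))
{
	void *e;

	while ((e = __mpmc_ptr_ring_peek(r)) != NULL && ready(e)) {
		__mpmc_ptr_ring_discard_one(r);
		process(e);
	}
}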
diff --git a/src/send.c b/src/send.c
index 7a8bea1..d34b29a 100644
--- a/src/send.c
+++ b/src/send.c
@@ -223,8 +223,8 @@ void packet_tx_worker(struct work_struct *work)
struct sk_buff *first;
enum packet_state state;
- while ((first = __ptr_ring_peek(&queue->ring)) != NULL && (state = atomic_read(&PACKET_CB(first)->state)) != PACKET_STATE_UNCRYPTED) {
- __ptr_ring_discard_one(&queue->ring);
+ while ((first = __mpmc_ptr_ring_peek(&queue->ring)) != NULL && (state = atomic_read(&PACKET_CB(first)->state)) != PACKET_STATE_UNCRYPTED) {
+ __mpmc_ptr_ring_discard_one(&queue->ring);
peer = PACKET_PEER(first);
keypair = PACKET_CB(first)->keypair;
@@ -244,7 +244,7 @@ void packet_encrypt_worker(struct work_struct *work)
struct sk_buff *first, *skb, *next;
bool have_simd = simd_get();
- while ((first = ptr_ring_consume_bh(&queue->ring)) != NULL) {
+ while ((first = mpmc_ptr_ring_consume(&queue->ring)) != NULL) {
enum packet_state state = PACKET_STATE_CRYPTED;
skb_walk_null_queue_safe(first, skb, next) {