diff options
author     Jonathan Neuschäfer <j.neuschaefer@gmx.net>   2018-05-28 00:40:09 +0200
committer  Jason A. Donenfeld <Jason@zx2c4.com>          2018-06-04 20:29:48 +0200
commit     0dd372c7488a074ee456f92acc3a45bd351be0a4 (patch)
tree       69fe5048e377c085c5e75f7b37e52ac898cfeb4c /src
parent     wg-quick: android: change name of intent (diff)
download   wireguard-monolithic-historical-0dd372c7488a074ee456f92acc3a45bd351be0a4.tar.xz
           wireguard-monolithic-historical-0dd372c7488a074ee456f92acc3a45bd351be0a4.zip
[WIP] Implement a lock-free MPMC ring buffer
TODO: actually use the right memory barriers in the right places
TODO: eliminate false sharing between mpmc_ptr_ring members
TODO: reconsider atomic_long_t vs. atomic_t, vs. the type of size in _init()
TODO: sprinkle likely/unlikely on some branches
FIXME: it still crashes
Diffstat (limited to 'src')
-rw-r--r--  src/device.h        |   4
-rw-r--r--  src/mpmc_ptr_ring.h | 212
-rw-r--r--  src/queueing.c      |   6
-rw-r--r--  src/queueing.h      |   4
-rw-r--r--  src/receive.c       |   8
-rw-r--r--  src/send.c          |   8
6 files changed, 225 insertions, 17 deletions
diff --git a/src/device.h b/src/device.h index 2a0e2c7..e2fd205 100644 --- a/src/device.h +++ b/src/device.h @@ -10,13 +10,13 @@ #include "allowedips.h" #include "hashtables.h" #include "cookie.h" +#include "mpmc_ptr_ring.h" #include <linux/types.h> #include <linux/netdevice.h> #include <linux/workqueue.h> #include <linux/mutex.h> #include <linux/net.h> -#include <linux/ptr_ring.h> struct wireguard_device; @@ -26,7 +26,7 @@ struct multicore_worker { }; struct crypt_queue { - struct ptr_ring ring; + struct mpmc_ptr_ring ring; union { struct { struct multicore_worker __percpu *worker; diff --git a/src/mpmc_ptr_ring.h b/src/mpmc_ptr_ring.h new file mode 100644 index 0000000..7145a4e --- /dev/null +++ b/src/mpmc_ptr_ring.h @@ -0,0 +1,212 @@ +/* SPDX-License-Identifier: GPL-2.0 + * + * Copyright (C) 2018 Jonathan Neuschäfer + */ + +/* + * This is an implementation of a Multi-Producer/Multi-Consumer (MPMC) queue, + * strongly inspired by ConcurrencyKit[1], and Linux's own ptr_ring.h. + * + * + * +-----------------------------------------------+ + * index | 0| 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15| + * state |--|--|--|**|**|**|**|**|**|**|++|++|++|--|--|--| + * +-----------------------------------------------+ + * ^ ^ ^ + * consumer head | producer head + * producer tail + * Possible states: + * + * -- : unoccupied + * ++ : being written + * ** : occupied + * + * Differences between ptr_ring.h and this implementation: + * - An additional producer tail pointer, which allows multiple enqueue + * operations to be in progress at the same time. + * - No consumer tail pointer, for simplicity (although I expect it can be + * added later) + * - Most importantly: No spinlocks. + * - The head/tail pointers (or rather: indices) are stored untrimmed, i.e. + * without the bit mask (size - 1) applied, because that's how ConcurrencyKit + * does it. 
+ * + * [1]: https://github.com/concurrencykit/ck/blob/master/include/ck_ring.h + */ + +struct mpmc_ptr_ring { + /* Read-mostly data */ + void **queue; + size_t size; + + /* consumer_head: updated in dequeue; read in enqueue */ + atomic_long_t consumer_head; + + /* producer_head: read and updated in enqueue */ + atomic_long_t producer_head; + + /* producer_tail: updated in enqueue, read in dequeue */ + atomic_long_t producer_tail; +}; + +static inline bool mpmc_ptr_ring_empty(struct mpmc_ptr_ring *r) +{ + size_t ptail, chead; + + mb(); /* TODO: think about barriers */ + + ptail = atomic_long_read(&r->producer_tail); + chead = atomic_long_read(&r->consumer_head); + + mb(); /* TODO: think about barriers */ + + return chead == ptail; +} + +static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr) +{ + size_t p, c; + size_t mask = r->size - 1; + + p = atomic_long_read(&r->producer_head); + + for (;;) { + rmb(); /* TODO */ + c = atomic_long_read(&r->consumer_head); + mb(); + + if ((p - c) < mask) { /* fast path */ + if (atomic_long_cmpxchg(&r->producer_head, p, p + 1) == p) + break; + } else { + size_t new_p; + + mb(); + new_p = atomic_long_read(&r->producer_head); + mb(); + + if (new_p == p) + return -ENOSPC; + + p = new_p; + } + } + + mb(); /* barriers? */ + WRITE_ONCE(r->queue[p & mask], ptr); + mb(); /* barriers? */ + + /* Wait until it's our term to update the producer tail pointer */ + while(atomic_long_read(&r->producer_tail) != p) + cpu_relax(); + + smp_mb__before_atomic(); + atomic_long_set(&r->producer_tail, p + 1); + smp_mb__after_atomic(); + + return 0; +} + +static inline void *mpmc_ptr_ring_consume(struct mpmc_ptr_ring *r) +{ + size_t c, p, old_c; + void *element; + size_t mask = r->size - 1; + + for (;;) { + mb(); // TODO: check + p = atomic_long_read(&r->producer_tail); + mb(); // TODO: check + c = atomic_long_read(&r->consumer_head); + mb(); // TODO: check + + /* Is the ring empty? 
*/ + if (p == c) + return NULL; + + element = READ_ONCE(r->queue[c & mask]); + + mb(); // TODO: check + + old_c = atomic_long_cmpxchg(&r->consumer_head, c, c + 1); + if (old_c == c) + break; + } + mb(); // TODO: check + + return element; +} + +static inline int mpmc_ptr_ring_init(struct mpmc_ptr_ring *r, int size, gfp_t gfp) +{ + if (WARN_ONCE(!is_power_of_2(size), "size must be a power of two")) + return -EINVAL; + + r->size = size; + atomic_long_set(&r->consumer_head, 0); + atomic_long_set(&r->producer_head, 0); + atomic_long_set(&r->producer_tail, 0); + + r->queue = kcalloc(size, sizeof(r->queue[0]), gfp); + if (!r->queue) + return -ENOMEM; + + mb(); /* TODO: check */ + + return 0; +} + +static inline void mpmc_ptr_ring_cleanup(struct mpmc_ptr_ring *r, void (*destroy)(void *)) +{ + void *ptr; + + if (destroy) + while ((ptr = mpmc_ptr_ring_consume(r))) + destroy(ptr); + kfree(r->queue); +} + +/** + * __mpmc_ptr_ring_peek - Read the first element in an MPMC ring buffer + * + * @r: The ring buffer + * + * Note that this function should only be called in single-consumer situations. + */ +static inline void *__mpmc_ptr_ring_peek(struct mpmc_ptr_ring *r) +{ + size_t c, p; + size_t mask = r->size - 1; + void *element; + + mb(); // TODO: check + c = atomic_long_read(&r->consumer_head); + mb(); // TODO: check + p = atomic_long_read(&r->producer_tail); + mb(); // TODO: check + + //pr_info("%s %px %u, at %zu/%zu\n", __func__, r, current->pid, c, p); + + if (c == p) + return NULL; + + mb(); // TODO: check + element = READ_ONCE(r->queue[c & mask]); + mb(); // TODO: check + + return element; +} + +/** + * __mpmc_ptr_ring_discard_one - Discard the first element in an MPMC ring buffer + * + * @r: The ring buffer + * + * Note that this function should only be called in single-consumer situations. 
+ */ +static inline void __mpmc_ptr_ring_discard_one(struct mpmc_ptr_ring *r) +{ + mb(); // TODO: check + atomic_long_inc(&r->consumer_head); + mb(); // TODO: check +} diff --git a/src/queueing.c b/src/queueing.c index 85dea6b..80048c9 100644 --- a/src/queueing.c +++ b/src/queueing.c @@ -25,7 +25,7 @@ int packet_queue_init(struct crypt_queue *queue, work_func_t function, bool mult int ret; memset(queue, 0, sizeof(*queue)); - ret = ptr_ring_init(&queue->ring, len, GFP_KERNEL); + ret = mpmc_ptr_ring_init(&queue->ring, len, GFP_KERNEL); if (ret) return ret; if (multicore) { @@ -41,6 +41,6 @@ void packet_queue_free(struct crypt_queue *queue, bool multicore) { if (multicore) free_percpu(queue->worker); - WARN_ON(!ptr_ring_empty_bh(&queue->ring)); - ptr_ring_cleanup(&queue->ring, NULL); + WARN_ON(!mpmc_ptr_ring_empty(&queue->ring)); + mpmc_ptr_ring_cleanup(&queue->ring, NULL); } diff --git a/src/queueing.h b/src/queueing.h index 0057cfa..14d69df 100644 --- a/src/queueing.h +++ b/src/queueing.h @@ -120,10 +120,10 @@ static inline int queue_enqueue_per_device_and_peer(struct crypt_queue *device_q int cpu; atomic_set(&PACKET_CB(skb)->state, PACKET_STATE_UNCRYPTED); - if (unlikely(ptr_ring_produce_bh(&peer_queue->ring, skb))) + if (unlikely(mpmc_ptr_ring_produce(&peer_queue->ring, skb))) return -ENOSPC; cpu = cpumask_next_online(next_cpu); - if (unlikely(ptr_ring_produce_bh(&device_queue->ring, skb))) + if (unlikely(mpmc_ptr_ring_produce(&device_queue->ring, skb))) return -EPIPE; queue_work_on(cpu, wq, &per_cpu_ptr(device_queue->worker, cpu)->work); return 0; diff --git a/src/receive.c b/src/receive.c index 27d3d04..baa41e6 100644 --- a/src/receive.c +++ b/src/receive.c @@ -378,9 +378,8 @@ void packet_rx_worker(struct work_struct *work) bool free; local_bh_disable(); - spin_lock_bh(&queue->ring.consumer_lock); - while ((skb = __ptr_ring_peek(&queue->ring)) != NULL && (state = atomic_read(&PACKET_CB(skb)->state)) != PACKET_STATE_UNCRYPTED) { - 
__ptr_ring_discard_one(&queue->ring); + while ((skb = __mpmc_ptr_ring_peek(&queue->ring)) != NULL && (state = atomic_read(&PACKET_CB(skb)->state)) != PACKET_STATE_UNCRYPTED) { + __mpmc_ptr_ring_discard_one(&queue->ring); peer = PACKET_PEER(skb); keypair = PACKET_CB(skb)->keypair; free = true; @@ -406,7 +405,6 @@ next: if (unlikely(free)) dev_kfree_skb(skb); } - spin_unlock_bh(&queue->ring.consumer_lock); local_bh_enable(); } @@ -416,7 +414,7 @@ void packet_decrypt_worker(struct work_struct *work) struct sk_buff *skb; bool have_simd = chacha20poly1305_init_simd(); - while ((skb = ptr_ring_consume_bh(&queue->ring)) != NULL) { + while ((skb = mpmc_ptr_ring_consume(&queue->ring)) != NULL) { enum packet_state state = likely(skb_decrypt(skb, &PACKET_CB(skb)->keypair->receiving, have_simd)) ? PACKET_STATE_CRYPTED : PACKET_STATE_DEAD; queue_enqueue_per_peer(&PACKET_PEER(skb)->rx_queue, skb, state); @@ -223,9 +223,8 @@ void packet_tx_worker(struct work_struct *work) struct sk_buff *first; enum packet_state state; - spin_lock_bh(&queue->ring.consumer_lock); - while ((first = __ptr_ring_peek(&queue->ring)) != NULL && (state = atomic_read(&PACKET_CB(first)->state)) != PACKET_STATE_UNCRYPTED) { - __ptr_ring_discard_one(&queue->ring); + while ((first = __mpmc_ptr_ring_peek(&queue->ring)) != NULL && (state = atomic_read(&PACKET_CB(first)->state)) != PACKET_STATE_UNCRYPTED) { + __mpmc_ptr_ring_discard_one(&queue->ring); peer = PACKET_PEER(first); keypair = PACKET_CB(first)->keypair; @@ -237,7 +236,6 @@ void packet_tx_worker(struct work_struct *work) noise_keypair_put(keypair); peer_put(peer); } - spin_unlock_bh(&queue->ring.consumer_lock); } void packet_encrypt_worker(struct work_struct *work) @@ -246,7 +244,7 @@ void packet_encrypt_worker(struct work_struct *work) struct sk_buff *first, *skb, *next; bool have_simd = chacha20poly1305_init_simd(); - while ((first = ptr_ring_consume_bh(&queue->ring)) != NULL) { + while ((first = mpmc_ptr_ring_consume(&queue->ring)) != NULL) { enum 
packet_state state = PACKET_STATE_CRYPTED; skb_walk_null_queue_safe(first, skb, next) {