author     Thomas Gschwantner <tharre3@gmail.com>    2018-06-01 01:11:48 +0200
committer  Jason A. Donenfeld <Jason@zx2c4.com>      2018-06-04 20:30:02 +0200
commit     26e2521b85deb2c9715be01a8f72620926912096
tree       10a32ff5a20b76ba3e2a1759b456f27177694de3
parent     WIP5
WIP6
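
This WIP drops the Concurrency Kit compatibility shims from mpmc_ring.h and writes the kernel's own primitives directly at each call site. For reference, the substitutions encoded by the macros the patch deletes can be written out as wrappers; the sketch below is illustrative only (wrapper names are not part of the patch):

/*
 * Reference sketch: the CK -> kernel substitutions this patch applies
 * at each call site. Wrapper names are hypothetical.
 */
#include <linux/atomic.h>       /* atomic_read, atomic_set, atomic_cmpxchg */
#include <linux/compiler.h>     /* __always_inline */

static __always_inline int ring_load(const atomic_t *v)
{
        return atomic_read(v);          /* replaces ck_pr_load_uint() */
}

static __always_inline void ring_store(atomic_t *v, int x)
{
        atomic_set(v, x);               /* replaces ck_pr_store_uint() */
}

static __always_inline bool ring_cas(atomic_t *v, int old, int new)
{
        /*
         * Replaces ck_pr_cas_uint(): atomic_cmpxchg() returns the value
         * it observed, so success means it observed "old".
         */
        return atomic_cmpxchg(v, old, new) == old;
}

/*
 * Barrier and spin-wait substitutions:
 *      ck_pr_fence_load()  -> smp_rmb()
 *      ck_pr_fence_store() -> smp_wmb()
 *      ck_pr_stall()       -> cpu_relax()
 */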
 -rw-r--r--  src/mpmc_ring.h | 219
 1 file changed, 122 insertions(+), 97 deletions(-)
diff --git a/src/mpmc_ring.h b/src/mpmc_ring.h
index 1d19a7f..e3e993b 100644
--- a/src/mpmc_ring.h
+++ b/src/mpmc_ring.h
@@ -27,14 +27,6 @@
 #ifndef MPMC_RING_H
 #define MPMC_RING_H
 
-#ifndef CK_CC_INLINE
-#define CK_CC_INLINE inline /* inline is discouraged in the kernel */
-#endif
-
-#ifndef CK_CC_FORCE_INLINE
-#define CK_CC_FORCE_INLINE __always_inline
-#endif
-
 #include <stdbool.h>
 
 #include <linux/string.h>
@@ -43,60 +35,16 @@
 #include <linux/atomic.h>
 #include <linux/cache.h>
 
-/* http://concurrencykit.org/doc/ck_pr_load.html */
-#define ck_pr_load_uint(SRC) atomic_read(SRC)
-
-/* http://concurrencykit.org/doc/ck_pr_fence_load.html */
-#define ck_pr_fence_load() smp_rmb()
-
-/* http://concurrencykit.org/doc/ck_pr_fence_store.html */
-#define ck_pr_fence_store() smp_wmb()
-
-/* http://concurrencykit.org/doc/ck_pr_stall.html */
-#define ck_pr_stall() cpu_relax()
-
 /* http://concurrencykit.org/doc/ck_pr_fence_store_atomic.html */
 /* this actually resolves to __asm__ __volatile__("" ::: "memory"); in x86-64 */
 /* so basically a compiler barrier? */
 #define ck_pr_fence_store_atomic() smp_mb__before_atomic() /* TODO: probably overkill? */
 
-/* http://concurrencykit.org/doc/ck_pr_cas.html */
-/*
- ck_pr_cas_uint_value(unsigned int *target, unsigned int compare,
-     unsigned int set, unsigned int *v) {
-        _Bool
-        z; __asm__ __volatile__("lock " "cmpxchg" "l" " %3, %0;" "mov %% " "eax" ", %2;" "setz %1;" : "+m" (*(unsigned int *)target), "=a" (z), "=m" (*(unsigned int *)v) : "q" (set), "a" (compare) : "memory", "cc");
-        return z; }
-*/
-__always_inline static
-bool ck_pr_cas_uint(atomic_t *target, uint old, uint new)
-{
-        uint prev;
-        prev = atomic_cmpxchg(target, old, new);
-        //pr_err("cas(): old: %d, new: %d, prev: %d", old, new, prev);
-        return prev == old;
-}
-
-__always_inline static
-bool ck_pr_cas_uint_value(atomic_t *target, uint old, uint new, uint *v)
-{
-        bool ret;
-        //pr_err("cas_value(): %d\n", *v);
-        ret = ck_pr_cas_uint(target, old, new);
-        WRITE_ONCE(*v, ret ? old : new);
-        return ret;
-}
-
-/* http://concurrencykit.org/doc/ck_pr_store.html */
-// TODO: compiler barrier?
-#define ck_pr_store_uint(A, B) atomic_set((A), (B))
-
 /*
  * Concurrent ring buffer.
  */
 struct ck_ring {
-        /* TODO: is the aligment correct? */
         atomic_t c_head ____cacheline_aligned_in_smp;
         atomic_t p_tail ____cacheline_aligned_in_smp;
         atomic_t p_head;
@@ -118,11 +66,10 @@ static inline int ck_ring_init(struct ck_ring *ring, uint size, gfp_t gfp)
         if (!ring->queue)
                 return -ENOMEM;
 
-        memset(ring->queue, 0x77, sizeof(void *) * size);
         return 0;
 }
 
-CK_CC_FORCE_INLINE static bool
+__always_inline static bool
 _ck_ring_enqueue_mp(struct ck_ring *ring, const void *entry, unsigned int ts,
     unsigned int *size)
 {
@@ -131,15 +78,15 @@ _ck_ring_enqueue_mp(struct ck_ring *ring, const void *entry, unsigned int ts,
         void *buffer;
         bool r = true;
 
-        producer = ck_pr_load_uint(&ring->p_head);
+        producer = atomic_read(&ring->p_head);
 
         for (;;) {
                 /*
                  * The snapshot of producer must be up to date with respect to
                  * consumer.
                  */
-                ck_pr_fence_load();
-                consumer = ck_pr_load_uint(&ring->c_head);
+                smp_rmb();
+                consumer = atomic_read(&ring->c_head);
 
                 delta = (producer + 1) & mask;
 
@@ -148,10 +95,10 @@ _ck_ring_enqueue_mp(struct ck_ring *ring, const void *entry, unsigned int ts,
                  * less than consumer) and the buffer is definitely not full.
                  */
                 if (likely((producer - consumer) < mask)) {
-                        if (ck_pr_cas_uint_value(&ring->p_head,
-                            producer, delta, &producer) == true) {
+                        if (atomic_cmpxchg(&ring->p_head, producer, delta) == producer)
                                 break;
-                        }
+
+                        producer = delta;
                 } else {
                         unsigned int new_producer;
 
@@ -161,8 +108,8 @@ _ck_ring_enqueue_mp(struct ck_ring *ring, const void *entry, unsigned int ts,
                          * p_read that must be ordered wrt the snapshot of
                          * c_head.
                          */
-                        ck_pr_fence_load();
-                        new_producer = ck_pr_load_uint(&ring->p_head);
+                        smp_rmb();
+                        new_producer = atomic_read(&ring->p_head);
 
                         /*
                          * Only fail if we haven't made forward progress in
@@ -183,22 +130,21 @@ _ck_ring_enqueue_mp(struct ck_ring *ring, const void *entry, unsigned int ts,
         }
 
         buffer = (char *)ring->queue + ts * producer;
-        //pr_err("memcpy(%p, %p, %u)", buffer, entry, ts);
         memcpy(buffer, entry, ts);
 
         /*
          * Wait until all concurrent producers have completed writing
         * their data into the ring buffer.
         */
-        while (ck_pr_load_uint(&ring->p_tail) != producer)
-                ck_pr_stall();
+        while (atomic_read(&ring->p_tail) != producer)
+                cpu_relax();
 
         /*
         * Ensure that copy is completed before updating shared producer
         * counter.
         */
-        ck_pr_fence_store();
-        ck_pr_store_uint(&ring->p_tail, delta);
+        smp_wmb();
+        atomic_set(&ring->p_tail, delta);
 
 leave:
         if (size != NULL)
@@ -207,7 +153,7 @@ leave:
         return r;
 }
 
-CK_CC_FORCE_INLINE static bool
+__always_inline static bool
 _ck_ring_enqueue_mp_size(struct ck_ring *ring, const void *entry,
     unsigned int ts, unsigned int *size)
 {
@@ -219,7 +165,7 @@ _ck_ring_enqueue_mp_size(struct ck_ring *ring, const void *entry,
         return r;
 }
 
-CK_CC_FORCE_INLINE static bool
+__always_inline static bool
 _ck_ring_trydequeue_mc(struct ck_ring *ring, void *data,
     unsigned int size)
 {
@@ -227,30 +173,31 @@ _ck_ring_trydequeue_mc(struct ck_ring *ring,
         unsigned int consumer, producer;
         const void *buffer;
 
-        consumer = ck_pr_load_uint(&ring->c_head);
-        ck_pr_fence_load();
-        producer = ck_pr_load_uint(&ring->p_tail);
+        consumer = atomic_read(&ring->c_head);
+        smp_rmb();
+        producer = atomic_read(&ring->p_tail);
 
         if (unlikely(consumer == producer))
                 return false;
 
-        ck_pr_fence_load();
+        smp_rmb();
 
         buffer = (const char *)ring->queue + size * consumer;
         memcpy(data, buffer, size);
 
         ck_pr_fence_store_atomic();
-        return ck_pr_cas_uint(&ring->c_head, consumer, (consumer + 1) & mask);
+        return atomic_cmpxchg(&ring->c_head, consumer, (consumer + 1) & mask) == consumer;
 }
 
-CK_CC_FORCE_INLINE static bool
+__always_inline static bool
 _ck_ring_dequeue_mc(struct ck_ring *ring, void *data,
     unsigned int ts)
 {
         const unsigned int mask = ring->mask;
-        unsigned int consumer, producer;
+        unsigned int consumer, producer, delta;
+        bool cmp;
 
-        consumer = ck_pr_load_uint(&ring->c_head);
+        consumer = atomic_read(&ring->c_head);
 
         do {
                 const char *target;
@@ -259,24 +206,102 @@ _ck_ring_dequeue_mc(struct ck_ring *ring,
                 /*
                  * Producer counter must represent state relative to
                  * our latest consumer snapshot.
                  */
-                ck_pr_fence_load();
-                producer = ck_pr_load_uint(&ring->p_tail);
+                smp_rmb();
+                producer = atomic_read(&ring->p_tail);
 
                 if (unlikely(consumer == producer))
                         return false;
 
-                ck_pr_fence_load();
+                smp_rmb();
 
                 target = (const char *)ring->queue + ts * consumer;
                 memcpy(data, target, ts);
 
                 /* Serialize load with respect to head update. */
                 ck_pr_fence_store_atomic();
-        } while (ck_pr_cas_uint_value(&ring->c_head,
-            consumer,
-            (consumer + 1) & mask,
-            &consumer) == false);
+                delta = (consumer + 1) & mask;
+                cmp = atomic_cmpxchg(&ring->c_head, consumer, delta) == consumer;
+                consumer = delta;
+        } while (!cmp);
+
+        return true;
+}
+
+__always_inline static bool
+_ck_ring_enqueue_sp(struct ck_ring *ring, const void *entry,
+    unsigned int ts,
+    unsigned int *size)
+{
+        const unsigned int mask = ring->mask;
+        unsigned int consumer, producer, delta;
+        void *buffer;
+
+        consumer = atomic_read(&ring->c_head);
+        producer = atomic_read(&ring->p_tail);
+        delta = producer + 1;
+        if (size != NULL)
+                *size = (producer - consumer) & mask;
+
+        if (unlikely((delta & mask) == (consumer & mask)))
+                return false;
+
+        buffer = (char *)buffer + ts * (producer & mask);
+        memcpy(buffer, entry, ts);
+
+        /*
+         * Make sure to update slot value before indicating
+         * that the slot is available for consumption.
+         */
+        smp_wmb();
+        atomic_set(&ring->p_tail, delta);
+        return true;
+}
+
+__always_inline static bool
+_ck_ring_enqueue_sp_size(struct ck_ring *ring,
+    const void *entry,
+    unsigned int ts,
+    unsigned int *size)
+{
+        unsigned int sz;
+        bool r;
+
+        r = _ck_ring_enqueue_sp(ring, entry, ts, &sz);
+        *size = sz;
+        return r;
+}
+
+__always_inline static bool
+_ck_ring_dequeue_sc(struct ck_ring *ring,
+    void *target,
+    unsigned int size)
+{
+        const unsigned int mask = ring->mask;
+        unsigned int consumer, producer;
+        const void *buffer;
+
+        consumer = atomic_read(&ring->c_head);
+        producer = atomic_read(&ring->p_tail);
+
+        if (unlikely(consumer == producer))
+                return false;
+
+        /*
+         * Make sure to serialize with respect to our snapshot
+         * of the producer counter.
+         */
+        smp_rmb();
+
+        buffer = (const char *)buffer + size * (consumer & mask);
+        memcpy(target, buffer, size);
+
+        /*
+         * Make sure copy is completed with respect to consumer
+         * update.
+         */
+        smp_wmb();
+        atomic_set(&ring->c_head, consumer + 1);
 
         return true;
 }
@@ -284,11 +309,11 @@ static __always_inline bool mpmc_ring_empty(struct ck_ring *ring)
 {
         uint producer, consumer;
 
-        consumer = ck_pr_load_uint(&ring->c_head);
-        ck_pr_fence_load();
-        producer = ck_pr_load_uint(&ring->p_tail);
+        consumer = atomic_read(&ring->c_head);
+        smp_rmb();
+        producer = atomic_read(&ring->p_tail);
 
-        ck_pr_fence_load();
+        smp_rmb();
 
         return producer == consumer;
 }
@@ -305,11 +330,11 @@ static __always_inline bool mpmc_ptr_ring_peek(struct ck_ring *ring, void *data,
         const unsigned int mask = ring->mask;
         void *buffer;
 
-        consumer = ck_pr_load_uint(&ring->c_head);
-        ck_pr_fence_load();
-        producer = ck_pr_load_uint(&ring->p_tail);
+        consumer = atomic_read(&ring->c_head);
+        smp_rmb();
+        producer = atomic_read(&ring->p_tail);
 
-        ck_pr_fence_load();
+        smp_rmb();
 
         if (unlikely(producer == consumer)) {
                 data = NULL;
@@ -325,7 +350,7 @@ static __always_inline bool mpmc_ptr_ring_peek(struct ck_ring *ring, void *data,
 static __always_inline void mpmc_ptr_ring_discard(struct ck_ring *ring)
 {
         const unsigned int mask = ring->mask;
-        unsigned int consumer = consumer = ck_pr_load_uint(&ring->c_head);
+        unsigned int consumer = atomic_read(&ring->c_head);
 
         atomic_set(&ring->c_head, (consumer + 1) & mask);
 }
@@ -335,26 +360,26 @@ static __always_inline void mpmc_ptr_ring_discard(struct ck_ring *ring)
  * ring buffer containing pointers. Correctness is provided for any number of
  * producers and consumers.
  */
-CK_CC_INLINE static bool
+inline static bool
 ck_ring_enqueue_mpmc(struct ck_ring *ring, const void *entry)
 {
         return _ck_ring_enqueue_mp(ring, &entry, sizeof(entry), NULL);
 }
 
-CK_CC_INLINE static bool
+inline static bool
 ck_ring_enqueue_mpmc_size(struct ck_ring *ring, const void *entry,
     unsigned int *size)
 {
         return _ck_ring_enqueue_mp_size(ring, &entry, sizeof(entry), size);
 }
 
-CK_CC_INLINE static bool
+inline static bool
 ck_ring_trydequeue_mpmc(struct ck_ring *ring, void *data)
 {
         return _ck_ring_trydequeue_mc(ring, (void **)data, sizeof(void *));
 }
 
-CK_CC_INLINE static bool
+inline static bool
 ck_ring_dequeue_mpmc(struct ck_ring *ring, void *data)
 {
         return _ck_ring_dequeue_mc(ring, (void **)data, sizeof(void *));
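
A note on the multi-producer enqueue: CK's ck_pr_cas_uint_value() hands back the value a failed CAS actually observed, so the retry restarts from the head another producer published. The patched loop instead sets producer = delta, which appears to converge but only catches up one step per retry. Since atomic_cmpxchg() already returns the observed value, the slot-claim loop could catch up immediately; a minimal sketch, assuming the full-ring and forward-progress checks of _ck_ring_enqueue_mp are layered on top (helper name is hypothetical):

/*
 * Sketch only: claim the next producer slot, retrying from the value
 * the failed CAS observed (CK's ck_pr_cas_uint_value() behavior)
 * rather than from delta. Omits the full-ring check for brevity.
 */
static __always_inline unsigned int claim_producer_slot(struct ck_ring *ring)
{
        unsigned int producer, delta, prev;

        producer = atomic_read(&ring->p_head);
        for (;;) {
                delta = (producer + 1) & ring->mask;
                prev = atomic_cmpxchg(&ring->p_head, producer, delta);
                if (prev == producer)
                        return producer;        /* slot "producer" is ours */
                producer = prev;                /* catch up to observed head */
        }
}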
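In the new single-producer enqueue and single-consumer dequeue, buffer is read before it is ever assigned (buffer = (char *)buffer + ...). Upstream Concurrency Kit passes the backing storage in as a parameter; in this port the storage lives in ring->queue, so presumably the offsets should be taken from there, as the MP/MC paths already do. A sketch of the presumed intent for the two affected lines:

        /* _ck_ring_enqueue_sp(): offset into the ring's own storage */
        buffer = (char *)ring->queue + ts * (producer & mask);
        memcpy(buffer, entry, ts);

        /* _ck_ring_dequeue_sc(): likewise on the consumer side */
        buffer = (const char *)ring->queue + size * (consumer & mask);
        memcpy(target, buffer, size);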
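For context, the ck_ring_*_mpmc() helpers at the bottom treat each slot as a single pointer. A minimal usage sketch, assuming a power-of-two size (implied by the mask arithmetic) and process context for the GFP_KERNEL allocation:

/* Hypothetical usage of the pointer-ring API; error handling elided. */
static int mpmc_ring_example(void)
{
        struct ck_ring ring;
        void *item = NULL;      /* stand-in for a real pointer payload */
        void *out;

        /* Indices are wrapped with a mask, so size must be a power of two. */
        if (ck_ring_init(&ring, 256, GFP_KERNEL))
                return -ENOMEM;

        if (!ck_ring_enqueue_mpmc(&ring, item))
                ;       /* ring full: caller decides to drop or retry */

        if (ck_ring_dequeue_mpmc(&ring, &out))
                ;       /* out now holds the oldest enqueued pointer */

        return 0;
}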