 src/mpmc_ring.h | 219 ++++++++++++++++++++++++++++++++-----------------------
 1 file changed, 122 insertions(+), 97 deletions(-)
diff --git a/src/mpmc_ring.h b/src/mpmc_ring.h
index 1d19a7f..e3e993b 100644
--- a/src/mpmc_ring.h
+++ b/src/mpmc_ring.h
@@ -27,14 +27,6 @@
#ifndef MPMC_RING_H
#define MPMC_RING_H
-#ifndef CK_CC_INLINE
-#define CK_CC_INLINE inline /* inline is discouraged in the kernel */
-#endif
-
-#ifndef CK_CC_FORCE_INLINE
-#define CK_CC_FORCE_INLINE __always_inline
-#endif
-
#include <stdbool.h>
#include <linux/string.h>
@@ -43,60 +35,16 @@
#include <linux/atomic.h>
#include <linux/cache.h>
-/* http://concurrencykit.org/doc/ck_pr_load.html */
-#define ck_pr_load_uint(SRC) atomic_read(SRC)
-
-/* http://concurrencykit.org/doc/ck_pr_fence_load.html */
-#define ck_pr_fence_load() smp_rmb()
-
-/* http://concurrencykit.org/doc/ck_pr_fence_store.html */
-#define ck_pr_fence_store() smp_wmb()
-
-/* http://concurrencykit.org/doc/ck_pr_stall.html */
-#define ck_pr_stall() cpu_relax()
-
/* http://concurrencykit.org/doc/ck_pr_fence_store_atomic.html */
/* on x86-64 this resolves to __asm__ __volatile__("" ::: "memory"), */
/* i.e. just a compiler barrier */
#define ck_pr_fence_store_atomic() smp_mb__before_atomic() /* TODO: probably overkill? */
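+/*
+ * Note: both remaining users of this fence sit immediately before an
+ * atomic_cmpxchg(), which is already fully ordered in the kernel memory
+ * model, so the smp_mb__before_atomic() is likely redundant (hence the
+ * TODO).
+ */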
-/* http://concurrencykit.org/doc/ck_pr_cas.html */
-/*
- ck_pr_cas_uint_value(unsigned int *target, unsigned int compare,
- unsigned int set, unsigned int *v) {
- _Bool
- z; __asm__ __volatile__("lock " "cmpxchg" "l" " %3, %0;" "mov %% " "eax" ", %2;" "setz %1;" : "+m" (*(unsigned int *)target), "=a" (z), "=m" (*(unsigned int *)v) : "q" (set), "a" (compare) : "memory", "cc");
- return z; }
-*/
-__always_inline static
-bool ck_pr_cas_uint(atomic_t *target, uint old, uint new)
-{
- uint prev;
- prev = atomic_cmpxchg(target, old, new);
- //pr_err("cas(): old: %d, new: %d, prev: %d", old, new, prev);
- return prev == old;
-}
-
-__always_inline static
-bool ck_pr_cas_uint_value(atomic_t *target, uint old, uint new, uint *v)
-{
- bool ret;
- //pr_err("cas_value(): %d\n", *v);
- ret = ck_pr_cas_uint(target, old, new);
- WRITE_ONCE(*v, ret ? old : new);
- return ret;
-}
-
-/* http://concurrencykit.org/doc/ck_pr_store.html */
-// TODO: compiler barrier?
-#define ck_pr_store_uint(A, B) atomic_set((A), (B))
-
/*
* Concurrent ring buffer.
*/
struct ck_ring {
- /* TODO: is the aligment correct? */
atomic_t c_head ____cacheline_aligned_in_smp;
atomic_t p_tail ____cacheline_aligned_in_smp;
atomic_t p_head;
@@ -118,11 +66,10 @@ static inline int ck_ring_init(struct ck_ring *ring, uint size, gfp_t gfp)
if (!ring->queue)
return -ENOMEM;
- memset(ring->queue, 0x77, sizeof(void *) * size);
return 0;
}
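+/*
+ * Multi-producer enqueue: reserve a slot by advancing p_head with
+ * cmpxchg, copy the entry into the reserved slot, then publish it by
+ * advancing p_tail once every earlier reservation has been published.
+ */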
-CK_CC_FORCE_INLINE static bool
+static __always_inline bool
_ck_ring_enqueue_mp(struct ck_ring *ring, const void *entry, unsigned int ts,
unsigned int *size)
{
@@ -131,15 +78,15 @@ _ck_ring_enqueue_mp(struct ck_ring *ring, const void *entry, unsigned int ts,
void *buffer;
bool r = true;
- producer = ck_pr_load_uint(&ring->p_head);
+ producer = atomic_read(&ring->p_head);
for (;;) {
/*
* The snapshot of producer must be up to date with respect to
* consumer.
*/
- ck_pr_fence_load();
- consumer = ck_pr_load_uint(&ring->c_head);
+ smp_rmb();
+ consumer = atomic_read(&ring->c_head);
delta = (producer + 1) & mask;
@@ -148,10 +95,10 @@ _ck_ring_enqueue_mp(struct ck_ring *ring, const void *entry, unsigned int ts,
* less than consumer) and the buffer is definitely not full.
*/
if (likely((producer - consumer) < mask)) {
- if (ck_pr_cas_uint_value(&ring->p_head,
- producer, delta, &producer) == true) {
+ unsigned int prev;
+
+ prev = atomic_cmpxchg(&ring->p_head, producer, delta);
+ if (prev == producer)
break;
- }
+
+ /* The CAS lost a race; retry from the p_head value we observed. */
+ producer = prev;
} else {
unsigned int new_producer;
@@ -161,8 +108,8 @@ _ck_ring_enqueue_mp(struct ck_ring *ring, const void *entry, unsigned int ts,
* p_head that must be ordered with respect to the snapshot of
* c_head.
*/
- ck_pr_fence_load();
- new_producer = ck_pr_load_uint(&ring->p_head);
+ smp_rmb();
+ new_producer = atomic_read(&ring->p_head);
/*
* Only fail if we haven't made forward progress in
@@ -183,22 +130,21 @@ _ck_ring_enqueue_mp(struct ck_ring *ring, const void *entry, unsigned int ts,
}
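+ /*
+ * The reserved index is already masked (see delta above), so it can
+ * address the slot directly.
+ */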
buffer = (char *)ring->queue + ts * producer;
- //pr_err("memcpy(%p, %p, %u)", buffer, entry, ts);
memcpy(buffer, entry, ts);
/*
* Wait until all concurrent producers have completed writing
* their data into the ring buffer.
*/
- while (ck_pr_load_uint(&ring->p_tail) != producer)
- ck_pr_stall();
+ while (atomic_read(&ring->p_tail) != producer)
+ cpu_relax();
/*
* Ensure that copy is completed before updating shared producer
* counter.
*/
- ck_pr_fence_store();
- ck_pr_store_uint(&ring->p_tail, delta);
+ smp_wmb();
+ atomic_set(&ring->p_tail, delta);
leave:
if (size != NULL)
@@ -207,7 +153,7 @@ leave:
return r;
}
-CK_CC_FORCE_INLINE static bool
+static __always_inline bool
_ck_ring_enqueue_mp_size(struct ck_ring *ring, const void *entry,
unsigned int ts, unsigned int *size)
{
@@ -219,7 +165,7 @@ _ck_ring_enqueue_mp_size(struct ck_ring *ring, const void *entry,
return r;
}
-CK_CC_FORCE_INLINE static bool
+static __always_inline bool
_ck_ring_trydequeue_mc(struct ck_ring *ring,
void *data, unsigned int size)
{
@@ -227,30 +173,31 @@ _ck_ring_trydequeue_mc(struct ck_ring *ring,
unsigned int consumer, producer;
const void *buffer;
- consumer = ck_pr_load_uint(&ring->c_head);
- ck_pr_fence_load();
- producer = ck_pr_load_uint(&ring->p_tail);
+ consumer = atomic_read(&ring->c_head);
+ smp_rmb();
+ producer = atomic_read(&ring->p_tail);
if (unlikely(consumer == producer))
return false;
- ck_pr_fence_load();
+ smp_rmb();
buffer = (const char *)ring->queue + size * consumer;
memcpy(data, buffer, size);
ck_pr_fence_store_atomic();
- return ck_pr_cas_uint(&ring->c_head, consumer, (consumer + 1) & mask);
+ return atomic_cmpxchg(&ring->c_head, consumer, (consumer + 1) & mask) == consumer;
}
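+/*
+ * Multi-consumer dequeue: copy the entry out first, then try to claim
+ * it by advancing c_head with cmpxchg; if the claim fails, another
+ * consumer won the slot and the copy is retried from the new position.
+ */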
-CK_CC_FORCE_INLINE static bool
+static __always_inline bool
_ck_ring_dequeue_mc(struct ck_ring *ring,
void *data, unsigned int ts)
{
const unsigned int mask = ring->mask;
- unsigned int consumer, producer;
+ unsigned int consumer, producer, delta, prev;
+ bool cmp;
- consumer = ck_pr_load_uint(&ring->c_head);
+ consumer = atomic_read(&ring->c_head);
do {
const char *target;
@@ -259,24 +206,102 @@ _ck_ring_dequeue_mc(struct ck_ring *ring,
* Producer counter must represent state relative to
* our latest consumer snapshot.
*/
- ck_pr_fence_load();
- producer = ck_pr_load_uint(&ring->p_tail);
+ smp_rmb();
+ producer = atomic_read(&ring->p_tail);
if (unlikely(consumer == producer))
return false;
- ck_pr_fence_load();
+ smp_rmb();
target = (const char *)ring->queue + ts * consumer;
memcpy(data, target, ts);
/* Serialize load with respect to head update. */
ck_pr_fence_store_atomic();
- } while (ck_pr_cas_uint_value(&ring->c_head,
- consumer,
- (consumer + 1) & mask,
- &consumer) == false);
+ delta = (consumer + 1) & mask;
+ prev = atomic_cmpxchg(&ring->c_head, consumer, delta);
+ cmp = (prev == consumer);
+ /* On failure, retry from the c_head value we observed. */
+ consumer = prev;
+ } while (!cmp);
+
+ return true;
+}
+
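+/*
+ * Single-producer enqueue: with only one producer there is nothing to
+ * reserve, so p_head is unused; the counters run free and are masked
+ * only when indexing into the ring.
+ */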
+static __always_inline bool
+_ck_ring_enqueue_sp(struct ck_ring *ring, const void *entry,
+ unsigned int ts,
+ unsigned int *size)
+{
+ const unsigned int mask = ring->mask;
+ unsigned int consumer, producer, delta;
+ void *buffer;
+
+ consumer = atomic_read(&ring->c_head);
+ producer = atomic_read(&ring->p_tail);
+ delta = producer + 1;
+ if (size != NULL)
+ *size = (producer - consumer) & mask;
+
+ if (unlikely((delta & mask) == (consumer & mask)))
+ return false;
+
+ buffer = (char *)ring->queue + ts * (producer & mask);
+ memcpy(buffer, entry, ts);
+
+ /*
+ * Make sure to update slot value before indicating
+ * that the slot is available for consumption.
+ */
+ smp_wmb();
+ atomic_set(&ring->p_tail, delta);
+ return true;
+}
+
+static __always_inline bool
+_ck_ring_enqueue_sp_size(struct ck_ring *ring,
+ const void *entry,
+ unsigned int ts,
+ unsigned int *size)
+{
+ unsigned int sz;
+ bool r;
+
+ r = _ck_ring_enqueue_sp(ring, entry, ts, &sz);
+ *size = sz;
+ return r;
+}
+
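+/*
+ * Single-consumer dequeue: the mirror image of the single-producer
+ * path; only this consumer ever advances c_head, so a plain store
+ * after the copy suffices.
+ */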
+static __always_inline bool
+_ck_ring_dequeue_sc(struct ck_ring *ring,
+ void *target,
+ unsigned int size)
+{
+ const unsigned int mask = ring->mask;
+ unsigned int consumer, producer;
+ const void *buffer;
+
+ consumer = atomic_read(&ring->c_head);
+ producer = atomic_read(&ring->p_tail);
+
+ if (unlikely(consumer == producer))
+ return false;
+
+ /*
+ * Make sure to serialize with respect to our snapshot
+ * of the producer counter.
+ */
+ smp_rmb();
+
+ buffer = (const char *)ring->queue + size * (consumer & mask);
+ memcpy(target, buffer, size);
+
+ /*
+ * Make sure copy is completed with respect to consumer
+ * update.
+ */
+ smp_wmb();
+ atomic_set(&ring->c_head, consumer + 1);
return true;
}
@@ -284,11 +309,11 @@ static __always_inline bool mpmc_ring_empty(struct ck_ring *ring)
{
uint producer, consumer;
- consumer = ck_pr_load_uint(&ring->c_head);
- ck_pr_fence_load();
- producer = ck_pr_load_uint(&ring->p_tail);
+ consumer = atomic_read(&ring->c_head);
+ smp_rmb();
+ producer = atomic_read(&ring->p_tail);
- ck_pr_fence_load();
+ smp_rmb();
return producer == consumer;
}
@@ -305,11 +330,11 @@ static __always_inline bool mpmc_ptr_ring_peek(struct ck_ring *ring, void *data,
const unsigned int mask = ring->mask;
void *buffer;
- consumer = ck_pr_load_uint(&ring->c_head);
- ck_pr_fence_load();
- producer = ck_pr_load_uint(&ring->p_tail);
+ consumer = atomic_read(&ring->c_head);
+ smp_rmb();
+ producer = atomic_read(&ring->p_tail);
- ck_pr_fence_load();
+ smp_rmb();
if (unlikely(producer == consumer)) {
data = NULL;
@@ -325,7 +350,7 @@ static __always_inline bool mpmc_ptr_ring_peek(struct ck_ring *ring, void *data,
static __always_inline void mpmc_ptr_ring_discard(struct ck_ring *ring)
{
const unsigned int mask = ring->mask;
- unsigned int consumer = consumer = ck_pr_load_uint(&ring->c_head);
+ unsigned int consumer = atomic_read(&ring->c_head);
atomic_set(&ring->c_head, (consumer + 1) & mask);
}
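+/*
+ * A successful peek is expected to be paired with a discard once the
+ * entry has been consumed. A minimal sketch, assuming a single
+ * consumer (process() is hypothetical and the peek arguments are
+ * abbreviated):
+ *
+ *	void *entry;
+ *
+ *	if (mpmc_ptr_ring_peek(ring, &entry, ...)) {
+ *		process(entry);
+ *		mpmc_ptr_ring_discard(ring);
+ *	}
+ */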
@@ -335,26 +360,26 @@ static __always_inline void mpmc_ptr_ring_discard(struct ck_ring *ring)
* ring buffer containing pointers. Correctness is provided for any number of
* producers and consumers.
*/
-CK_CC_INLINE static bool
+static inline bool
ck_ring_enqueue_mpmc(struct ck_ring *ring, const void *entry)
{
return _ck_ring_enqueue_mp(ring, &entry, sizeof(entry), NULL);
}
-CK_CC_INLINE static bool
+static inline bool
ck_ring_enqueue_mpmc_size(struct ck_ring *ring, const void *entry,
unsigned int *size)
{
return _ck_ring_enqueue_mp_size(ring, &entry, sizeof(entry), size);
}
-CK_CC_INLINE static bool
+static inline bool
ck_ring_trydequeue_mpmc(struct ck_ring *ring, void *data)
{
return _ck_ring_trydequeue_mc(ring, (void **)data, sizeof(void *));
}
-CK_CC_INLINE static bool
+static inline bool
ck_ring_dequeue_mpmc(struct ck_ring *ring, void *data)
{
return _ck_ring_dequeue_mc(ring, (void **)data, sizeof(void *));