From 52ca84907b8048f6f81026ad9956793bf594519d Mon Sep 17 00:00:00 2001 From: Jonathan Neuschäfer Date: Thu, 21 Jun 2018 00:56:05 +0200 Subject: [BROKEN] mpmc_ptr_ring: Rely on null pointers to avoid p_tail This should fix a performance problem when two (or more) producers run on the same CPU. [ TODO: more text ] previously: Thread A Thread B producer_head=1 write item producer_head=2 write item producer_tail=1 producer_head=3 write item ... with this patch: Thread A Thread B producer_head=1 write item producer_head=2 write item producer_head=3 write item producer_head=4 write item producer_head=5 write item producer_head=6 write item producer_head=7 write item ... --- src/mpmc_ptr_ring.h | 61 ++++++++++++++++++++--------------------------------- 1 file changed, 23 insertions(+), 38 deletions(-) (limited to 'src/mpmc_ptr_ring.h') diff --git a/src/mpmc_ptr_ring.h b/src/mpmc_ptr_ring.h index da9393e..deadb54 100644 --- a/src/mpmc_ptr_ring.h +++ b/src/mpmc_ptr_ring.h @@ -14,24 +14,21 @@ * * +-----------------------------------------------+ * index | 0| 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15| - * state |--|--|--|**|**|**|**|**|**|**|++|++|++|--|--|--| + * state |--|--|--|**|**|**|**|**|**|**|--|**|--|--|--|--| * +-----------------------------------------------+ - * ^ ^ ^ - * consumer head | producer head - * producer tail + * ^ ^ + * consumer head producer head + * * Possible states: * - * -- : unoccupied - * ++ : being written + * -- : unoccupied (NULL) * ** : occupied * * Differences between ptr_ring.h and this implementation: - * - An additional producer tail pointer, which allows multiple enqueue - * operations to be in progress at the same time. * - No consumer tail pointer, for simplicity (although I expect it can be * added later) * - Most importantly: No spinlocks. - * - The head/tail pointers (or rather: indices) are stored untrimmed, i.e. + * - The head pointers (or rather: indices) are stored untrimmed, i.e. * without the bit mask (size - 1) applied, because that's how ConcurrencyKit * does it. * @@ -58,28 +55,28 @@ struct mpmc_ptr_ring { /* consumer_head: updated in _consume; read in _produce */ atomic_t consumer_head ____cacheline_aligned_in_smp; - /* producer_{head,tail}: updated in _produce */ + /* producer_head: updated in _produce */ atomic_t producer_head ____cacheline_aligned_in_smp; - atomic_t producer_tail; }; static inline bool mpmc_ptr_ring_empty(struct mpmc_ptr_ring *r) { - unsigned int ptail, chead; + unsigned int phead, chead; /* Order the following reads against earlier stuff */ smp_rmb(); - ptail = atomic_read(&r->producer_tail); + phead = atomic_read(&r->producer_head); chead = atomic_read(&r->consumer_head); - return chead == ptail; + return chead == phead; } static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr) { int p, c; unsigned int mask = r->mask; + void *element; p = atomic_read(&r->producer_head); @@ -88,7 +85,9 @@ static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr) c = atomic_read(&r->consumer_head); if (likely((p - c) < mask)) { - if (atomic_try_cmpxchg_relaxed(&r->producer_head, &p, p + 1)) + /* If this slot is empty (cleared by the consumer), try to claim it */ + element = READ_ONCE(r->queue[p & mask]); + if (!element && atomic_try_cmpxchg_relaxed(&r->producer_head, &p, p + 1)) break; } else { int new_p; @@ -104,18 +103,6 @@ static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr) } WRITE_ONCE(r->queue[p & mask], ptr); - - /* Wait until it's our turn to update the producer tail pointer */ - while (atomic_read(&r->producer_tail) != p) - cpu_relax(); - - /* - * Make sure the WRITE_ONCE above becomes visible before producer_tail - * is updated. - */ - smp_wmb(); - atomic_set(&r->producer_tail, p + 1); - return 0; } @@ -131,7 +118,7 @@ static inline void *mpmc_ptr_ring_consume(struct mpmc_ptr_ring *r) /* Fetch consumer_head first. */ smp_rmb(); - p = atomic_read(&r->producer_tail); + p = atomic_read(&r->producer_head); /* Is the ring empty? */ if (unlikely(p == c)) @@ -139,12 +126,19 @@ static inline void *mpmc_ptr_ring_consume(struct mpmc_ptr_ring *r) element = READ_ONCE(r->queue[c & mask]); + /* Nothing there? give up */ + if (!element) + return NULL; + /* * Stores to consumer_head must be completed before we update * the head, so we use *_release. */ } while (!atomic_try_cmpxchg_release(&r->consumer_head, &c, c + 1)); + /* mark the slot as empty */ + WRITE_ONCE(r->queue[c & mask], NULL); + return element; } @@ -160,7 +154,6 @@ static inline int mpmc_ptr_ring_init(struct mpmc_ptr_ring *r, unsigned int size, r->mask = size - 1; atomic_set(&r->consumer_head, 0); atomic_set(&r->producer_head, 0); - atomic_set(&r->producer_tail, 0); r->queue = kcalloc(size, sizeof(r->queue[0]), gfp); if (!r->queue) @@ -188,7 +181,7 @@ static inline void mpmc_ptr_ring_cleanup(struct mpmc_ptr_ring *r, void (*destroy */ static inline void *__mpmc_ptr_ring_peek(struct mpmc_ptr_ring *r) { - unsigned int c, p; + unsigned int c; unsigned int mask = r->mask; void *element; @@ -197,14 +190,6 @@ static inline void *__mpmc_ptr_ring_peek(struct mpmc_ptr_ring *r) /* Fetch consumer_head first */ smp_rmb(); - p = atomic_read(&r->producer_tail); - - if (unlikely(c == p)) - return NULL; - - /* TODO */ - smp_rmb(); - element = READ_ONCE(r->queue[c & mask]); return element; -- cgit v1.2.3-59-g8ed1b