aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
authorJonathan Neuschäfer <j.neuschaefer@gmx.net>2018-06-21 00:56:05 +0200
committerJonathan Neuschäfer <j.neuschaefer@gmx.net>2018-06-21 00:56:05 +0200
commit52ca84907b8048f6f81026ad9956793bf594519d (patch)
tree71c6b31796c0ce60fd3a14b78834a8f02b1edf3d
parentmpmc_ptr_ring: add {,un}likely() annotations (diff)
downloadwireguard-monolithic-historical-jn/mpmc-null.tar.xz
wireguard-monolithic-historical-jn/mpmc-null.zip
[BROKEN] mpmc_ptr_ring: Rely on null pointers to avoid p_tailjn/mpmc-null
This should fix a performance problem when two (or more) producers run on the same CPU. [ TODO: more text ] previously: Thread A Thread B producer_head=1 write item <preemptive context switch> producer_head=2 write item <stalled because producer_tail is still 0> <preemptive context switch> producer_tail=1 producer_head=3 write item <stalled because producer_tail is still 1> ... with this patch: Thread A Thread B producer_head=1 write item <preemptive context switch> producer_head=2 write item producer_head=3 write item producer_head=4 write item producer_head=5 write item <preemptive context switch> producer_head=6 write item producer_head=7 write item ...
-rw-r--r--src/mpmc_ptr_ring.h61
1 files changed, 23 insertions, 38 deletions
diff --git a/src/mpmc_ptr_ring.h b/src/mpmc_ptr_ring.h
index da9393e..deadb54 100644
--- a/src/mpmc_ptr_ring.h
+++ b/src/mpmc_ptr_ring.h
@@ -14,24 +14,21 @@
*
* +-----------------------------------------------+
* index | 0| 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15|
- * state |--|--|--|**|**|**|**|**|**|**|++|++|++|--|--|--|
+ * state |--|--|--|**|**|**|**|**|**|**|--|**|--|--|--|--|
* +-----------------------------------------------+
- * ^ ^ ^
- * consumer head | producer head
- * producer tail
+ * ^ ^
+ * consumer head producer head
+ *
* Possible states:
*
- * -- : unoccupied
- * ++ : being written
+ * -- : unoccupied (NULL)
* ** : occupied
*
* Differences between ptr_ring.h and this implementation:
- * - An additional producer tail pointer, which allows multiple enqueue
- * operations to be in progress at the same time.
* - No consumer tail pointer, for simplicity (although I expect it can be
* added later)
* - Most importantly: No spinlocks.
- * - The head/tail pointers (or rather: indices) are stored untrimmed, i.e.
+ * - The head pointers (or rather: indices) are stored untrimmed, i.e.
* without the bit mask (size - 1) applied, because that's how ConcurrencyKit
* does it.
*
@@ -58,28 +55,28 @@ struct mpmc_ptr_ring {
/* consumer_head: updated in _consume; read in _produce */
atomic_t consumer_head ____cacheline_aligned_in_smp;
- /* producer_{head,tail}: updated in _produce */
+ /* producer_head: updated in _produce */
atomic_t producer_head ____cacheline_aligned_in_smp;
- atomic_t producer_tail;
};
static inline bool mpmc_ptr_ring_empty(struct mpmc_ptr_ring *r)
{
- unsigned int ptail, chead;
+ unsigned int phead, chead;
/* Order the following reads against earlier stuff */
smp_rmb();
- ptail = atomic_read(&r->producer_tail);
+ phead = atomic_read(&r->producer_head);
chead = atomic_read(&r->consumer_head);
- return chead == ptail;
+ return chead == phead;
}
static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr)
{
int p, c;
unsigned int mask = r->mask;
+ void *element;
p = atomic_read(&r->producer_head);
@@ -88,7 +85,9 @@ static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr)
c = atomic_read(&r->consumer_head);
if (likely((p - c) < mask)) {
- if (atomic_try_cmpxchg_relaxed(&r->producer_head, &p, p + 1))
+ /* If this slot is empty (cleared by the consumer), try to claim it */
+ element = READ_ONCE(r->queue[p & mask]);
+ if (!element && atomic_try_cmpxchg_relaxed(&r->producer_head, &p, p + 1))
break;
} else {
int new_p;
@@ -104,18 +103,6 @@ static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr)
}
WRITE_ONCE(r->queue[p & mask], ptr);
-
- /* Wait until it's our turn to update the producer tail pointer */
- while (atomic_read(&r->producer_tail) != p)
- cpu_relax();
-
- /*
- * Make sure the WRITE_ONCE above becomes visible before producer_tail
- * is updated.
- */
- smp_wmb();
- atomic_set(&r->producer_tail, p + 1);
-
return 0;
}
@@ -131,7 +118,7 @@ static inline void *mpmc_ptr_ring_consume(struct mpmc_ptr_ring *r)
/* Fetch consumer_head first. */
smp_rmb();
- p = atomic_read(&r->producer_tail);
+ p = atomic_read(&r->producer_head);
/* Is the ring empty? */
if (unlikely(p == c))
@@ -139,12 +126,19 @@ static inline void *mpmc_ptr_ring_consume(struct mpmc_ptr_ring *r)
element = READ_ONCE(r->queue[c & mask]);
+ /* Nothing there? give up */
+ if (!element)
+ return NULL;
+
/*
* Stores to consumer_head must be completed before we update
* the head, so we use *_release.
*/
} while (!atomic_try_cmpxchg_release(&r->consumer_head, &c, c + 1));
+ /* mark the slot as empty */
+ WRITE_ONCE(r->queue[c & mask], NULL);
+
return element;
}
@@ -160,7 +154,6 @@ static inline int mpmc_ptr_ring_init(struct mpmc_ptr_ring *r, unsigned int size,
r->mask = size - 1;
atomic_set(&r->consumer_head, 0);
atomic_set(&r->producer_head, 0);
- atomic_set(&r->producer_tail, 0);
r->queue = kcalloc(size, sizeof(r->queue[0]), gfp);
if (!r->queue)
@@ -188,7 +181,7 @@ static inline void mpmc_ptr_ring_cleanup(struct mpmc_ptr_ring *r, void (*destroy
*/
static inline void *__mpmc_ptr_ring_peek(struct mpmc_ptr_ring *r)
{
- unsigned int c, p;
+ unsigned int c;
unsigned int mask = r->mask;
void *element;
@@ -197,14 +190,6 @@ static inline void *__mpmc_ptr_ring_peek(struct mpmc_ptr_ring *r)
/* Fetch consumer_head first */
smp_rmb();
- p = atomic_read(&r->producer_tail);
-
- if (unlikely(c == p))
- return NULL;
-
- /* TODO */
- smp_rmb();
-
element = READ_ONCE(r->queue[c & mask]);
return element;