about | summary | refs | log | tree | commit | diff | stats | homepage
path: root/src/mpmc_ptr_ring.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mpmc_ptr_ring.h')
-rw-r--r-- src/mpmc_ptr_ring.h 61
1 files changed, 23 insertions, 38 deletions
diff --git a/src/mpmc_ptr_ring.h b/src/mpmc_ptr_ring.h
index da9393e..deadb54 100644
--- a/src/mpmc_ptr_ring.h
+++ b/src/mpmc_ptr_ring.h
@@ -14,24 +14,21 @@
*
* +-----------------------------------------------+
* index | 0| 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15|
- * state |--|--|--|**|**|**|**|**|**|**|++|++|++|--|--|--|
+ * state |--|--|--|**|**|**|**|**|**|**|--|**|--|--|--|--|
* +-----------------------------------------------+
- * ^ ^ ^
- * consumer head | producer head
- * producer tail
+ * ^ ^
+ * consumer head producer head
+ *
* Possible states:
*
- * -- : unoccupied
- * ++ : being written
+ * -- : unoccupied (NULL)
* ** : occupied
*
* Differences between ptr_ring.h and this implementation:
- * - An additional producer tail pointer, which allows multiple enqueue
- * operations to be in progress at the same time.
* - No consumer tail pointer, for simplicity (although I expect it can be
* added later)
* - Most importantly: No spinlocks.
- * - The head/tail pointers (or rather: indices) are stored untrimmed, i.e.
+ * - The head pointers (or rather: indices) are stored untrimmed, i.e.
* without the bit mask (size - 1) applied, because that's how ConcurrencyKit
* does it.
*
@@ -58,28 +55,28 @@ struct mpmc_ptr_ring {
/* consumer_head: updated in _consume; read in _produce */
atomic_t consumer_head ____cacheline_aligned_in_smp;
- /* producer_{head,tail}: updated in _produce */
+ /* producer_head: updated in _produce */
atomic_t producer_head ____cacheline_aligned_in_smp;
- atomic_t producer_tail;
};
static inline bool mpmc_ptr_ring_empty(struct mpmc_ptr_ring *r)
{
- unsigned int ptail, chead;
+ unsigned int phead, chead;
/* Order the following reads against earlier stuff */
smp_rmb();
- ptail = atomic_read(&r->producer_tail);
+ phead = atomic_read(&r->producer_head);
chead = atomic_read(&r->consumer_head);
- return chead == ptail;
+ return chead == phead;
}
static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr)
{
int p, c;
unsigned int mask = r->mask;
+ void *element;
p = atomic_read(&r->producer_head);
@@ -88,7 +85,9 @@ static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr)
c = atomic_read(&r->consumer_head);
if (likely((p - c) < mask)) {
- if (atomic_try_cmpxchg_relaxed(&r->producer_head, &p, p + 1))
+ /* If this slot is empty (cleared by the consumer), try to claim it */
+ element = READ_ONCE(r->queue[p & mask]);
+ if (!element && atomic_try_cmpxchg_relaxed(&r->producer_head, &p, p + 1))
break;
} else {
int new_p;
@@ -104,18 +103,6 @@ static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr)
}
WRITE_ONCE(r->queue[p & mask], ptr);
-
- /* Wait until it's our turn to update the producer tail pointer */
- while (atomic_read(&r->producer_tail) != p)
- cpu_relax();
-
- /*
- * Make sure the WRITE_ONCE above becomes visible before producer_tail
- * is updated.
- */
- smp_wmb();
- atomic_set(&r->producer_tail, p + 1);
-
return 0;
}
@@ -131,7 +118,7 @@ static inline void *mpmc_ptr_ring_consume(struct mpmc_ptr_ring *r)
/* Fetch consumer_head first. */
smp_rmb();
- p = atomic_read(&r->producer_tail);
+ p = atomic_read(&r->producer_head);
/* Is the ring empty? */
if (unlikely(p == c))
@@ -139,12 +126,19 @@ static inline void *mpmc_ptr_ring_consume(struct mpmc_ptr_ring *r)
element = READ_ONCE(r->queue[c & mask]);
+ /* Nothing there? give up */
+ if (!element)
+ return NULL;
+
/*
* Stores to consumer_head must be completed before we update
* the head, so we use *_release.
*/
} while (!atomic_try_cmpxchg_release(&r->consumer_head, &c, c + 1));
+ /* mark the slot as empty */
+ WRITE_ONCE(r->queue[c & mask], NULL);
+
return element;
}
@@ -160,7 +154,6 @@ static inline int mpmc_ptr_ring_init(struct mpmc_ptr_ring *r, unsigned int size,
r->mask = size - 1;
atomic_set(&r->consumer_head, 0);
atomic_set(&r->producer_head, 0);
- atomic_set(&r->producer_tail, 0);
r->queue = kcalloc(size, sizeof(r->queue[0]), gfp);
if (!r->queue)
@@ -188,7 +181,7 @@ static inline void mpmc_ptr_ring_cleanup(struct mpmc_ptr_ring *r, void (*destroy
*/
static inline void *__mpmc_ptr_ring_peek(struct mpmc_ptr_ring *r)
{
- unsigned int c, p;
+ unsigned int c;
unsigned int mask = r->mask;
void *element;
@@ -197,14 +190,6 @@ static inline void *__mpmc_ptr_ring_peek(struct mpmc_ptr_ring *r)
/* Fetch consumer_head first */
smp_rmb();
- p = atomic_read(&r->producer_tail);
-
- if (unlikely(c == p))
- return NULL;
-
- /* TODO */
- smp_rmb();
-
element = READ_ONCE(r->queue[c & mask]);
return element;