From dc94e7afde5e2517a93283f5edd7902b94ceb9e1 Mon Sep 17 00:00:00 2001 From: Jonathan Neuschäfer Date: Tue, 29 May 2018 17:11:27 +0200 Subject: mpmc_ptr_ring: Reduce the memory barrier usage --- src/mpmc_ptr_ring.h | 48 ++++++++++++++++++++++-------------------------- 1 file changed, 22 insertions(+), 26 deletions(-) (limited to 'src/mpmc_ptr_ring.h') diff --git a/src/mpmc_ptr_ring.h b/src/mpmc_ptr_ring.h index be7c2ff..d628ea0 100644 --- a/src/mpmc_ptr_ring.h +++ b/src/mpmc_ptr_ring.h @@ -53,13 +53,12 @@ static inline bool mpmc_ptr_ring_empty(struct mpmc_ptr_ring *r) { size_t ptail, chead; - mb(); /* TODO: think about barriers */ + /* Order the following reads against earlier stuff */ + rmb(); ptail = atomic_long_read(&r->producer_tail); chead = atomic_long_read(&r->consumer_head); - mb(); /* TODO: think about barriers */ - return chead == ptail; } @@ -73,7 +72,6 @@ static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr) for (;;) { rmb(); /* TODO */ c = atomic_long_read(&r->consumer_head); - mb(); if ((p - c) < mask) { /* fast path */ if (atomic_long_cmpxchg(&r->producer_head, p, p + 1) == p) @@ -81,9 +79,8 @@ static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr) } else { size_t new_p; - mb(); + rmb(); new_p = atomic_long_read(&r->producer_head); - mb(); if (new_p == p) return -ENOSPC; @@ -92,17 +89,18 @@ static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr) } } - mb(); /* barriers? */ WRITE_ONCE(r->queue[p & mask], ptr); - mb(); /* barriers? */ /* Wait until it's our term to update the producer tail pointer */ while(atomic_long_read(&r->producer_tail) != p) cpu_relax(); - smp_mb__before_atomic(); + /* + * Make sure the WRITE_ONCE above becomes visible before producer_tail + * is updated. + */ + wmb(); atomic_long_set(&r->producer_tail, p + 1); - smp_mb__after_atomic(); return 0; } @@ -114,11 +112,12 @@ static inline void *mpmc_ptr_ring_consume(struct mpmc_ptr_ring *r) size_t mask = r->size - 1; for (;;) { - mb(); // TODO: check c = atomic_long_read(&r->consumer_head); - mb(); // TODO: check + + /* Fetch consumer_head first. */ + rmb(); + p = atomic_long_read(&r->producer_tail); - mb(); // TODO: check /* Is the ring empty? */ if (p == c) @@ -126,13 +125,13 @@ static inline void *mpmc_ptr_ring_consume(struct mpmc_ptr_ring *r) element = READ_ONCE(r->queue[c & mask]); - mb(); // TODO: check + /* TODO: Why? */ + rmb(); old_c = atomic_long_cmpxchg(&r->consumer_head, c, c + 1); if (old_c == c) break; } - mb(); // TODO: check return element; } @@ -151,8 +150,6 @@ static inline int mpmc_ptr_ring_init(struct mpmc_ptr_ring *r, int size, gfp_t gf if (!r->queue) return -ENOMEM; - mb(); /* TODO: check */ - return 0; } @@ -179,20 +176,20 @@ static inline void *__mpmc_ptr_ring_peek(struct mpmc_ptr_ring *r) size_t mask = r->size - 1; void *element; - mb(); // TODO: check c = atomic_long_read(&r->consumer_head); - mb(); // TODO: check - p = atomic_long_read(&r->producer_tail); - mb(); // TODO: check - //pr_info("%s %px %u, at %zu/%zu\n", __func__, r, current->pid, c, p); + /* Fetch consumer_head first */ + rmb(); + + p = atomic_long_read(&r->producer_tail); if (c == p) return NULL; - mb(); // TODO: check + /* TODO */ + rmb(); + element = READ_ONCE(r->queue[c & mask]); - mb(); // TODO: check return element; } @@ -206,7 +203,6 @@ static inline void *__mpmc_ptr_ring_peek(struct mpmc_ptr_ring *r) */ static inline void __mpmc_ptr_ring_discard_one(struct mpmc_ptr_ring *r) { - mb(); // TODO: check + smp_mb__before_atomic(); atomic_long_inc(&r->consumer_head); - mb(); // TODO: check } -- cgit v1.2.3-59-g8ed1b