 src/mpmc_ring.h | 329 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/queueing.h  |   2 +
 2 files changed, 331 insertions(+), 0 deletions(-)
diff --git a/src/mpmc_ring.h b/src/mpmc_ring.h
new file mode 100644
index 0000000..32f6048
--- /dev/null
+++ b/src/mpmc_ring.h
@@ -0,0 +1,329 @@
+/*
+ * Copyright 2009-2015 Samy Al Bahra.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#ifndef MPMC_RING_H
+#define MPMC_RING_H
+
+#ifndef CK_CC_INLINE
+#define CK_CC_INLINE inline /* inline is discouraged in the kernel */
+#endif
+
+#ifndef CK_CC_FORCE_INLINE
+#define CK_CC_FORCE_INLINE __always_inline
+#endif
+
+#ifndef CK_MD_CACHELINE
+#define CK_MD_CACHELINE (64)
+#endif
+
+#include <linux/types.h>
+#include <linux/string.h>
+
+#include <linux/processor.h>
+#include <linux/compiler.h>
+#include <linux/atomic.h>
+
+/* http://concurrencykit.org/doc/ck_pr_load.html */
+#define ck_pr_load_uint(SRC) READ_ONCE(*(SRC))
+
+/* http://concurrencykit.org/doc/ck_pr_fence_load.html */
+#define ck_pr_fence_load() smp_rmb()
+
+/* http://concurrencykit.org/doc/ck_pr_fence_store.html */
+#define ck_pr_fence_store() smp_wmb()
+
+/* http://concurrencykit.org/doc/ck_pr_stall.html */
+#define ck_pr_stall() cpu_relax()
+
+/*
+ * http://concurrencykit.org/doc/ck_pr_fence_store_atomic.html
+ * Deliberately empty: the kernel's atomic_cmpxchg() family already
+ * implies a full barrier on success, which subsumes the store/atomic
+ * ordering this fence provides in Concurrency Kit.
+ */
+#define ck_pr_fence_store_atomic()
+
+/*
+ * http://concurrencykit.org/doc/ck_pr_cas.html
+ * Compare-and-swap that reports the observed value through *(V),
+ * mirroring ck_pr_cas_uint_value(): returns true on success; on
+ * failure, *(V) holds the value actually read from the target.
+ */
+#define ck_pr_cas_uint_value(T, OLD, NEW, V)				\
+({									\
+	int __prev = (int)(OLD);					\
+	bool __ok = atomic_try_cmpxchg((T), &__prev, (int)(NEW));	\
+	*(V) = (unsigned int)__prev;					\
+	__ok;								\
+})
+
+/* http://concurrencykit.org/doc/ck_pr_cas.html */
+#define ck_pr_cas_uint(T, OLD, NEW)					\
+	(atomic_cmpxchg((T), (int)(OLD), (int)(NEW)) == (int)(OLD))
+
+/*
+ * http://concurrencykit.org/doc/ck_pr_store.html
+ * WRITE_ONCE() also provides the compiler barrier the old TODO asked
+ * about.
+ */
+#define ck_pr_store_uint(A, B) WRITE_ONCE(*(A), (B))
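+
+/*
+ * Note on barrier pairing (added for clarity): ck_pr_fence_load()
+ * maps to smp_rmb(), which orders a consumer's counter snapshot
+ * against its subsequent data reads; it pairs with the smp_wmb() a
+ * producer issues before publishing p_tail. The CAS wrappers rely on
+ * the kernel guarantee that a successful atomic_cmpxchg() or
+ * atomic_try_cmpxchg() is fully ordered.
+ */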
+
+/*
+ * Concurrent MPMC ring buffer, ported from Concurrency Kit's ck_ring.
+ */
+
+struct ck_ring {
+	atomic_t c_head;
+	/* TODO: use ____cacheline_aligned_in_smp or something like that */
+	char pad[CK_MD_CACHELINE - sizeof(atomic_t)];
+	unsigned int p_tail;
+	atomic_t p_head;
+	char _pad[CK_MD_CACHELINE - sizeof(unsigned int) - sizeof(atomic_t)];
+	unsigned int size;
+	unsigned int mask;
+};
+typedef struct ck_ring ck_ring_t;
+
+struct ck_ring_buffer {
+	void *value;
+};
+typedef struct ck_ring_buffer ck_ring_buffer_t;
+
+/* The size must be a power of two so the index mask arithmetic works. */
+CK_CC_INLINE static void
+ck_ring_init(struct ck_ring *ring, unsigned int size)
+{
+
+	ring->size = size;
+	ring->mask = size - 1;
+	ring->p_tail = 0;
+	atomic_set(&ring->p_head, 0);
+	atomic_set(&ring->c_head, 0);
+	return;
+}
+
+CK_CC_FORCE_INLINE static bool
+_ck_ring_enqueue_mp(struct ck_ring *ring,
+		    void *buffer,
+		    const void *entry,
+		    unsigned int ts,
+		    unsigned int *size)
+{
+	const unsigned int mask = ring->mask;
+	unsigned int producer, consumer, delta;
+	bool r = true;
+
+	producer = (unsigned int)atomic_read(&ring->p_head);
+
+	for (;;) {
+		/*
+		 * The snapshot of producer must be up to date with respect to
+		 * consumer.
+		 */
+		ck_pr_fence_load();
+		consumer = (unsigned int)atomic_read(&ring->c_head);
+
+		delta = producer + 1;
+
+		/*
+		 * Only try to CAS if the producer is not clearly stale (not
+		 * less than consumer) and the buffer is definitely not full.
+		 */
+		if (likely((producer - consumer) < mask)) {
+			if (ck_pr_cas_uint_value(&ring->p_head,
+			    producer, delta, &producer) == true) {
+				break;
+			}
+		} else {
+			unsigned int new_producer;
+
+			/*
+			 * Slow path. Either the buffer is full or we have a
+			 * stale snapshot of p_head. Execute a second read of
+			 * p_head that must be ordered wrt the snapshot of
+			 * c_head.
+			 */
+			ck_pr_fence_load();
+			new_producer = (unsigned int)atomic_read(&ring->p_head);
+
+			/*
+			 * Only fail if we haven't made forward progress in
+			 * production: the buffer must have been full when we
+			 * read new_producer (or we wrapped around UINT_MAX
+			 * during this iteration).
+			 */
+			if (producer == new_producer) {
+				r = false;
+				goto leave;
+			}
+
+			/*
+			 * p_head advanced during this iteration. Try again.
+			 */
+			producer = new_producer;
+		}
+	}
+
+	buffer = (char *)buffer + ts * (producer & mask);
+	memcpy(buffer, entry, ts);
+
+	/*
+	 * Wait until all concurrent producers have completed writing
+	 * their data into the ring buffer.
+	 */
+	while (ck_pr_load_uint(&ring->p_tail) != producer)
+		ck_pr_stall();
+
+	/*
+	 * Ensure that the copy is completed before updating the shared
+	 * producer counter.
+	 */
+	ck_pr_fence_store();
+	ck_pr_store_uint(&ring->p_tail, delta);
+
+leave:
+	if (size != NULL)
+		*size = (producer - consumer) & mask;
+
+	return r;
+}
+
+CK_CC_FORCE_INLINE static bool
+_ck_ring_enqueue_mp_size(struct ck_ring *ring,
+			 void *buffer,
+			 const void *entry,
+			 unsigned int ts,
+			 unsigned int *size)
+{
+	unsigned int sz;
+	bool r;
+
+	r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
+	*size = sz;
+	return r;
+}
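+
+/*
+ * Consumer-side overview (added for clarity): a consumer snapshots
+ * c_head, reads p_tail (only fully published entries are visible
+ * through p_tail), copies the slot out, and then claims the slot by
+ * advancing c_head with a CAS. A failed CAS means another consumer
+ * won the slot: trydequeue reports failure to the caller, while
+ * dequeue retries with the updated snapshot.
+ */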
+
+CK_CC_FORCE_INLINE static bool
+_ck_ring_trydequeue_mc(struct ck_ring *ring,
+		       const void *buffer,
+		       void *data,
+		       unsigned int size)
+{
+	const unsigned int mask = ring->mask;
+	unsigned int consumer, producer;
+
+	consumer = (unsigned int)atomic_read(&ring->c_head);
+	ck_pr_fence_load();
+	producer = ck_pr_load_uint(&ring->p_tail);
+
+	if (unlikely(consumer == producer))
+		return false;
+
+	ck_pr_fence_load();
+
+	buffer = (const char *)buffer + size * (consumer & mask);
+	memcpy(data, buffer, size);
+
+	ck_pr_fence_store_atomic();
+	return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
+}
+
+CK_CC_FORCE_INLINE static bool
+_ck_ring_dequeue_mc(struct ck_ring *ring,
+		    const void *buffer,
+		    void *data,
+		    unsigned int ts)
+{
+	const unsigned int mask = ring->mask;
+	unsigned int consumer, producer;
+
+	consumer = (unsigned int)atomic_read(&ring->c_head);
+
+	do {
+		const char *target;
+
+		/*
+		 * Producer counter must represent state relative to
+		 * our latest consumer snapshot.
+		 */
+		ck_pr_fence_load();
+		producer = ck_pr_load_uint(&ring->p_tail);
+
+		if (unlikely(consumer == producer))
+			return false;
+
+		ck_pr_fence_load();
+
+		target = (const char *)buffer + ts * (consumer & mask);
+		memcpy(data, target, ts);
+
+		/* Serialize load with respect to head update. */
+		ck_pr_fence_store_atomic();
+	} while (ck_pr_cas_uint_value(&ring->c_head,
+				      consumer,
+				      consumer + 1,
+				      &consumer) == false);
+
+	return true;
+}
+
+/*
+ * The ck_ring_*_mpmc namespace is the public interface for interacting with a
+ * ring buffer containing pointers. Correctness is provided for any number of
+ * producers and consumers.
+ */
+CK_CC_INLINE static bool
+ck_ring_enqueue_mpmc(struct ck_ring *ring,
+		     struct ck_ring_buffer *buffer,
+		     const void *entry)
+{
+
+	return _ck_ring_enqueue_mp(ring, buffer, &entry,
+				   sizeof(entry), NULL);
+}
+
+CK_CC_INLINE static bool
+ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
+			  struct ck_ring_buffer *buffer,
+			  const void *entry,
+			  unsigned int *size)
+{
+
+	return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
+					sizeof(entry), size);
+}
+
+CK_CC_INLINE static bool
+ck_ring_trydequeue_mpmc(struct ck_ring *ring,
+			const struct ck_ring_buffer *buffer,
+			void *data)
+{
+
+	return _ck_ring_trydequeue_mc(ring,
+				      buffer, (void **)data, sizeof(void *));
+}
+
+CK_CC_INLINE static bool
+ck_ring_dequeue_mpmc(struct ck_ring *ring,
+		     const struct ck_ring_buffer *buffer,
+		     void *data)
+{
+
+	return _ck_ring_dequeue_mc(ring, buffer, (void **)data,
+				   sizeof(void *));
+}
+
+#endif /* MPMC_RING_H */
diff --git a/src/queueing.h b/src/queueing.h
index 0057cfa..7b30733 100644
--- a/src/queueing.h
+++ b/src/queueing.h
@@ -12,6 +12,8 @@
 #include <linux/ip.h>
 #include <linux/ipv6.h>
 
+#include "mpmc_ring.h"
+
 struct wireguard_device;
 struct wireguard_peer;
 struct multicore_worker;
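
For context, a minimal sketch of how a caller might drive this API follows. The names below (QUEUE_SIZE, ring, slots, item, consume) are hypothetical and not part of the patch; the only hard requirement from the header is that the size passed to ck_ring_init() is a power of two, since the index mask is derived from it.

	#define QUEUE_SIZE 256	/* hypothetical; must be a power of two */

	static struct ck_ring ring;
	static struct ck_ring_buffer slots[QUEUE_SIZE];

	/* One-time setup, before any producer or consumer runs. */
	ck_ring_init(&ring, QUEUE_SIZE);

	/* Any number of contexts may produce concurrently. */
	void *item = /* pointer to hand off */;
	if (!ck_ring_enqueue_mpmc(&ring, slots, item)) {
		/* Ring full: the caller decides whether to drop or retry. */
	}

	/* Any number of contexts may consume concurrently. */
	void *out;
	if (ck_ring_trydequeue_mpmc(&ring, slots, &out))
		consume(out);	/* hypothetical consumer */

Note that the pointer-based wrappers store the pointer value itself (sizeof(entry) bytes) into the slot, so the consumer gets back exactly the void * that was enqueued; ck_ring_trydequeue_mpmc() may also return false when it merely loses the c_head CAS race to another consumer, which is why a lossless caller would use ck_ring_dequeue_mpmc() instead.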