-rw-r--r--  src/mpmc_ring.h  329
-rw-r--r--  src/queueing.h     2
2 files changed, 331 insertions, 0 deletions
diff --git a/src/mpmc_ring.h b/src/mpmc_ring.h
new file mode 100644
index 0000000..32f6048
--- /dev/null
+++ b/src/mpmc_ring.h
@@ -0,0 +1,329 @@
+/*
+ * Copyright 2009-2015 Samy Al Bahra.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#ifndef MPMC_RING_H
+#define MPMC_RING_H
+
+#ifndef CK_CC_INLINE
+#define CK_CC_INLINE inline /* inline is discouraged in the kernel */
+#endif
+
+#ifndef CK_CC_FORCE_INLINE
+#define CK_CC_FORCE_INLINE __always_inline
+#endif
+
+#ifndef CK_MD_CACHELINE
+#define CK_MD_CACHELINE (64)
+#endif
+
+#include <linux/types.h> /* bool; kernel code must not use <stdbool.h> */
+#include <linux/string.h>
+
+#include <linux/processor.h>
+#include <linux/compiler.h>
+#include <linux/atomic.h>
+
+/* http://concurrencykit.org/doc/ck_pr_load.html */
+#define ck_pr_load_uint(SRC) ((unsigned int)atomic_read(SRC))
+
+/* http://concurrencykit.org/doc/ck_pr_fence_load.html */
+#define ck_pr_fence_load() smp_rmb()
+
+/* http://concurrencykit.org/doc/ck_pr_fence_store.html */
+#define ck_pr_fence_store() smp_wmb()
+
+/* http://concurrencykit.org/doc/ck_pr_stall.html */
+#define ck_pr_stall() cpu_relax()
+
+/* http://concurrencykit.org/doc/ck_pr_fence_store_atomic.html
+ * No-op here: Linux value-returning atomics (atomic_cmpxchg) are fully
+ * ordered on success, so no extra fence is needed before the CAS.
+ */
+#define ck_pr_fence_store_atomic()
+
+/* http://concurrencykit.org/doc/ck_pr_cas.html
+ * CAS that reports the previously observed value through *(val): it
+ * succeeds iff the previous value equals old. The old value is latched
+ * first so that val may alias the variable passed as old.
+ */
+#define ck_pr_cas_uint_value(t, old, new, val) \
+({ \
+	unsigned int __old = (old); \
+	unsigned int __prev = atomic_cmpxchg((t), __old, (new)); \
+	*(val) = __prev; \
+	__prev == __old; \
+})
+
+/* http://concurrencykit.org/doc/ck_pr_cas.html
+ * Plain CAS: atomic_cmpxchg() returns the previous value, so success is
+ * detected by comparing it against the expected old value.
+ */
+#define ck_pr_cas_uint(t, old, new) \
+({ \
+	unsigned int __old = (old); \
+	atomic_cmpxchg((t), __old, (new)) == __old; \
+})
+
+/* http://concurrencykit.org/doc/ck_pr_store.html */
+#define ck_pr_store_uint(A, B) atomic_set((A), (B))
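+
+/*
+ * Illustrative sketch of how the mapping above is meant to behave: a
+ * reserve loop written against the ck_pr API expands to the usual kernel
+ * cmpxchg retry pattern, e.g.
+ *
+ *	unsigned int old = ck_pr_load_uint(&head);
+ *	while (!ck_pr_cas_uint_value(&head, old, old + 1, &old))
+ *		ck_pr_stall();
+ *
+ * On failure the CAS writes the observed value back into old, so the
+ * retry path needs no extra load.
+ */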
+
+/*
+ * Concurrent ring buffer.
+ */
+
+struct ck_ring {
+	atomic_t c_head;
+	/* TODO: use ____cacheline_aligned_in_smp or something like that */
+	char pad[CK_MD_CACHELINE - sizeof(atomic_t)];
+	atomic_t p_tail;
+	atomic_t p_head;
+	char _pad[CK_MD_CACHELINE - sizeof(atomic_t) * 2];
+	unsigned int size;
+	unsigned int mask;
+};
+typedef struct ck_ring ck_ring_t;
+
+struct ck_ring_buffer {
+ void *value;
+};
+typedef struct ck_ring_buffer ck_ring_buffer_t;
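+
+/*
+ * Each slot holds one pointer; the *_mpmc wrappers below copy
+ * sizeof(void *) bytes per operation, so with size = 8 entry i lives at
+ * slots[i & 7] in a caller-provided array:
+ *
+ *	struct ck_ring_buffer slots[8];
+ */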
+
+CK_CC_INLINE static void
+ck_ring_init(struct ck_ring *ring, unsigned int size)
+{
+
+	ring->size = size;
+	ring->mask = size - 1; /* size must be a power of two */
+	atomic_set(&ring->p_tail, 0);
+	atomic_set(&ring->p_head, 0);
+	atomic_set(&ring->c_head, 0);
+	return;
+}
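+
+/*
+ * Minimal initialization sketch (caller-provided storage; names are
+ * hypothetical):
+ *
+ *	struct ck_ring ring;
+ *	static struct ck_ring_buffer slots[1024];
+ *
+ *	ck_ring_init(&ring, ARRAY_SIZE(slots));
+ *
+ * The size must be a power of two so that "index & mask" selects a slot.
+ */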
+
+CK_CC_FORCE_INLINE static bool
+_ck_ring_enqueue_mp(struct ck_ring *ring,
+ void *buffer,
+ const void *entry,
+ unsigned int ts,
+ unsigned int *size)
+{
+ const unsigned int mask = ring->mask;
+ unsigned int producer, consumer, delta;
+ bool r = true;
+
+ producer = ck_pr_load_uint(&ring->p_head);
+
+ for (;;) {
+ /*
+ * The snapshot of producer must be up to date with respect to
+ * consumer.
+ */
+ ck_pr_fence_load();
+ consumer = ck_pr_load_uint(&ring->c_head);
+
+ delta = producer + 1;
+
+ /*
+ * Only try to CAS if the producer is not clearly stale (not
+ * less than consumer) and the buffer is definitely not full.
+ */
+ if (likely((producer - consumer) < mask)) {
+ if (ck_pr_cas_uint_value(&ring->p_head,
+ producer, delta, &producer) == true) {
+ break;
+ }
+ } else {
+ unsigned int new_producer;
+
+ /*
+ * Slow path. Either the buffer is full or we have a
+ * stale snapshot of p_head. Execute a second read of
+			 * p_head that must be ordered wrt the snapshot of
+ * c_head.
+ */
+ ck_pr_fence_load();
+ new_producer = ck_pr_load_uint(&ring->p_head);
+
+ /*
+ * Only fail if we haven't made forward progress in
+ * production: the buffer must have been full when we
+ * read new_producer (or we wrapped around UINT_MAX
+ * during this iteration).
+ */
+ if (producer == new_producer) {
+ r = false;
+ goto leave;
+ }
+
+ /*
+ * p_head advanced during this iteration. Try again.
+ */
+ producer = new_producer;
+ }
+ }
+
+ buffer = (char *)buffer + ts * (producer & mask);
+ memcpy(buffer, entry, ts);
+
+ /*
+ * Wait until all concurrent producers have completed writing
+ * their data into the ring buffer.
+ */
+ while (ck_pr_load_uint(&ring->p_tail) != producer)
+ ck_pr_stall();
+
+ /*
+ * Ensure that copy is completed before updating shared producer
+ * counter.
+ */
+ ck_pr_fence_store();
+ ck_pr_store_uint(&ring->p_tail, delta);
+
+leave:
+ if (size != NULL)
+ *size = (producer - consumer) & mask;
+
+ return r;
+}
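+
+/*
+ * Worked example of the two-phase enqueue above, with size = 8 (mask = 7)
+ * and p_head = p_tail = 5: producer A CASes p_head 5 -> 6 and owns slot
+ * 5 & 7 = 5; producer B CASes p_head 6 -> 7 and owns slot 6. Both copy
+ * concurrently, but B spins until A publishes p_tail = 6 before storing
+ * p_tail = 7, so consumers never observe a slot whose copy is incomplete.
+ */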
+
+CK_CC_FORCE_INLINE static bool
+_ck_ring_enqueue_mp_size(struct ck_ring *ring,
+ void *buffer,
+ const void *entry,
+ unsigned int ts,
+ unsigned int *size)
+{
+ unsigned int sz;
+ bool r;
+
+ r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
+ *size = sz;
+ return r;
+}
+
+CK_CC_FORCE_INLINE static bool
+_ck_ring_trydequeue_mc(struct ck_ring *ring,
+ const void *buffer,
+ void *data,
+ unsigned int size)
+{
+	const unsigned int mask = ring->mask;
+	unsigned int consumer, producer;
+
+	consumer = ck_pr_load_uint(&ring->c_head);
+	ck_pr_fence_load();
+	producer = ck_pr_load_uint(&ring->p_tail);
+
+	if (unlikely(consumer == producer))
+		return false;
+
+ ck_pr_fence_load();
+
+ buffer = (const char *)buffer + size * (consumer & mask);
+ memcpy(data, buffer, size);
+
+ ck_pr_fence_store_atomic();
+ return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
+}
+
+CK_CC_FORCE_INLINE static bool
+_ck_ring_dequeue_mc(struct ck_ring *ring,
+ const void *buffer,
+ void *data,
+ unsigned int ts)
+{
+ const unsigned int mask = ring->mask;
+ unsigned int consumer, producer;
+
+ consumer = ck_pr_load_uint(&ring->c_head);
+
+ do {
+ const char *target;
+
+ /*
+ * Producer counter must represent state relative to
+ * our latest consumer snapshot.
+ */
+ ck_pr_fence_load();
+ producer = ck_pr_load_uint(&ring->p_tail);
+
+ if (unlikely(consumer == producer))
+ return false;
+
+ ck_pr_fence_load();
+
+ target = (const char *)buffer + ts * (consumer & mask);
+ memcpy(data, target, ts);
+
+ /* Serialize load with respect to head update. */
+ ck_pr_fence_store_atomic();
+ } while (ck_pr_cas_uint_value(&ring->c_head,
+ consumer,
+ consumer + 1,
+ &consumer) == false);
+
+ return true;
+}
+
+/*
+ * The ck_ring_*_mpmc namespace is the public interface for interacting with a
+ * ring buffer containing pointers. Correctness is provided for any number of
+ * producers and consumers.
+ */
+CK_CC_INLINE static bool
+ck_ring_enqueue_mpmc(struct ck_ring *ring,
+ struct ck_ring_buffer *buffer,
+ const void *entry)
+{
+
+ return _ck_ring_enqueue_mp(ring, buffer, &entry,
+ sizeof(entry), NULL);
+}
+
+CK_CC_INLINE static bool
+ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
+ struct ck_ring_buffer *buffer,
+ const void *entry,
+ unsigned int *size)
+{
+
+ return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
+ sizeof(entry), size);
+}
+
+CK_CC_INLINE static bool
+ck_ring_trydequeue_mpmc(struct ck_ring *ring,
+ const struct ck_ring_buffer *buffer,
+ void *data)
+{
+
+ return _ck_ring_trydequeue_mc(ring,
+ buffer, (void **)data, sizeof(void *));
+}
+
+CK_CC_INLINE static bool
+ck_ring_dequeue_mpmc(struct ck_ring *ring,
+ const struct ck_ring_buffer *buffer,
+ void *data)
+{
+
+ return _ck_ring_dequeue_mc(ring, buffer, (void **)data,
+ sizeof(void *));
+}
+
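+/*
+ * End-to-end usage sketch (hypothetical caller; "in", "out", and "slots"
+ * are illustrative names):
+ *
+ *	struct ck_ring ring;
+ *	static struct ck_ring_buffer slots[256];
+ *	void *in = some_object, *out;
+ *
+ *	ck_ring_init(&ring, 256);
+ *	if (!ck_ring_enqueue_mpmc(&ring, slots, in))
+ *		...;	(ring full: drop or back off)
+ *	if (ck_ring_dequeue_mpmc(&ring, slots, &out))
+ *		...;	(out now holds the oldest enqueued pointer)
+ */
+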
+#endif /* MPMC_RING_H */
diff --git a/src/queueing.h b/src/queueing.h
index 0057cfa..7b30733 100644
--- a/src/queueing.h
+++ b/src/queueing.h
@@ -12,6 +12,8 @@
#include <linux/ip.h>
#include <linux/ipv6.h>
+#include "mpmc_ring.h"
+
struct wireguard_device;
struct wireguard_peer;
struct multicore_worker;