aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
authorJason A. Donenfeld <Jason@zx2c4.com>2017-09-15 23:58:38 +0200
committerJason A. Donenfeld <Jason@zx2c4.com>2017-09-16 00:16:17 +0200
commit4f739cddb9f37b9780c9b0f4d1222b0be8d7e723 (patch)
treeeb9f568c5c2e4345c852721adc49daa96eb12305
parentdata: switch to multiconsumer model with spinlocks (diff)
downloadWireGuard-jd/lockless-queuing.tar.xz
WireGuard-jd/lockless-queuing.zip
data: move from spinlocks to lockless data structurejd/lockless-queuing
Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html Original-code-from: @aegis
-rw-r--r--src/data.c79
-rw-r--r--src/device.h4
-rw-r--r--src/send.c2
-rw-r--r--src/tests/qemu/Makefile2
4 files changed, 57 insertions, 30 deletions
diff --git a/src/data.c b/src/data.c
index 0d01c64..080324d 100644
--- a/src/data.c
+++ b/src/data.c
@@ -43,8 +43,9 @@ struct multicore_worker __percpu *packet_alloc_percpu_multicore_worker(work_func
int packet_queue_init(struct crypt_queue *queue, work_func_t function, bool multicore)
{
- INIT_LIST_HEAD(&queue->queue);
- queue->len = 0;
+ queue->list.next = queue->list.prev = NULL;
+ queue->head = queue->tail = &queue->list;
+ atomic_set(&queue->len, 0);
spin_lock_init(&queue->lock);
if (multicore) {
queue->worker = packet_alloc_percpu_multicore_worker(function, queue);
@@ -55,32 +56,57 @@ int packet_queue_init(struct crypt_queue *queue, work_func_t function, bool mult
return 0;
}
+/* We don't yet know how to properly free things, so just leak memory. :( */
+#define kmem_cache_free(a, b)
+
static struct list_head *queue_dequeue(struct crypt_queue *queue)
{
- struct list_head *node;
- spin_lock_bh(&queue->lock);
- node = queue->queue.next;
- if (&queue->queue == node) {
- spin_unlock_bh(&queue->lock);
- return NULL;
+ struct list_head *head, *tail, *head_next, *tail_next;
+ for (;;) {
+ head = READ_ONCE(queue->head);
+ tail = READ_ONCE(queue->tail);
+ head_next = READ_ONCE(head->next);
+ tail_next = READ_ONCE(tail->next);
+ /* Ensure our pointers are in sync */
+ if (head == READ_ONCE(queue->head)) {
+ if (head == tail) {
+ if (head_next == NULL)/* Nothing to dequeue */
+ break;
+ else /* Tail is behind, advance it */
+ cmpxchg(&queue->tail, tail, tail_next);
+ } else if (cmpxchg(&queue->head, head, head_next))
+ break; /* Dequeue successful */
+ }
}
- list_del(node);
- --queue->len;
- spin_unlock_bh(&queue->lock);
- return node;
+ if (head_next)
+ atomic_dec(&queue->len);
+ return head_next;
}
-static bool queue_enqueue(struct crypt_queue *queue, struct list_head *node, int limit)
+static bool queue_enqueue(struct crypt_queue *queue, struct list_head *head, int limit)
{
- spin_lock_bh(&queue->lock);
- if (limit && queue->len >= limit) {
- spin_unlock_bh(&queue->lock);
- return false;
- }
- list_add_tail(node, &queue->queue);
- ++queue->len;
- spin_unlock_bh(&queue->lock);
- return true;
+ struct list_head *tail, *next;
+ bool have_space = !limit || atomic_inc_return(&queue->len) <= limit;
+ head->next = head->prev = NULL;
+ if (have_space) {
+ for (;;) {
+ tail = READ_ONCE(queue->tail);
+ next = READ_ONCE(tail->next);
+ /* Ensure our pointers are in sync */
+ if (tail == READ_ONCE(queue->tail)) {
+ if (next == NULL) {
+ /* Try to insert node at end */
+ if (cmpxchg(&tail->next, next, head) == next) /* Insert successful */
+ break;
+ } else /* Advance tail */
+ cmpxchg(&queue->tail, tail, next);
+ }
+ }
+ /* Pivot tail to our inserted node if tail hasn't moved from under us yet (which is okay) */
+ cmpxchg(&queue->tail, tail, head);
+ } else
+ atomic_dec(&queue->len);
+ return have_space;
}
static inline struct crypt_ctx *queue_dequeue_per_peer(struct crypt_queue *queue)
@@ -97,9 +123,10 @@ static inline struct crypt_ctx *queue_dequeue_per_device(struct crypt_queue *que
static inline struct crypt_ctx *queue_first_per_peer(struct crypt_queue *queue)
{
- /* TODO: yikes, this isn't locked. But only called from single-consumer contexts.
- * That should be okay, hopefully. Unless it isn't. */
- return list_first_entry_or_null(&queue->queue, struct crypt_ctx, per_peer_node);
+ struct list_head *first = READ_ONCE(queue->head->next);
+ if (&queue->list == first)
+ return NULL;
+ return container_of(first, struct crypt_ctx, per_peer_node);
}
static inline bool queue_enqueue_per_peer(struct crypt_queue *peer_queue, struct crypt_ctx *ctx)
@@ -410,7 +437,7 @@ void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packet
/* If there are already packets on the init queue, these must go behind
* them to maintain the correct order, so we can only take the fast path
* when the init queue is empty. */
- if (likely(list_empty(&peer->init_queue.queue))) {
+ if (!atomic_read(&peer->init_queue.len)) {
if (likely(populate_sending_ctx(ctx))) {
if (unlikely(!queue_enqueue_per_device_and_peer(&wg->send_queue, &peer->send_queue, ctx, wg->packet_crypt_wq, &wg->send_queue.last_cpu)))
free_ctx(ctx);
diff --git a/src/device.h b/src/device.h
index c71b251..a0960ad 100644
--- a/src/device.h
+++ b/src/device.h
@@ -23,7 +23,7 @@ struct multicore_worker {
struct crypt_queue {
spinlock_t lock;
- struct list_head queue;
+ struct list_head list, *head, *tail;
union {
struct {
struct multicore_worker __percpu *worker;
@@ -31,7 +31,7 @@ struct crypt_queue {
};
struct work_struct work;
};
- int len;
+ atomic_t len;
};
struct wireguard_device {
diff --git a/src/send.c b/src/send.c
index f270ad2..850f771 100644
--- a/src/send.c
+++ b/src/send.c
@@ -111,7 +111,7 @@ void packet_send_keepalive(struct wireguard_peer *peer)
struct sk_buff *skb;
struct sk_buff_head queue;
- if (list_empty(&peer->init_queue.queue)) {
+ if (!atomic_read(&peer->init_queue.len)) {
skb = alloc_skb(DATA_PACKET_HEAD_ROOM + MESSAGE_MINIMUM_LENGTH, GFP_ATOMIC);
if (unlikely(!skb))
return;
diff --git a/src/tests/qemu/Makefile b/src/tests/qemu/Makefile
index 05081cd..2450684 100644
--- a/src/tests/qemu/Makefile
+++ b/src/tests/qemu/Makefile
@@ -73,7 +73,7 @@ qemu: $(KERNEL_BZIMAGE)
$(QEMU_MACHINE) \
-cpu host \
-smp $(NR_CPUS) \
- -m 96M \
+ -m 16G \
-object rng-random,id=rng0,filename=/dev/urandom \
-device virtio-rng-pci,rng=rng0 \
-device virtio-serial,max_ports=2 \