From 4f739cddb9f37b9780c9b0f4d1222b0be8d7e723 Mon Sep 17 00:00:00 2001
From: "Jason A. Donenfeld" <Jason@zx2c4.com>
Date: Fri, 15 Sep 2017 23:58:38 +0200
Subject: data: move from spinlocks to lockless data structure

Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
Original-code-from: @aegis
---
 src/data.c              | 79 +++++++++++++++++++++++++++++++++----------------
 src/device.h            |  4 +--
 src/send.c              |  2 +-
 src/tests/qemu/Makefile |  2 +-
 4 files changed, 57 insertions(+), 30 deletions(-)

diff --git a/src/data.c b/src/data.c
index 0d01c64..080324d 100644
--- a/src/data.c
+++ b/src/data.c
@@ -43,8 +43,9 @@ struct multicore_worker __percpu *packet_alloc_percpu_multicore_worker(work_func
 
 int packet_queue_init(struct crypt_queue *queue, work_func_t function, bool multicore)
 {
-	INIT_LIST_HEAD(&queue->queue);
-	queue->len = 0;
+	queue->list.next = queue->list.prev = NULL;
+	queue->head = queue->tail = &queue->list;
+	atomic_set(&queue->len, 0);
 	spin_lock_init(&queue->lock);
 	if (multicore) {
 		queue->worker = packet_alloc_percpu_multicore_worker(function, queue);
@@ -55,32 +56,57 @@ int packet_queue_init(struct crypt_queue *queue, work_func_t function, bool mult
 	return 0;
 }
 
+/* We don't yet know how to properly free things, so just leak memory. :( */
+#define kmem_cache_free(a, b)
+
 static struct list_head *queue_dequeue(struct crypt_queue *queue)
 {
-	struct list_head *node;
-	spin_lock_bh(&queue->lock);
-	node = queue->queue.next;
-	if (&queue->queue == node) {
-		spin_unlock_bh(&queue->lock);
-		return NULL;
+	struct list_head *head, *tail, *head_next, *tail_next;
+	for (;;) {
+		head = READ_ONCE(queue->head);
+		tail = READ_ONCE(queue->tail);
+		head_next = READ_ONCE(head->next);
+		tail_next = READ_ONCE(tail->next);
+		/* Ensure our pointers are in sync */
+		if (head == READ_ONCE(queue->head)) {
+			if (head == tail) {
+				if (head_next == NULL) /* Nothing to dequeue */
+					break;
+				else /* Tail is behind, advance it */
+					cmpxchg(&queue->tail, tail, tail_next);
+			} else if (cmpxchg(&queue->head, head, head_next) == head)
+				break; /* Dequeue successful */
+		}
 	}
-	list_del(node);
-	--queue->len;
-	spin_unlock_bh(&queue->lock);
-	return node;
+	if (head_next)
+		atomic_dec(&queue->len);
+	return head_next;
 }
 
-static bool queue_enqueue(struct crypt_queue *queue, struct list_head *node, int limit)
+static bool queue_enqueue(struct crypt_queue *queue, struct list_head *head, int limit)
 {
-	spin_lock_bh(&queue->lock);
-	if (limit && queue->len >= limit) {
-		spin_unlock_bh(&queue->lock);
-		return false;
-	}
-	list_add_tail(node, &queue->queue);
-	++queue->len;
-	spin_unlock_bh(&queue->lock);
-	return true;
+	struct list_head *tail, *next;
+	bool have_space = !limit || atomic_inc_return(&queue->len) <= limit;
+	head->next = head->prev = NULL;
+	if (have_space) {
+		for (;;) {
+			tail = READ_ONCE(queue->tail);
+			next = READ_ONCE(tail->next);
+			/* Ensure our pointers are in sync */
+			if (tail == READ_ONCE(queue->tail)) {
+				if (next == NULL) {
+					/* Try to insert node at end */
+					if (cmpxchg(&tail->next, next, head) == next) /* Insert successful */
+						break;
+				} else /* Advance tail */
+					cmpxchg(&queue->tail, tail, next);
+			}
+		}
+		/* Pivot tail to our inserted node if tail hasn't moved from under us yet (which is okay) */
+		cmpxchg(&queue->tail, tail, head);
+	} else
+		atomic_dec(&queue->len);
+	return have_space;
 }
 
 static inline struct crypt_ctx *queue_dequeue_per_peer(struct crypt_queue *queue)
@@ -97,9 +123,10 @@ static inline struct crypt_ctx *queue_dequeue_per_device(struct crypt_queue *que
 
 static inline struct crypt_ctx *queue_first_per_peer(struct crypt_queue *queue)
 {
-	/* TODO: yikes, this isn't locked. But only called from single-consumer contexts.
-	 * That should be okay, hopefully. Unless it isn't. */
-	return list_first_entry_or_null(&queue->queue, struct crypt_ctx, per_peer_node);
+	struct list_head *first = READ_ONCE(queue->head->next);
+	if (!first)
+		return NULL;
+	return container_of(first, struct crypt_ctx, per_peer_node);
 }
 
 static inline bool queue_enqueue_per_peer(struct crypt_queue *peer_queue, struct crypt_ctx *ctx)
@@ -410,7 +437,7 @@ void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packet
 	/* If there are already packets on the init queue, these must go behind
 	 * them to maintain the correct order, so we can only take the fast path
 	 * when the init queue is empty. */
-	if (likely(list_empty(&peer->init_queue.queue))) {
+	if (!atomic_read(&peer->init_queue.len)) {
 		if (likely(populate_sending_ctx(ctx))) {
 			if (unlikely(!queue_enqueue_per_device_and_peer(&wg->send_queue, &peer->send_queue, ctx, wg->packet_crypt_wq, &wg->send_queue.last_cpu)))
 				free_ctx(ctx);
diff --git a/src/device.h b/src/device.h
index c71b251..a0960ad 100644
--- a/src/device.h
+++ b/src/device.h
@@ -23,7 +23,7 @@ struct multicore_worker {
 
 struct crypt_queue {
 	spinlock_t lock;
-	struct list_head queue;
+	struct list_head list, *head, *tail;
 	union {
 		struct {
 			struct multicore_worker __percpu *worker;
@@ -31,7 +31,7 @@ struct crypt_queue {
 		};
 		struct work_struct work;
 	};
-	int len;
+	atomic_t len;
 };
 
 struct wireguard_device {
diff --git a/src/send.c b/src/send.c
index f270ad2..850f771 100644
--- a/src/send.c
+++ b/src/send.c
@@ -111,7 +111,7 @@ void packet_send_keepalive(struct wireguard_peer *peer)
 	struct sk_buff *skb;
 	struct sk_buff_head queue;
 
-	if (list_empty(&peer->init_queue.queue)) {
+	if (!atomic_read(&peer->init_queue.len)) {
 		skb = alloc_skb(DATA_PACKET_HEAD_ROOM + MESSAGE_MINIMUM_LENGTH, GFP_ATOMIC);
 		if (unlikely(!skb))
 			return;
diff --git a/src/tests/qemu/Makefile b/src/tests/qemu/Makefile
index 05081cd..2450684 100644
--- a/src/tests/qemu/Makefile
+++ b/src/tests/qemu/Makefile
@@ -73,7 +73,7 @@ qemu: $(KERNEL_BZIMAGE)
 	$(QEMU_MACHINE) \
 	-cpu host \
 	-smp $(NR_CPUS) \
-	-m 96M \
+	-m 16G \
 	-object rng-random,id=rng0,filename=/dev/urandom \
 	-device virtio-rng-pci,rng=rng0 \
 	-device virtio-serial,max_ports=2 \
-- 
cgit v1.2.3-59-g8ed1b
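
The Reference link above is the Michael & Scott non-blocking concurrent queue pseudocode, which the queue_enqueue()/queue_dequeue() loops in this patch follow: a dummy head node, a tail pointer that may lag by one, and "helping" CASes that let any thread finish another thread's half-completed operation. The following is only a rough userspace sketch of that algorithm, not code from this patch: it uses C11 atomics in place of the kernel's READ_ONCE()/cmpxchg(), the node/queue types and the queue_init/enqueue/dequeue names are invented for the example, and, like the patch, it dodges the reclamation problem by never freeing dequeued nodes.

#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct node {
	int value;
	struct node *_Atomic next;
};

struct queue {
	struct node *_Atomic head;	/* dummy node; head->next is the first element */
	struct node *_Atomic tail;	/* last node, possibly lagging by one */
	struct node stub;		/* initial dummy, analogous to queue->list */
};

static void queue_init(struct queue *q)
{
	atomic_store(&q->stub.next, NULL);
	atomic_store(&q->head, &q->stub);
	atomic_store(&q->tail, &q->stub);
}

static void enqueue(struct queue *q, struct node *n)
{
	struct node *tail, *next;

	atomic_store(&n->next, NULL);
	for (;;) {
		tail = atomic_load(&q->tail);
		next = atomic_load(&tail->next);
		if (tail != atomic_load(&q->tail))
			continue;	/* pointers went stale, retry */
		if (!next) {
			/* Try to link the new node after the current tail. */
			if (atomic_compare_exchange_weak(&tail->next, &next, n))
				break;
		} else {
			/* Tail is behind; help it forward. */
			atomic_compare_exchange_weak(&q->tail, &tail, next);
		}
	}
	/* Swing tail to the new node; failure means another thread already did. */
	atomic_compare_exchange_weak(&q->tail, &tail, n);
}

static struct node *dequeue(struct queue *q)
{
	struct node *head, *tail, *next;

	for (;;) {
		head = atomic_load(&q->head);
		tail = atomic_load(&q->tail);
		next = atomic_load(&head->next);
		if (head != atomic_load(&q->head))
			continue;	/* pointers went stale, retry */
		if (head == tail) {
			if (!next)
				return NULL;	/* queue is empty */
			/* Tail is behind; help it forward. */
			atomic_compare_exchange_weak(&q->tail, &tail, next);
		} else if (atomic_compare_exchange_weak(&q->head, &head, next)) {
			/* next becomes the new dummy; as in the patch, the old
			 * dummy is never freed here, so nodes are leaked. */
			return next;
		}
	}
}

int main(void)
{
	struct queue q;
	struct node *n;
	int i;

	queue_init(&q);
	for (i = 0; i < 4; i++) {
		n = calloc(1, sizeof(*n));
		n->value = i;
		enqueue(&q, n);
	}
	while ((n = dequeue(&q)))
		printf("%d\n", n->value);
	return 0;
}

Built with e.g. `cc -std=c11 sketch.c`, this prints 0 through 3. The helping CASes on q->tail are what keep the structure lock-free: a stalled enqueuer cannot block others, which is the property the patch relies on when it drops the spinlock from the hot path.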