-rw-r--r-- | src/data.c | 166
1 file changed, 80 insertions, 86 deletions
@@ -17,8 +17,7 @@
 #include <crypto/algapi.h>
 
 struct crypt_ctx {
-	struct list_head per_peer_head;
-	struct list_head per_device_head;
+	struct list_head per_peer_node, per_device_node;
 	union {
 		struct sk_buff_head packets;
 		struct sk_buff *skb;
@@ -29,91 +28,97 @@ struct crypt_ctx {
 	atomic_t is_finished;
 };
 
-/**
- * queue_dequeue - Atomically remove the first item in a queue.
- *
- * @return The address of the dequeued item, or NULL if the queue is empty.
- *
- * This function is safe to execute concurrently with any number of
- * queue_enqueue() calls, but *not* with another queue_dequeue() call
- * operating on the same queue.
- */
-static struct list_head *queue_dequeue(struct crypt_queue *queue)
+struct multicore_worker __percpu *packet_alloc_percpu_multicore_worker(work_func_t function, void *ptr)
 {
-	struct list_head *first, *second;
-	first = READ_ONCE(queue->list.next);
-	if (first == &queue->list)
+	int cpu;
+	struct multicore_worker __percpu *worker = alloc_percpu(struct multicore_worker);
+	if (!worker)
 		return NULL;
-	do {
-		second = READ_ONCE(first->next);
-		WRITE_ONCE(queue->list.next, second);
-	} while (cmpxchg(&second->prev, first, &queue->list) != first);
-	if (first)
-		atomic_dec(&queue->qlen);
-	return first;
+	for_each_possible_cpu (cpu) {
+		per_cpu_ptr(worker, cpu)->ptr = ptr;
+		INIT_WORK(&per_cpu_ptr(worker, cpu)->work, function);
+	}
+	return worker;
 }
 
-/**
- * queue_enqueue - Atomically append an item to the tail of a queue.
- *
- * This function is safe to execute concurrently with any number of other
- * queue_enqueue() calls, as well as with one queue_dequeue() call
- * operating on the same queue.
- */
-static bool queue_enqueue(struct crypt_queue *queue, struct list_head *head, int limit)
+int packet_queue_init(struct crypt_queue *queue, work_func_t function, bool multicore)
 {
-	bool have_space = !limit || atomic_inc_return(&queue->qlen) <= limit;
-	if (have_space) {
-		struct list_head *last;
-		WRITE_ONCE(head->next, &queue->list);
-		do {
-			last = READ_ONCE(queue->list.prev);
-			WRITE_ONCE(head->prev, last);
-		} while (cmpxchg(&queue->list.prev, last, head) != last);
-		WRITE_ONCE(last->next, head);
+	INIT_LIST_HEAD(&queue->queue);
+	queue->len = 0;
+	spin_lock_init(&queue->lock);
+	if (multicore) {
+		queue->worker = packet_alloc_percpu_multicore_worker(function, queue);
+		if (!queue->worker)
+			return -ENOMEM;
 	} else
-		atomic_dec(&queue->qlen);
-	return have_space;
+		INIT_WORK(&queue->work, function);
+	return 0;
+}
+
+static struct list_head *queue_dequeue(struct crypt_queue *queue)
+{
+	struct list_head *node;
+	spin_lock_bh(&queue->lock);
+	node = queue->queue.next;
+	if (&queue->queue == node) {
+		spin_unlock_bh(&queue->lock);
+		return NULL;
+	}
+	list_del(node);
+	--queue->len;
+	spin_unlock_bh(&queue->lock);
+	return node;
+}
+
+static bool queue_enqueue(struct crypt_queue *queue, struct list_head *node, int limit)
+{
+	spin_lock_bh(&queue->lock);
+	if (limit && queue->len >= limit) {
+		spin_unlock_bh(&queue->lock);
+		return false;
+	}
+	list_add_tail(node, &queue->queue);
+	++queue->len;
+	spin_unlock_bh(&queue->lock);
+	return true;
 }
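For reference, the new functions above appear to rely on definitions along these lines. This is a sketch inferred purely from usage in this patch (the real declarations live in the corresponding header, which this diff does not show); the union in particular is a guess, since only one of its two members is ever used per queue:

#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>

struct multicore_worker {
	void *ptr;		/* back-pointer to the owning crypt_queue */
	struct work_struct work;
};

struct crypt_queue {
	spinlock_t lock;	/* guards queue and len */
	struct list_head queue;
	int len;
	union {
		struct work_struct work;			/* single-consumer queues */
		struct multicore_worker __percpu *worker;	/* multicore queues */
	};
	int last_cpu;		/* round-robin cursor for cpumask_next_online() */
};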
 
 static inline struct crypt_ctx *queue_dequeue_per_peer(struct crypt_queue *queue)
 {
-	struct list_head *head = queue_dequeue(queue);
-	return head ? list_entry(head, struct crypt_ctx, per_peer_head) : NULL;
+	struct list_head *node = queue_dequeue(queue);
+	return node ? list_entry(node, struct crypt_ctx, per_peer_node) : NULL;
 }
 
 static inline struct crypt_ctx *queue_dequeue_per_device(struct crypt_queue *queue)
 {
-	struct list_head *head = queue_dequeue(queue);
-	return head ? list_entry(head, struct crypt_ctx, per_device_head) : NULL;
+	struct list_head *node = queue_dequeue(queue);
+	return node ? list_entry(node, struct crypt_ctx, per_device_node) : NULL;
 }
 
-static inline bool queue_enqueue_per_peer(struct crypt_queue *queue, struct crypt_ctx *ctx)
+static inline struct crypt_ctx *queue_first_per_peer(struct crypt_queue *queue)
 {
-	/* TODO: While using MAX_QUEUED_PACKETS makes sense for the init_queue, it's
-	 * not ideal to be using this for the encrypt/decrypt queues or the send/receive
-	 * queues, where dynamic_queue_limit (dql) should be used instead. */
-	return queue_enqueue(queue, &(ctx)->per_peer_head, MAX_QUEUED_PACKETS);
+	/* TODO: yikes, this isn't locked. But only called from single-consumer contexts.
+	 * That should be okay, hopefully. Unless it isn't. */
+	return list_first_entry_or_null(&queue->queue, struct crypt_ctx, per_peer_node);
 }
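queue_first_per_peer() peeks without taking queue->lock, which its own TODO flags. The discipline that makes this tenable is strict single-consumer use: exactly one thread may peek and dequeue a given per-peer queue, while any number of producers enqueue under the lock. The peek-then-dequeue split exists so that a failed processing attempt leaves the packet in place, preserving per-peer ordering. A condensed sketch of the pattern, with try_process() as a hypothetical stand-in for populate_sending_ctx():

static void single_consumer_loop(struct crypt_queue *queue)
{
	struct crypt_ctx *ctx;

	/* Unlocked peek: tenable only while no other thread dequeues. */
	while ((ctx = queue_first_per_peer(queue)) != NULL) {
		if (!try_process(ctx))	/* hypothetical; may fail transiently */
			break;		/* leave ctx queued, in order, for the retry */
		queue_dequeue(queue);	/* now actually remove it, under the lock */
	}
}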
 
-static inline void queue_enqueue_per_device(struct crypt_queue __percpu *queue, struct crypt_ctx *ctx, struct workqueue_struct *wq, int *next_cpu)
+static inline bool queue_enqueue_per_peer(struct crypt_queue *peer_queue, struct crypt_ctx *ctx)
 {
-	struct crypt_queue *cpu_queue;
-	int cpu = cpumask_next_online(next_cpu);
-	/* Avoid running parallel work on the same CPU as the one handling all
-	 * of the serial work. This improves overall throughput and especially
-	 * throughput stability where we have at least two cores left for
-	 * parallel work. */
-	if (cpu == ctx->peer->serial_work_cpu && num_online_cpus() >= 3)
-		cpu = cpumask_next_online(next_cpu);
-	cpu_queue = per_cpu_ptr(queue, cpu);
-	queue_enqueue(cpu_queue, &ctx->per_device_head, 0);
-	queue_work_on(cpu, wq, &cpu_queue->work);
+	/* TODO: While using MAX_QUEUED_PACKETS makes sense for the init_queue, it's
+	 * not ideal to be using this for the encrypt/decrypt queues or the send/receive
+	 * queues, where dynamic_queue_limit (dql) should be used instead. */
+	return queue_enqueue(peer_queue, &ctx->per_peer_node, MAX_QUEUED_PACKETS);
 }
 
-static inline struct crypt_ctx *queue_first_per_peer(struct crypt_queue *queue)
+static inline bool queue_enqueue_per_device_and_peer(struct crypt_queue *device_queue, struct crypt_queue *peer_queue, struct crypt_ctx *ctx, struct workqueue_struct *wq, int *next_cpu)
 {
-	return list_first_entry_or_null(&(queue)->list, struct crypt_ctx, per_peer_head);
+	int cpu;
+
+	if (unlikely(!queue_enqueue_per_peer(peer_queue, ctx)))
+		return false;
+	cpu = cpumask_next_online(next_cpu);
+	queue_enqueue(device_queue, &ctx->per_device_node, 0);
+	queue_work_on(cpu, wq, &per_cpu_ptr(device_queue->worker, cpu)->work);
+	return true;
 }
 
 /* This is RFC6479, a replay detection bitmap algorithm that avoids bitshifts */
@@ -343,7 +348,7 @@ void packet_send_worker(struct work_struct *work)
 void packet_encrypt_worker(struct work_struct *work)
 {
 	struct crypt_ctx *ctx;
-	struct crypt_queue *queue = container_of(work, struct crypt_queue, work);
+	struct crypt_queue *queue = container_of(work, struct multicore_worker, work)->ptr;
 	struct sk_buff *skb, *tmp;
 	struct wireguard_peer *peer;
 	bool have_simd = chacha20poly1305_init_simd();
@@ -374,19 +379,16 @@ void packet_init_worker(struct work_struct *work)
 	struct wireguard_peer *peer = container_of(queue, struct wireguard_peer, init_queue);
 	struct wireguard_device *wg = peer->device;
 
-	spin_lock(&peer->init_queue_lock);
+	/* TODO: does this race with packet_purge_init_queue and the other dequeuer in create_data, since it's unlocked? */
 	while ((ctx = queue_first_per_peer(queue)) != NULL) {
 		if (unlikely(!populate_sending_ctx(ctx))) {
 			packet_queue_handshake_initiation(peer, false);
 			break;
 		}
 		queue_dequeue(queue);
-		if (likely(queue_enqueue_per_peer(&peer->send_queue, ctx)))
-			queue_enqueue_per_device(wg->send_queue, ctx, wg->packet_crypt_wq, &wg->encrypt_cpu);
-		else
+		if (unlikely(!queue_enqueue_per_device_and_peer(&wg->send_queue, &peer->send_queue, ctx, wg->packet_crypt_wq, &wg->send_queue.last_cpu)))
 			free_ctx(ctx);
 	}
-	spin_unlock(&peer->init_queue_lock);
 }
 
 void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packets)
@@ -408,11 +410,9 @@ void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packets)
 	/* If there are already packets on the init queue, these must go behind
 	 * them to maintain the correct order, so we can only take the fast path
 	 * when the init queue is empty. */
-	if (likely(list_empty(&peer->init_queue.list))) {
+	if (likely(list_empty(&peer->init_queue.queue))) {
 		if (likely(populate_sending_ctx(ctx))) {
-			if (likely(queue_enqueue_per_peer(&peer->send_queue, ctx)))
-				queue_enqueue_per_device(wg->send_queue, ctx, wg->packet_crypt_wq, &wg->encrypt_cpu);
-			else
+			if (unlikely(!queue_enqueue_per_device_and_peer(&wg->send_queue, &peer->send_queue, ctx, wg->packet_crypt_wq, &wg->send_queue.last_cpu)))
 				free_ctx(ctx);
 			return;
 		}
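The container_of() changes in packet_encrypt_worker above (and packet_decrypt_worker below) are the flip side of this per-cpu dispatch. The work_struct handed to queue_work_on() is now embedded in a per-cpu struct multicore_worker rather than in the crypt_queue itself, so the handler can no longer recover its queue by containment; it goes through the worker's ->ptr, which packet_alloc_percpu_multicore_worker() pointed back at the shared queue. Schematically (a sketch of the round trip, not literal patch code):

/* Allocation: every CPU's worker gets the same back-pointer. */
queue->worker = packet_alloc_percpu_multicore_worker(packet_encrypt_worker, queue);

/* Dispatch: kick the chosen CPU's copy of the work item. */
queue_work_on(cpu, wq, &per_cpu_ptr(queue->worker, cpu)->work);

/* Handler: work_struct -> containing multicore_worker -> shared queue. */
struct crypt_queue *queue = container_of(work, struct multicore_worker, work)->ptr;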
@@ -430,13 +430,11 @@ void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packets)
 	 * ones. If we cannot acquire the lock, packets are being dequeued on
 	 * another thread, so race for the open slot. */
 	while (unlikely(!queue_enqueue_per_peer(&peer->init_queue, ctx))) {
-		if (spin_trylock(&peer->init_queue_lock)) {
-			struct crypt_ctx *tmp = queue_dequeue_per_peer(&peer->init_queue);
-			if (likely(tmp))
-				free_ctx(tmp);
-			spin_unlock(&peer->init_queue_lock);
-		}
+		struct crypt_ctx *tmp = queue_dequeue_per_peer(&peer->init_queue);
+		if (likely(tmp))
+			free_ctx(tmp);
 	}
+
 	/* Oops, we added something to the queue while removing the peer. */
 	if (unlikely(atomic_read(&peer->is_draining))) {
 		packet_purge_init_queue(peer);
@@ -482,7 +480,7 @@ void packet_receive_worker(struct work_struct *work)
 void packet_decrypt_worker(struct work_struct *work)
 {
 	struct crypt_ctx *ctx;
-	struct crypt_queue *queue = container_of(work, struct crypt_queue, work);
+	struct crypt_queue *queue = container_of(work, struct multicore_worker, work)->ptr;
 	struct wireguard_peer *peer;
 
 	while ((ctx = queue_dequeue_per_device(queue)) != NULL) {
@@ -525,9 +523,7 @@ void packet_consume_data(struct sk_buff *skb, struct wireguard_device *wg)
 	/* index_hashtable_lookup() already gets a reference to peer. */
 	ctx->peer = ctx->keypair->entry.peer;
 
-	if (likely(queue_enqueue_per_peer(&ctx->peer->receive_queue, ctx)))
-		queue_enqueue_per_device(wg->receive_queue, ctx, wg->packet_crypt_wq, &wg->decrypt_cpu);
-	else {
+	if (unlikely(!queue_enqueue_per_device_and_peer(&wg->receive_queue, &ctx->peer->receive_queue, ctx, wg->packet_crypt_wq, &wg->receive_queue.last_cpu))) {
 		/* TODO: replace this with a call to free_ctx when receiving uses skb_queues as well. */
 		noise_keypair_put(ctx->keypair);
 		peer_put(ctx->peer);
@@ -539,8 +535,6 @@ void packet_consume_data(struct sk_buff *skb, struct wireguard_device *wg)
 void packet_purge_init_queue(struct wireguard_peer *peer)
 {
 	struct crypt_ctx *ctx;
-	spin_lock(&peer->init_queue_lock);
 	while ((ctx = queue_dequeue_per_peer(&peer->init_queue)) != NULL)
 		free_ctx(ctx);
-	spin_unlock(&peer->init_queue_lock);
 }
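With queue setup now funneled through packet_queue_init(), the per-device CPU cursors (the old wg->encrypt_cpu and wg->decrypt_cpu) move into the queues themselves as last_cpu. The setup calls would presumably look something like the following; these call sites are hypothetical, since device and peer initialization are outside this diff:

/* Device-wide queues fan work out across CPUs via per-cpu workers. */
if (packet_queue_init(&wg->send_queue, packet_encrypt_worker, true) < 0)
	goto err;
if (packet_queue_init(&wg->receive_queue, packet_decrypt_worker, true) < 0)
	goto err;

/* The per-peer init queue keeps a single work item; with multicore
 * false there is no percpu allocation, so this cannot fail. */
packet_queue_init(&peer->init_queue, packet_init_worker, false);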