diff options
-rw-r--r-- | src/data.c | 166 | ||||
-rw-r--r-- | src/device.c | 30 | ||||
-rw-r--r-- | src/device.h | 23 | ||||
-rw-r--r-- | src/packets.h | 4 | ||||
-rw-r--r-- | src/peer.c | 10 | ||||
-rw-r--r-- | src/peer.h | 1 | ||||
-rw-r--r-- | src/receive.c | 2 | ||||
-rw-r--r-- | src/send.c | 2 |
8 files changed, 112 insertions, 126 deletions
@@ -17,8 +17,7 @@ #include <crypto/algapi.h> struct crypt_ctx { - struct list_head per_peer_head; - struct list_head per_device_head; + struct list_head per_peer_node, per_device_node; union { struct sk_buff_head packets; struct sk_buff *skb; @@ -29,91 +28,97 @@ struct crypt_ctx { atomic_t is_finished; }; -/** - * queue_dequeue - Atomically remove the first item in a queue. - * - * @return The address of the dequeued item, or NULL if the queue is empty. - * - * This function is safe to execute concurrently with any number of - * queue_enqueue() calls, but *not* with another queue_dequeue() call - * operating on the same queue. - */ -static struct list_head *queue_dequeue(struct crypt_queue *queue) +struct multicore_worker __percpu *packet_alloc_percpu_multicore_worker(work_func_t function, void *ptr) { - struct list_head *first, *second; - first = READ_ONCE(queue->list.next); - if (first == &queue->list) + int cpu; + struct multicore_worker __percpu *worker = alloc_percpu(struct multicore_worker); + if (!worker) return NULL; - do { - second = READ_ONCE(first->next); - WRITE_ONCE(queue->list.next, second); - } while (cmpxchg(&second->prev, first, &queue->list) != first); - if (first) - atomic_dec(&queue->qlen); - return first; + for_each_possible_cpu (cpu) { + per_cpu_ptr(worker, cpu)->ptr = ptr; + INIT_WORK(&per_cpu_ptr(worker, cpu)->work, function); + } + return worker; } -/** - * queue_enqueue - Atomically append an item to the tail of a queue. - * - * This function is safe to execute concurrently with any number of other - * queue_enqueue() calls, as well as with one queue_dequeue() call - * operating on the same queue. - */ -static bool queue_enqueue(struct crypt_queue *queue, struct list_head *head, int limit) +int packet_queue_init(struct crypt_queue *queue, work_func_t function, bool multicore) { - bool have_space = !limit || atomic_inc_return(&queue->qlen) <= limit; - if (have_space) { - struct list_head *last; - WRITE_ONCE(head->next, &queue->list); - do { - last = READ_ONCE(queue->list.prev); - WRITE_ONCE(head->prev, last); - } while (cmpxchg(&queue->list.prev, last, head) != last); - WRITE_ONCE(last->next, head); + INIT_LIST_HEAD(&queue->queue); + queue->len = 0; + spin_lock_init(&queue->lock); + if (multicore) { + queue->worker = packet_alloc_percpu_multicore_worker(function, queue); + if (!queue->worker) + return -ENOMEM; } else - atomic_dec(&queue->qlen); - return have_space; + INIT_WORK(&queue->work, function); + return 0; +} + +static struct list_head *queue_dequeue(struct crypt_queue *queue) +{ + struct list_head *node; + spin_lock_bh(&queue->lock); + node = queue->queue.next; + if (&queue->queue == node) { + spin_unlock_bh(&queue->lock); + return NULL; + } + list_del(node); + --queue->len; + spin_unlock_bh(&queue->lock); + return node; +} + +static bool queue_enqueue(struct crypt_queue *queue, struct list_head *node, int limit) +{ + spin_lock_bh(&queue->lock); + if (limit && queue->len >= limit) { + spin_unlock_bh(&queue->lock); + return false; + } + list_add_tail(node, &queue->queue); + ++queue->len; + spin_unlock_bh(&queue->lock); + return true; } static inline struct crypt_ctx *queue_dequeue_per_peer(struct crypt_queue *queue) { - struct list_head *head = queue_dequeue(queue); - return head ? list_entry(head, struct crypt_ctx, per_peer_head) : NULL; + struct list_head *node = queue_dequeue(queue); + return node ? list_entry(node, struct crypt_ctx, per_peer_node) : NULL; } static inline struct crypt_ctx *queue_dequeue_per_device(struct crypt_queue *queue) { - struct list_head *head = queue_dequeue(queue); - return head ? list_entry(head, struct crypt_ctx, per_device_head) : NULL; + struct list_head *node = queue_dequeue(queue); + return node ? list_entry(node, struct crypt_ctx, per_device_node) : NULL; } -static inline bool queue_enqueue_per_peer(struct crypt_queue *queue, struct crypt_ctx *ctx) +static inline struct crypt_ctx *queue_first_per_peer(struct crypt_queue *queue) { - /* TODO: While using MAX_QUEUED_PACKETS makes sense for the init_queue, it's - * not ideal to be using this for the encrypt/decrypt queues or the send/receive - * queues, where dynamic_queue_limit (dql) should be used instead. */ - return queue_enqueue(queue, &(ctx)->per_peer_head, MAX_QUEUED_PACKETS); + /* TODO: yikes, this isn't locked. But only called from single-consumer contexts. + * That should be okay, hopefully. Unless it isn't. */ + return list_first_entry_or_null(&queue->queue, struct crypt_ctx, per_peer_node); } -static inline void queue_enqueue_per_device(struct crypt_queue __percpu *queue, struct crypt_ctx *ctx, struct workqueue_struct *wq, int *next_cpu) +static inline bool queue_enqueue_per_peer(struct crypt_queue *peer_queue, struct crypt_ctx *ctx) { - struct crypt_queue *cpu_queue; - int cpu = cpumask_next_online(next_cpu); - /* Avoid running parallel work on the same CPU as the one handling all - * of the serial work. This improves overall throughput and especially - * throughput stability where we have at least two cores left for - * parallel work. */ - if (cpu == ctx->peer->serial_work_cpu && num_online_cpus() >= 3) - cpu = cpumask_next_online(next_cpu); - cpu_queue = per_cpu_ptr(queue, cpu); - queue_enqueue(cpu_queue, &ctx->per_device_head, 0); - queue_work_on(cpu, wq, &cpu_queue->work); + /* TODO: While using MAX_QUEUED_PACKETS makes sense for the init_queue, it's + * not ideal to be using this for the encrypt/decrypt queues or the send/receive + * queues, where dynamic_queue_limit (dql) should be used instead. */ + return queue_enqueue(peer_queue, &ctx->per_peer_node, MAX_QUEUED_PACKETS); } -static inline struct crypt_ctx *queue_first_per_peer(struct crypt_queue *queue) +static inline bool queue_enqueue_per_device_and_peer(struct crypt_queue *device_queue, struct crypt_queue *peer_queue, struct crypt_ctx *ctx, struct workqueue_struct *wq, int *next_cpu) { - return list_first_entry_or_null(&(queue)->list, struct crypt_ctx, per_peer_head); + int cpu; + if (unlikely(!queue_enqueue_per_peer(peer_queue, ctx))) + return false; + cpu = cpumask_next_online(next_cpu); + queue_enqueue(device_queue, &ctx->per_device_node, 0); + queue_work_on(cpu, wq, &per_cpu_ptr(device_queue->worker, cpu)->work); + return true; } /* This is RFC6479, a replay detection bitmap algorithm that avoids bitshifts */ @@ -343,7 +348,7 @@ void packet_send_worker(struct work_struct *work) void packet_encrypt_worker(struct work_struct *work) { struct crypt_ctx *ctx; - struct crypt_queue *queue = container_of(work, struct crypt_queue, work); + struct crypt_queue *queue = container_of(work, struct multicore_worker, work)->ptr; struct sk_buff *skb, *tmp; struct wireguard_peer *peer; bool have_simd = chacha20poly1305_init_simd(); @@ -374,19 +379,16 @@ void packet_init_worker(struct work_struct *work) struct wireguard_peer *peer = container_of(queue, struct wireguard_peer, init_queue); struct wireguard_device *wg = peer->device; - spin_lock(&peer->init_queue_lock); + /* TODO: does this race with packet_purge_init_queue and the other dequeuer in create_data, since it's unlocked? */ while ((ctx = queue_first_per_peer(queue)) != NULL) { if (unlikely(!populate_sending_ctx(ctx))) { packet_queue_handshake_initiation(peer, false); break; } queue_dequeue(queue); - if (likely(queue_enqueue_per_peer(&peer->send_queue, ctx))) - queue_enqueue_per_device(wg->send_queue, ctx, wg->packet_crypt_wq, &wg->encrypt_cpu); - else + if (unlikely(!queue_enqueue_per_device_and_peer(&wg->send_queue, &peer->send_queue, ctx, wg->packet_crypt_wq, &wg->send_queue.last_cpu))) free_ctx(ctx); } - spin_unlock(&peer->init_queue_lock); } void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packets) @@ -408,11 +410,9 @@ void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packet /* If there are already packets on the init queue, these must go behind * them to maintain the correct order, so we can only take the fast path * when the init queue is empty. */ - if (likely(list_empty(&peer->init_queue.list))) { + if (likely(list_empty(&peer->init_queue.queue))) { if (likely(populate_sending_ctx(ctx))) { - if (likely(queue_enqueue_per_peer(&peer->send_queue, ctx))) - queue_enqueue_per_device(wg->send_queue, ctx, wg->packet_crypt_wq, &wg->encrypt_cpu); - else + if (unlikely(!queue_enqueue_per_device_and_peer(&wg->send_queue, &peer->send_queue, ctx, wg->packet_crypt_wq, &wg->send_queue.last_cpu))) free_ctx(ctx); return; } @@ -430,13 +430,11 @@ void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packet * ones. If we cannot acquire the lock, packets are being dequeued on * another thread, so race for the open slot. */ while (unlikely(!queue_enqueue_per_peer(&peer->init_queue, ctx))) { - if (spin_trylock(&peer->init_queue_lock)) { - struct crypt_ctx *tmp = queue_dequeue_per_peer(&peer->init_queue); - if (likely(tmp)) - free_ctx(tmp); - spin_unlock(&peer->init_queue_lock); - } + struct crypt_ctx *tmp = queue_dequeue_per_peer(&peer->init_queue); + if (likely(tmp)) + free_ctx(tmp); } + /* Oops, we added something to the queue while removing the peer. */ if (unlikely(atomic_read(&peer->is_draining))) { packet_purge_init_queue(peer); @@ -482,7 +480,7 @@ void packet_receive_worker(struct work_struct *work) void packet_decrypt_worker(struct work_struct *work) { struct crypt_ctx *ctx; - struct crypt_queue *queue = container_of(work, struct crypt_queue, work); + struct crypt_queue *queue = container_of(work, struct multicore_worker, work)->ptr; struct wireguard_peer *peer; while ((ctx = queue_dequeue_per_device(queue)) != NULL) { @@ -525,9 +523,7 @@ void packet_consume_data(struct sk_buff *skb, struct wireguard_device *wg) /* index_hashtable_lookup() already gets a reference to peer. */ ctx->peer = ctx->keypair->entry.peer; - if (likely(queue_enqueue_per_peer(&ctx->peer->receive_queue, ctx))) - queue_enqueue_per_device(wg->receive_queue, ctx, wg->packet_crypt_wq, &wg->decrypt_cpu); - else { + if (unlikely(!queue_enqueue_per_device_and_peer(&wg->receive_queue, &ctx->peer->receive_queue, ctx, wg->packet_crypt_wq, &wg->receive_queue.last_cpu))) { /* TODO: replace this with a call to free_ctx when receiving uses skb_queues as well. */ noise_keypair_put(ctx->keypair); peer_put(ctx->peer); @@ -539,8 +535,6 @@ void packet_consume_data(struct sk_buff *skb, struct wireguard_device *wg) void packet_purge_init_queue(struct wireguard_peer *peer) { struct crypt_ctx *ctx; - spin_lock(&peer->init_queue_lock); while ((ctx = queue_dequeue_per_peer(&peer->init_queue)) != NULL) free_ctx(ctx); - spin_unlock(&peer->init_queue_lock); } diff --git a/src/device.c b/src/device.c index 3615125..672b58f 100644 --- a/src/device.c +++ b/src/device.c @@ -222,8 +222,8 @@ static void destruct(struct net_device *dev) wg->incoming_port = 0; destroy_workqueue(wg->handshake_receive_wq); destroy_workqueue(wg->handshake_send_wq); - free_percpu(wg->receive_queue); - free_percpu(wg->send_queue); + free_percpu(wg->receive_queue.worker); + free_percpu(wg->send_queue.worker); destroy_workqueue(wg->packet_crypt_wq); routing_table_free(&wg->peer_routing_table); ratelimiter_uninit(); @@ -271,7 +271,7 @@ static void setup(struct net_device *dev) static int newlink(struct net *src_net, struct net_device *dev, struct nlattr *tb[], struct nlattr *data[], struct netlink_ext_ack *extack) { - int ret = -ENOMEM, cpu; + int ret = -ENOMEM; struct wireguard_device *wg = netdev_priv(dev); wg->creating_net = get_net(src_net); @@ -289,13 +289,9 @@ static int newlink(struct net *src_net, struct net_device *dev, struct nlattr *t if (!dev->tstats) goto error_1; - wg->incoming_handshakes_worker = alloc_percpu(struct handshake_worker); + wg->incoming_handshakes_worker = packet_alloc_percpu_multicore_worker(packet_process_queued_handshake_packets, wg); if (!wg->incoming_handshakes_worker) goto error_2; - for_each_possible_cpu (cpu) { - per_cpu_ptr(wg->incoming_handshakes_worker, cpu)->wg = wg; - INIT_WORK(&per_cpu_ptr(wg->incoming_handshakes_worker, cpu)->work, packet_process_queued_handshake_packets); - } wg->handshake_receive_wq = alloc_workqueue("wg-kex-%s", WQ_CPU_INTENSIVE | WQ_FREEZABLE, 0, dev->name); if (!wg->handshake_receive_wq) @@ -309,21 +305,11 @@ static int newlink(struct net *src_net, struct net_device *dev, struct nlattr *t if (!wg->packet_crypt_wq) goto error_5; - wg->send_queue = alloc_percpu(struct crypt_queue); - if (!wg->send_queue) + if (packet_queue_init(&wg->send_queue, packet_encrypt_worker, true) < 0) goto error_6; - for_each_possible_cpu (cpu) { - INIT_LIST_HEAD(&per_cpu_ptr(wg->send_queue, cpu)->list); - INIT_WORK(&per_cpu_ptr(wg->send_queue, cpu)->work, packet_encrypt_worker); - } - wg->receive_queue = alloc_percpu(struct crypt_queue); - if (!wg->receive_queue) + if (packet_queue_init(&wg->receive_queue, packet_decrypt_worker, true) < 0) goto error_7; - for_each_possible_cpu (cpu) { - INIT_LIST_HEAD(&per_cpu_ptr(wg->receive_queue, cpu)->list); - INIT_WORK(&per_cpu_ptr(wg->receive_queue, cpu)->work, packet_decrypt_worker); - } ret = ratelimiter_init(); if (ret < 0) @@ -345,9 +331,9 @@ static int newlink(struct net *src_net, struct net_device *dev, struct nlattr *t error_9: ratelimiter_uninit(); error_8: - free_percpu(wg->receive_queue); + free_percpu(wg->receive_queue.worker); error_7: - free_percpu(wg->send_queue); + free_percpu(wg->send_queue.worker); error_6: destroy_workqueue(wg->packet_crypt_wq); error_5: diff --git a/src/device.h b/src/device.h index 047fdf5..c71b251 100644 --- a/src/device.h +++ b/src/device.h @@ -16,15 +16,22 @@ struct wireguard_device; -struct handshake_worker { - struct wireguard_device *wg; +struct multicore_worker { + void *ptr; struct work_struct work; }; struct crypt_queue { - struct list_head list; - struct work_struct work; - atomic_t qlen; + spinlock_t lock; + struct list_head queue; + union { + struct { + struct multicore_worker __percpu *worker; + int last_cpu; + }; + struct work_struct work; + }; + int len; }; struct wireguard_device { @@ -37,9 +44,9 @@ struct wireguard_device { struct noise_static_identity static_identity; struct workqueue_struct *handshake_receive_wq, *handshake_send_wq, *packet_crypt_wq; struct sk_buff_head incoming_handshakes; - struct crypt_queue __percpu *send_queue, *receive_queue; - int incoming_handshake_cpu, encrypt_cpu, decrypt_cpu; - struct handshake_worker __percpu *incoming_handshakes_worker; + struct crypt_queue send_queue, receive_queue; + int incoming_handshake_cpu; + struct multicore_worker __percpu *incoming_handshakes_worker; struct cookie_checker cookie_checker; struct pubkey_hashtable peer_hashtable; struct index_hashtable index_hashtable; diff --git a/src/packets.h b/src/packets.h index 0b8fe1e..bf6bad3 100644 --- a/src/packets.h +++ b/src/packets.h @@ -14,6 +14,8 @@ struct wireguard_device; struct wireguard_peer; +struct multicore_worker; +struct crypt_queue; struct sk_buff; struct packet_cb { @@ -33,6 +35,8 @@ void packet_receive_worker(struct work_struct *work); void packet_decrypt_worker(struct work_struct *work); void packet_consume_data(struct sk_buff *skb, struct wireguard_device *wg); void packet_purge_init_queue(struct wireguard_peer *peer); +int packet_queue_init(struct crypt_queue *queue, work_func_t function, bool multicore); +struct multicore_worker __percpu *packet_alloc_percpu_multicore_worker(work_func_t function, void *ptr); /* receive.c */ void packet_process_queued_handshake_packets(struct work_struct *work); @@ -46,13 +46,9 @@ struct wireguard_peer *peer_create(struct wireguard_device *wg, const u8 public_ kref_init(&peer->refcount); pubkey_hashtable_add(&wg->peer_hashtable, peer); list_add_tail(&peer->peer_list, &wg->peer_list); - INIT_LIST_HEAD(&peer->init_queue.list); - INIT_WORK(&peer->init_queue.work, packet_init_worker); - INIT_LIST_HEAD(&peer->send_queue.list); - INIT_WORK(&peer->send_queue.work, packet_send_worker); - INIT_LIST_HEAD(&peer->receive_queue.list); - INIT_WORK(&peer->receive_queue.work, packet_receive_worker); - spin_lock_init(&peer->init_queue_lock); + packet_queue_init(&peer->send_queue, packet_send_worker, false); + packet_queue_init(&peer->receive_queue, packet_receive_worker, false); + packet_queue_init(&peer->init_queue, packet_init_worker, false); pr_debug("%s: Peer %Lu created\n", wg->dev->name, peer->internal_id); return peer; } @@ -54,7 +54,6 @@ struct wireguard_peer { struct list_head peer_list; u64 internal_id; struct crypt_queue init_queue, send_queue, receive_queue; - spinlock_t init_queue_lock; atomic_t is_draining; int serial_work_cpu; }; diff --git a/src/receive.c b/src/receive.c index 86d522b..aa1684b 100644 --- a/src/receive.c +++ b/src/receive.c @@ -147,7 +147,7 @@ static void receive_handshake_packet(struct wireguard_device *wg, struct sk_buff void packet_process_queued_handshake_packets(struct work_struct *work) { - struct wireguard_device *wg = container_of(work, struct handshake_worker, work)->wg; + struct wireguard_device *wg = container_of(work, struct multicore_worker, work)->ptr; struct sk_buff *skb; while ((skb = skb_dequeue(&wg->incoming_handshakes)) != NULL) { @@ -111,7 +111,7 @@ void packet_send_keepalive(struct wireguard_peer *peer) struct sk_buff *skb; struct sk_buff_head queue; - if (list_empty(&peer->init_queue.list)) { + if (list_empty(&peer->init_queue.queue)) { skb = alloc_skb(DATA_PACKET_HEAD_ROOM + MESSAGE_MINIMUM_LENGTH, GFP_ATOMIC); if (unlikely(!skb)) return; |