author     Jason A. Donenfeld <Jason@zx2c4.com>    2017-09-12 18:51:12 +0200
committer  Jason A. Donenfeld <Jason@zx2c4.com>    2017-09-15 22:09:16 +0200
commit     7183f5d0b13ca5527dc028283220708607052669 (patch)
tree       120003fc872c5771855cee4dd43e72b292c68058
parent     data: entirely rework parallel system (diff)
download   wireguard-monolithic-historical-7183f5d0b13ca5527dc028283220708607052669.tar.xz
           wireguard-monolithic-historical-7183f5d0b13ca5527dc028283220708607052669.zip
data: reorganize and edit new queuing code
This involves many changes to Samuel's new system, in addition
to some TODOs for things that are not yet ideal.
-rw-r--r--   src/config.c     2
-rw-r--r--   src/data.c     299
-rw-r--r--   src/device.c    59
-rw-r--r--   src/device.h    12
-rw-r--r--   src/main.c       6
-rw-r--r--   src/messages.h   2
-rw-r--r--   src/packets.h   47
-rw-r--r--   src/peer.c      23
-rw-r--r--   src/peer.h       3
-rw-r--r--   src/queue.h    139
-rw-r--r--   src/receive.c   14
-rw-r--r--   src/send.c      29
-rw-r--r--   src/timers.c     4

13 files changed, 309 insertions(+), 330 deletions(-)
diff --git a/src/config.c b/src/config.c index a964f27..2bc4361 100644 --- a/src/config.c +++ b/src/config.c @@ -114,7 +114,7 @@ static int set_peer(struct wireguard_device *wg, void __user *user_peer, size_t } if (wg->dev->flags & IFF_UP) - queue_work(wg->crypt_wq, &peer->init_queue.work); + queue_work(wg->packet_crypt_wq, &peer->init_queue.work); peer_put(peer); @@ -5,7 +5,6 @@ #include "peer.h" #include "messages.h" #include "packets.h" -#include "queue.h" #include "timers.h" #include "hashtables.h" @@ -17,42 +16,98 @@ #include <net/xfrm.h> #include <crypto/algapi.h> -static struct kmem_cache *crypt_ctx_cache __read_mostly; +struct crypt_ctx { + struct list_head per_peer_head; + struct list_head per_device_head; + union { + struct sk_buff_head packets; + struct sk_buff *skb; + }; + struct wireguard_peer *peer; + struct noise_keypair *keypair; + struct endpoint endpoint; + atomic_t is_finished; +}; + +/** + * queue_dequeue - Atomically remove the first item in a queue. + * + * @return The address of the dequeued item, or NULL if the queue is empty. + * + * This function is safe to execute concurrently with any number of + * queue_enqueue() calls, but *not* with another queue_dequeue() call + * operating on the same queue. + */ +static struct list_head *queue_dequeue(struct crypt_queue *queue) +{ + struct list_head *first, *second; + first = READ_ONCE(queue->list.next); + if (first == &queue->list) + return NULL; + do { + second = READ_ONCE(first->next); + WRITE_ONCE(queue->list.next, second); + } while (cmpxchg(&second->prev, first, &queue->list) != first); + if (first) + atomic_dec(&queue->qlen); + return first; +} -int __init init_crypt_cache(void) +/** + * queue_enqueue - Atomically append an item to the tail of a queue. + * + * This function is safe to execute concurrently with any number of other + * queue_enqueue() calls, as well as with one queue_dequeue() call + * operating on the same queue. + */ +static bool queue_enqueue(struct crypt_queue *queue, struct list_head *head, int limit) { - crypt_ctx_cache = KMEM_CACHE(crypt_ctx, 0); - if (!crypt_ctx_cache) - return -ENOMEM; - return 0; + bool have_space = !limit || atomic_inc_return(&queue->qlen) <= limit; + if (have_space) { + struct list_head *last; + WRITE_ONCE(head->next, &queue->list); + do { + last = READ_ONCE(queue->list.prev); + WRITE_ONCE(head->prev, last); + } while (cmpxchg(&queue->list.prev, last, head) != last); + WRITE_ONCE(last->next, head); + } else + atomic_dec(&queue->qlen); + return have_space; } -void deinit_crypt_cache(void) +static inline struct crypt_ctx *queue_dequeue_per_peer(struct crypt_queue *queue) { - kmem_cache_destroy(crypt_ctx_cache); + struct list_head *head = queue_dequeue(queue); + return head ? list_entry(head, struct crypt_ctx, per_peer_head) : NULL; } -static void drop_ctx(struct crypt_ctx *ctx, bool sending) +static inline struct crypt_ctx *queue_dequeue_per_device(struct crypt_queue *queue) { - if (ctx->keypair) - noise_keypair_put(ctx->keypair); - peer_put(ctx->peer); - if (sending) - skb_queue_purge(&ctx->packets); - else - dev_kfree_skb(ctx->skb); - kmem_cache_free(crypt_ctx_cache, ctx); + struct list_head *head = queue_dequeue(queue); + return head ? 
list_entry(head, struct crypt_ctx, per_device_head) : NULL; +} + +static inline bool queue_enqueue_per_peer(struct crypt_queue *queue, struct crypt_ctx *ctx) +{ + /* TODO: While using MAX_QUEUED_PACKETS makes sense for the init_queue, it's + * not ideal to be using this for the encrypt/decrypt queues or the send/receive + * queues, where dynamic_queue_limit (dql) should be used instead. */ + return queue_enqueue(queue, &(ctx)->per_peer_head, MAX_QUEUED_PACKETS); } -#define drop_ctx_and_continue(ctx, sending) ({ \ - drop_ctx(ctx, sending); \ - continue; \ -}) +static inline void queue_enqueue_per_device(struct crypt_queue __percpu *queue, struct crypt_ctx *ctx, struct workqueue_struct *wq, int *next_cpu) +{ + int cpu = cpumask_next_online(next_cpu); + struct crypt_queue *cpu_queue = per_cpu_ptr(queue, cpu); + queue_enqueue(cpu_queue, &ctx->per_device_head, 0); + queue_work_on(cpu, wq, &cpu_queue->work); +} -#define drop_ctx_and_return(ctx, sending) ({ \ - drop_ctx(ctx, sending); \ - return; \ -}) +static inline struct crypt_ctx *queue_first_per_peer(struct crypt_queue *queue) +{ + return list_first_entry_or_null(&(queue)->list, struct crypt_ctx, per_peer_head); +} /* This is RFC6479, a replay detection bitmap algorithm that avoids bitshifts */ static inline bool counter_validate(union noise_counter *counter, u64 their_counter) @@ -88,6 +143,19 @@ out: } #include "selftest/counter.h" +static inline int choose_cpu(int *stored_cpu, unsigned int id) +{ + unsigned int cpu = *stored_cpu, cpu_index, i; + if (unlikely(cpu == nr_cpumask_bits || !cpumask_test_cpu(cpu, cpu_online_mask))) { + cpu_index = id % cpumask_weight(cpu_online_mask); + cpu = cpumask_first(cpu_online_mask); + for (i = 0; i < cpu_index; ++i) + cpu = cpumask_next(cpu, cpu_online_mask); + *stored_cpu = cpu; + } + return cpu; +} + static inline unsigned int skb_padding(struct sk_buff *skb) { /* We do this modulo business with the MTU, just in case the networking layer @@ -196,7 +264,31 @@ static inline bool skb_decrypt(struct sk_buff *skb, struct noise_symmetric_key * return !pskb_trim(skb, skb->len - noise_encrypted_len(0)); } -static inline bool packet_initialize_ctx(struct crypt_ctx *ctx) +static struct kmem_cache *crypt_ctx_cache __read_mostly; + +int __init init_crypt_ctx_cache(void) +{ + crypt_ctx_cache = KMEM_CACHE(crypt_ctx, 0); + if (!crypt_ctx_cache) + return -ENOMEM; + return 0; +} + +void deinit_crypt_ctx_cache(void) +{ + kmem_cache_destroy(crypt_ctx_cache); +} + +static void free_ctx(struct crypt_ctx *ctx) +{ + if (ctx->keypair) + noise_keypair_put(ctx->keypair); + peer_put(ctx->peer); + skb_queue_purge(&ctx->packets); + kmem_cache_free(crypt_ctx_cache, ctx); +} + +static bool populate_sending_ctx(struct crypt_ctx *ctx) { struct noise_symmetric_key *key; struct sk_buff *skb; @@ -230,27 +322,15 @@ out_nokey: void packet_send_worker(struct work_struct *work) { - struct crypt_ctx *ctx; struct crypt_queue *queue = container_of(work, struct crypt_queue, work); - struct sk_buff *skb, *tmp; - struct wireguard_peer *peer = container_of(queue, struct wireguard_peer, send_queue); - bool data_sent = false; + struct crypt_ctx *ctx; - timers_any_authenticated_packet_traversal(peer); - while ((ctx = queue_first_peer(queue)) != NULL && atomic_read(&ctx->state) == CTX_FINISHED) { + while ((ctx = queue_first_per_peer(queue)) != NULL && atomic_read(&ctx->is_finished)) { queue_dequeue(queue); - skb_queue_walk_safe(&ctx->packets, skb, tmp) { - bool is_keepalive = skb->len == message_data_len(0); - if (likely(!socket_send_skb_to_peer(peer, 
skb, PACKET_CB(skb)->ds) && !is_keepalive)) - data_sent = true; - } - noise_keypair_put(ctx->keypair); + packet_create_data_done(&ctx->packets, ctx->peer); peer_put(ctx->peer); kmem_cache_free(crypt_ctx_cache, ctx); } - if (likely(data_sent)) - timers_data_sent(peer); - keep_key_fresh_send(peer); } void packet_encrypt_worker(struct work_struct *work) @@ -261,7 +341,7 @@ void packet_encrypt_worker(struct work_struct *work) struct wireguard_peer *peer; bool have_simd = chacha20poly1305_init_simd(); - while ((ctx = queue_dequeue_shared(queue)) != NULL) { + while ((ctx = queue_dequeue_per_device(queue)) != NULL) { skb_queue_walk_safe(&ctx->packets, skb, tmp) { if (likely(skb_encrypt(skb, ctx->keypair, have_simd))) { skb_reset(skb); @@ -270,10 +350,11 @@ void packet_encrypt_worker(struct work_struct *work) dev_kfree_skb(skb); } } - /* Dereferencing ctx is unsafe once ctx->state == CTX_FINISHED. */ + /* Dereferencing ctx is unsafe once ctx->is_finished == true, so + * we grab an additional reference to peer. */ peer = peer_rcu_get(ctx->peer); - atomic_set(&ctx->state, CTX_FINISHED); - queue_work_on(peer->work_cpu, peer->device->crypt_wq, &peer->send_queue.work); + atomic_set(&ctx->is_finished, true); + queue_work_on(choose_cpu(&peer->serial_work_cpu, peer->internal_id), peer->device->packet_crypt_wq, &peer->send_queue.work); peer_put(peer); } chacha20poly1305_deinit_simd(have_simd); @@ -284,17 +365,19 @@ void packet_init_worker(struct work_struct *work) struct crypt_ctx *ctx; struct crypt_queue *queue = container_of(work, struct crypt_queue, work); struct wireguard_peer *peer = container_of(queue, struct wireguard_peer, init_queue); + struct wireguard_device *wg = peer->device; spin_lock(&peer->init_queue_lock); - while ((ctx = queue_first_peer(queue)) != NULL) { - if (unlikely(!packet_initialize_ctx(ctx))) { + while ((ctx = queue_first_per_peer(queue)) != NULL) { + if (unlikely(!populate_sending_ctx(ctx))) { packet_queue_handshake_initiation(peer, false); break; } queue_dequeue(queue); - if (unlikely(!queue_enqueue_peer(&peer->send_queue, ctx))) - drop_ctx_and_continue(ctx, true); - queue_enqueue_shared(peer->device->encrypt_queue, ctx, peer->device->crypt_wq, &peer->device->encrypt_cpu); + if (likely(queue_enqueue_per_peer(&peer->send_queue, ctx))) + queue_enqueue_per_device(wg->send_queue, ctx, wg->packet_crypt_wq, &wg->encrypt_cpu); + else + free_ctx(ctx); } spin_unlock(&peer->init_queue_lock); } @@ -306,7 +389,7 @@ void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packet struct wireguard_device *wg = peer->device; bool need_handshake = false; - ctx = kmem_cache_alloc(crypt_ctx_cache, GFP_ATOMIC); + ctx = kmem_cache_zalloc(crypt_ctx_cache, GFP_ATOMIC); if (unlikely(!ctx)) { skb_queue_purge(packets); return; @@ -314,37 +397,44 @@ void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packet skb_queue_head_init(&ctx->packets); skb_queue_splice_tail(packets, &ctx->packets); ctx->peer = peer_rcu_get(peer); - ctx->keypair = NULL; - atomic_set(&ctx->state, CTX_NEW); /* If there are already packets on the init queue, these must go behind * them to maintain the correct order, so we can only take the fast path - * when the queue is empty. */ - if (likely(queue_empty(&peer->init_queue))) { - if (likely(packet_initialize_ctx(ctx))) { - if (unlikely(!queue_enqueue_peer(&peer->send_queue, ctx))) - drop_ctx_and_return(ctx, true); - queue_enqueue_shared(wg->encrypt_queue, ctx, wg->crypt_wq, &wg->encrypt_cpu); + * when the init queue is empty. 
*/ + if (likely(list_empty(&peer->init_queue.list))) { + if (likely(populate_sending_ctx(ctx))) { + if (likely(queue_enqueue_per_peer(&peer->send_queue, ctx))) + queue_enqueue_per_device(wg->send_queue, ctx, wg->packet_crypt_wq, &wg->encrypt_cpu); + else + free_ctx(ctx); return; } /* Initialization failed, so we need a new keypair. */ need_handshake = true; } + /* We orphan the packets if we're waiting on a handshake, so that they + * don't block a socket's pool. */ + skb_queue_walk(&ctx->packets, skb) + skb_orphan(skb); + /* Packets are kept around in the init queue as long as there is an * ongoing handshake. Throw out the oldest packets instead of the new * ones. If we cannot acquire the lock, packets are being dequeued on - * another thread. */ - if (unlikely(queue_full(&peer->init_queue)) && spin_trylock(&peer->init_queue_lock)) { - struct crypt_ctx *tmp = queue_dequeue_peer(&peer->init_queue); - if (likely(tmp)) - drop_ctx(tmp, true); - spin_unlock(&peer->init_queue_lock); + * another thread, so race for the open slot. */ + while (unlikely(!queue_enqueue_per_peer(&peer->init_queue, ctx))) { + if (spin_trylock(&peer->init_queue_lock)) { + struct crypt_ctx *tmp = queue_dequeue_per_peer(&peer->init_queue); + if (likely(tmp)) + free_ctx(tmp); + spin_unlock(&peer->init_queue_lock); + } + } + /* Oops, we added something to the queue while removing the peer. */ + if (unlikely(atomic_read(&peer->is_draining))) { + packet_purge_init_queue(peer); + return; } - skb_queue_walk(&ctx->packets, skb) - skb_orphan(skb); - if (unlikely(!queue_enqueue_peer(&peer->init_queue, ctx))) - drop_ctx_and_return(ctx, true); if (need_handshake) packet_queue_handshake_initiation(peer, false); /* If we have a valid keypair, but took the slow path because init_queue @@ -352,8 +442,8 @@ void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packet * processing the existing packets and returned since we checked if the * init_queue was empty. Run the worker again if this is the only ctx * remaining on the queue. 
*/ - else if (unlikely(queue_first_peer(&peer->init_queue) == ctx)) - queue_work(peer->device->crypt_wq, &peer->init_queue.work); + if (unlikely(queue_first_per_peer(&peer->init_queue) == ctx)) + queue_work(peer->device->packet_crypt_wq, &peer->init_queue.work); } void packet_receive_worker(struct work_struct *work) @@ -362,21 +452,24 @@ void packet_receive_worker(struct work_struct *work) struct crypt_queue *queue = container_of(work, struct crypt_queue, work); struct sk_buff *skb; - while ((ctx = queue_first_peer(queue)) != NULL && atomic_read(&ctx->state) == CTX_FINISHED) { + local_bh_disable(); + while ((ctx = queue_first_per_peer(queue)) != NULL && atomic_read(&ctx->is_finished)) { queue_dequeue(queue); - if (likely(skb = ctx->skb)) { - if (unlikely(!counter_validate(&ctx->keypair->receiving.counter, PACKET_CB(skb)->nonce))) { - net_dbg_ratelimited("%s: Packet has invalid nonce %Lu (max %Lu)\n", ctx->peer->device->dev->name, PACKET_CB(ctx->skb)->nonce, ctx->keypair->receiving.counter.receive.counter); - dev_kfree_skb(skb); - } else { + if (likely((skb = ctx->skb) != NULL)) { + if (likely(counter_validate(&ctx->keypair->receiving.counter, PACKET_CB(skb)->nonce))) { skb_reset(skb); packet_consume_data_done(skb, ctx->peer, &ctx->endpoint, noise_received_with_keypair(&ctx->peer->keypairs, ctx->keypair)); } + else { + net_dbg_ratelimited("%s: Packet has invalid nonce %Lu (max %Lu)\n", ctx->peer->device->dev->name, PACKET_CB(ctx->skb)->nonce, ctx->keypair->receiving.counter.receive.counter); + dev_kfree_skb(skb); + } } noise_keypair_put(ctx->keypair); peer_put(ctx->peer); kmem_cache_free(crypt_ctx_cache, ctx); } + local_bh_enable(); } void packet_decrypt_worker(struct work_struct *work) @@ -385,15 +478,16 @@ void packet_decrypt_worker(struct work_struct *work) struct crypt_queue *queue = container_of(work, struct crypt_queue, work); struct wireguard_peer *peer; - while ((ctx = queue_dequeue_shared(queue)) != NULL) { + while ((ctx = queue_dequeue_per_device(queue)) != NULL) { if (unlikely(socket_endpoint_from_skb(&ctx->endpoint, ctx->skb) < 0 || !skb_decrypt(ctx->skb, &ctx->keypair->receiving))) { dev_kfree_skb(ctx->skb); ctx->skb = NULL; } - /* Dereferencing ctx is unsafe once ctx->state == CTX_FINISHED. */ + /* Dereferencing ctx is unsafe once ctx->is_finished == true, so + * we take a reference here first. 
*/ + peer = peer_rcu_get(ctx->peer); - atomic_set(&ctx->state, CTX_FINISHED); - queue_work_on(peer->work_cpu, peer->device->crypt_wq, &peer->receive_queue.work); + atomic_set(&ctx->is_finished, true); + queue_work_on(choose_cpu(&peer->serial_work_cpu, peer->internal_id), peer->device->packet_crypt_wq, &peer->receive_queue.work); peer_put(peer); } } @@ -401,38 +495,45 @@ void packet_decrypt_worker(struct work_struct *work) void packet_consume_data(struct sk_buff *skb, struct wireguard_device *wg) { struct crypt_ctx *ctx; + struct noise_keypair *keypair; __le32 idx = ((struct message_data *)skb->data)->key_idx; - ctx = kmem_cache_alloc(crypt_ctx_cache, GFP_ATOMIC); - if (unlikely(!ctx)) { + rcu_read_lock_bh(); + keypair = noise_keypair_get((struct noise_keypair *)index_hashtable_lookup(&wg->index_hashtable, INDEX_HASHTABLE_KEYPAIR, idx)); + rcu_read_unlock_bh(); + if (unlikely(!keypair)) { dev_kfree_skb(skb); return; } - rcu_read_lock_bh(); - ctx->keypair = noise_keypair_get((struct noise_keypair *)index_hashtable_lookup(&wg->index_hashtable, INDEX_HASHTABLE_KEYPAIR, idx)); - rcu_read_unlock_bh(); - if (unlikely(!ctx->keypair)) { - kmem_cache_free(crypt_ctx_cache, ctx); + + ctx = kmem_cache_zalloc(crypt_ctx_cache, GFP_ATOMIC); + if (unlikely(!ctx)) { + peer_put(keypair->entry.peer); + noise_keypair_put(keypair); dev_kfree_skb(skb); return; } + ctx->keypair = keypair; ctx->skb = skb; /* index_hashtable_lookup() already gets a reference to peer. */ ctx->peer = ctx->keypair->entry.peer; - atomic_set(&ctx->state, CTX_NEW); - if (unlikely(!queue_enqueue_peer(&ctx->peer->receive_queue, ctx))) - drop_ctx_and_return(ctx, false); - queue_enqueue_shared(wg->decrypt_queue, ctx, wg->crypt_wq, &wg->decrypt_cpu); + if (likely(queue_enqueue_per_peer(&ctx->peer->receive_queue, ctx))) + queue_enqueue_per_device(wg->receive_queue, ctx, wg->packet_crypt_wq, &wg->decrypt_cpu); + else { + /* TODO: replace this with a call to free_ctx when receiving uses skb_queues as well. 
*/ + noise_keypair_put(ctx->keypair); + peer_put(ctx->peer); + dev_kfree_skb(ctx->skb); + kmem_cache_free(crypt_ctx_cache, ctx); + } } -void peer_purge_queues(struct wireguard_peer *peer) +void packet_purge_init_queue(struct wireguard_peer *peer) { struct crypt_ctx *ctx; - - if (!spin_trylock(&peer->init_queue_lock)) - return; - while ((ctx = queue_dequeue_peer(&peer->init_queue)) != NULL) - drop_ctx(ctx, true); + spin_lock(&peer->init_queue_lock); + while ((ctx = queue_dequeue_per_peer(&peer->init_queue)) != NULL) + free_ctx(ctx); spin_unlock(&peer->init_queue_lock); } diff --git a/src/device.c b/src/device.c index d72736e..3615125 100644 --- a/src/device.c +++ b/src/device.c @@ -57,7 +57,7 @@ static int open(struct net_device *dev) return ret; peer_for_each (wg, peer, temp, true) { timers_init_peer(peer); - queue_work(wg->crypt_wq, &peer->init_queue.work); + queue_work(wg->packet_crypt_wq, &peer->init_queue.work); if (peer->persistent_keepalive_interval) packet_send_keepalive(peer); } @@ -111,7 +111,7 @@ static netdev_tx_t xmit(struct sk_buff *skb, struct net_device *dev) struct wireguard_device *wg = netdev_priv(dev); struct wireguard_peer *peer; struct sk_buff *next; - struct sk_buff_head queue; + struct sk_buff_head packets; int ret; if (unlikely(dev_recursion_level() > 4)) { @@ -142,7 +142,7 @@ static netdev_tx_t xmit(struct sk_buff *skb, struct net_device *dev) goto err_peer; } - __skb_queue_head_init(&queue); + __skb_queue_head_init(&packets); if (!skb_is_gso(skb)) skb->next = NULL; else { @@ -166,10 +166,10 @@ static netdev_tx_t xmit(struct sk_buff *skb, struct net_device *dev) * so at this point we're in a position to drop it. */ skb_dst_drop(skb); - __skb_queue_tail(&queue, skb); + __skb_queue_tail(&packets, skb); } while ((skb = next) != NULL); - packet_create_data(peer, &queue); + packet_create_data(peer, &packets); peer_put(peer); return NETDEV_TX_OK; @@ -220,11 +220,11 @@ static void destruct(struct net_device *dev) mutex_lock(&wg->device_update_lock); peer_remove_all(wg); wg->incoming_port = 0; - destroy_workqueue(wg->incoming_handshake_wq); - destroy_workqueue(wg->peer_wq); - free_percpu(wg->decrypt_queue); - free_percpu(wg->encrypt_queue); - destroy_workqueue(wg->crypt_wq); + destroy_workqueue(wg->handshake_receive_wq); + destroy_workqueue(wg->handshake_send_wq); + free_percpu(wg->receive_queue); + free_percpu(wg->send_queue); + destroy_workqueue(wg->packet_crypt_wq); routing_table_free(&wg->peer_routing_table); ratelimiter_uninit(); memzero_explicit(&wg->static_identity, sizeof(struct noise_static_identity)); @@ -296,34 +296,33 @@ static int newlink(struct net *src_net, struct net_device *dev, struct nlattr *t per_cpu_ptr(wg->incoming_handshakes_worker, cpu)->wg = wg; INIT_WORK(&per_cpu_ptr(wg->incoming_handshakes_worker, cpu)->work, packet_process_queued_handshake_packets); } - atomic_set(&wg->incoming_handshake_seqnr, 0); - wg->incoming_handshake_wq = alloc_workqueue("wg-kex-%s", WQ_CPU_INTENSIVE | WQ_FREEZABLE, 0, dev->name); - if (!wg->incoming_handshake_wq) + wg->handshake_receive_wq = alloc_workqueue("wg-kex-%s", WQ_CPU_INTENSIVE | WQ_FREEZABLE, 0, dev->name); + if (!wg->handshake_receive_wq) goto error_3; - wg->peer_wq = alloc_workqueue("wg-kex-%s", WQ_UNBOUND | WQ_FREEZABLE, 0, dev->name); - if (!wg->peer_wq) + wg->handshake_send_wq = alloc_workqueue("wg-kex-%s", WQ_UNBOUND | WQ_FREEZABLE, 0, dev->name); + if (!wg->handshake_send_wq) goto error_4; - wg->crypt_wq = alloc_workqueue("wg-crypt-%s", WQ_CPU_INTENSIVE | WQ_MEM_RECLAIM, 2, dev->name); - if 
(!wg->crypt_wq) + wg->packet_crypt_wq = alloc_workqueue("wg-crypt-%s", WQ_CPU_INTENSIVE | WQ_MEM_RECLAIM, 0, dev->name); + if (!wg->packet_crypt_wq) goto error_5; - wg->encrypt_queue = alloc_percpu(struct crypt_queue); - if (!wg->encrypt_queue) + wg->send_queue = alloc_percpu(struct crypt_queue); + if (!wg->send_queue) goto error_6; for_each_possible_cpu (cpu) { - INIT_LIST_HEAD(&per_cpu_ptr(wg->encrypt_queue, cpu)->list); - INIT_WORK(&per_cpu_ptr(wg->encrypt_queue, cpu)->work, packet_encrypt_worker); + INIT_LIST_HEAD(&per_cpu_ptr(wg->send_queue, cpu)->list); + INIT_WORK(&per_cpu_ptr(wg->send_queue, cpu)->work, packet_encrypt_worker); } - wg->decrypt_queue = alloc_percpu(struct crypt_queue); - if (!wg->decrypt_queue) + wg->receive_queue = alloc_percpu(struct crypt_queue); + if (!wg->receive_queue) goto error_7; for_each_possible_cpu (cpu) { - INIT_LIST_HEAD(&per_cpu_ptr(wg->decrypt_queue, cpu)->list); - INIT_WORK(&per_cpu_ptr(wg->decrypt_queue, cpu)->work, packet_decrypt_worker); + INIT_LIST_HEAD(&per_cpu_ptr(wg->receive_queue, cpu)->list); + INIT_WORK(&per_cpu_ptr(wg->receive_queue, cpu)->work, packet_decrypt_worker); } ret = ratelimiter_init(); @@ -346,15 +345,15 @@ static int newlink(struct net *src_net, struct net_device *dev, struct nlattr *t error_9: ratelimiter_uninit(); error_8: - free_percpu(wg->decrypt_queue); + free_percpu(wg->receive_queue); error_7: - free_percpu(wg->encrypt_queue); + free_percpu(wg->send_queue); error_6: - destroy_workqueue(wg->crypt_wq); + destroy_workqueue(wg->packet_crypt_wq); error_5: - destroy_workqueue(wg->peer_wq); + destroy_workqueue(wg->handshake_send_wq); error_4: - destroy_workqueue(wg->incoming_handshake_wq); + destroy_workqueue(wg->handshake_receive_wq); error_3: free_percpu(wg->incoming_handshakes_worker); error_2: diff --git a/src/device.h b/src/device.h index 1a75261..047fdf5 100644 --- a/src/device.h +++ b/src/device.h @@ -15,6 +15,7 @@ #include <linux/net.h> struct wireguard_device; + struct handshake_worker { struct wireguard_device *wg; struct work_struct work; @@ -34,20 +35,17 @@ struct wireguard_device { u32 fwmark; struct net *creating_net; struct noise_static_identity static_identity; - struct workqueue_struct *incoming_handshake_wq, *peer_wq; + struct workqueue_struct *handshake_receive_wq, *handshake_send_wq, *packet_crypt_wq; struct sk_buff_head incoming_handshakes; - atomic_t incoming_handshake_seqnr; + struct crypt_queue __percpu *send_queue, *receive_queue; + int incoming_handshake_cpu, encrypt_cpu, decrypt_cpu; struct handshake_worker __percpu *incoming_handshakes_worker; struct cookie_checker cookie_checker; struct pubkey_hashtable peer_hashtable; struct index_hashtable index_hashtable; struct routing_table peer_routing_table; struct list_head peer_list; - struct mutex device_update_lock; - struct mutex socket_update_lock; - struct workqueue_struct *crypt_wq; - int encrypt_cpu, decrypt_cpu; - struct crypt_queue __percpu *encrypt_queue, *decrypt_queue; + struct mutex device_update_lock, socket_update_lock; }; int device_init(void); @@ -27,7 +27,7 @@ static int __init mod_init(void) #endif noise_init(); - ret = init_crypt_cache(); + ret = init_crypt_ctx_cache(); if (ret < 0) goto err_packet; @@ -41,7 +41,7 @@ static int __init mod_init(void) return 0; err_device: - deinit_crypt_cache(); + deinit_crypt_ctx_cache(); err_packet: return ret; } @@ -49,7 +49,7 @@ err_packet: static void __exit mod_exit(void) { device_uninit(); - deinit_crypt_cache(); + deinit_crypt_ctx_cache(); pr_debug("WireGuard unloaded\n"); } diff --git 
a/src/messages.h b/src/messages.h index 2c0658d..f86f9c8 100644 --- a/src/messages.h +++ b/src/messages.h @@ -50,7 +50,7 @@ enum limits { KEEPALIVE_TIMEOUT = 10 * HZ, MAX_TIMER_HANDSHAKES = (90 * HZ) / REKEY_TIMEOUT, MAX_QUEUED_INCOMING_HANDSHAKES = 4096, - MAX_QUEUED_OUTGOING_PACKETS = 1024 + MAX_QUEUED_PACKETS = 1024 }; enum message_type { diff --git a/src/packets.h b/src/packets.h index 6601ef6..0b8fe1e 100644 --- a/src/packets.h +++ b/src/packets.h @@ -22,30 +22,30 @@ struct packet_cb { }; #define PACKET_CB(skb) ((struct packet_cb *)skb->cb) -/* receive.c */ -void packet_receive(struct wireguard_device *wg, struct sk_buff *skb); -void packet_process_queued_handshake_packets(struct work_struct *work); -void packet_consume_data_done(struct sk_buff *skb, struct wireguard_peer *peer, struct endpoint *endpoint, bool used_new_key); +/* data.c */ +int init_crypt_ctx_cache(void); +void deinit_crypt_ctx_cache(void); +void packet_send_worker(struct work_struct *work); +void packet_encrypt_worker(struct work_struct *work); +void packet_init_worker(struct work_struct *work); +void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packets); void packet_receive_worker(struct work_struct *work); void packet_decrypt_worker(struct work_struct *work); void packet_consume_data(struct sk_buff *skb, struct wireguard_device *wg); +void packet_purge_init_queue(struct wireguard_peer *peer); + +/* receive.c */ +void packet_process_queued_handshake_packets(struct work_struct *work); +void packet_consume_data_done(struct sk_buff *skb, struct wireguard_peer *peer, struct endpoint *endpoint, bool used_new_key); +void packet_receive(struct wireguard_device *wg, struct sk_buff *skb); /* send.c */ -void keep_key_fresh_send(struct wireguard_peer *peer); -void packet_send_keepalive(struct wireguard_peer *peer); -void packet_queue_handshake_initiation(struct wireguard_peer *peer, bool is_retry); void packet_send_queued_handshakes(struct work_struct *work); +void packet_queue_handshake_initiation(struct wireguard_peer *peer, bool is_retry); void packet_send_handshake_response(struct wireguard_peer *peer); void packet_send_handshake_cookie(struct wireguard_device *wg, struct sk_buff *initiating_skb, __le32 sender_index); -void packet_send_worker(struct work_struct *work); -void packet_encrypt_worker(struct work_struct *work); -void packet_init_worker(struct work_struct *work); -void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packets); - -/* data.c */ -int init_crypt_cache(void); -void deinit_crypt_cache(void); -void peer_purge_queues(struct wireguard_peer *peer); +void packet_send_keepalive(struct wireguard_peer *peer); +void packet_create_data_done(struct sk_buff_head *queue, struct wireguard_peer *peer); /* Returns either the correct skb->protocol value, or 0 if invalid. */ static inline __be16 skb_examine_untrusted_ip_hdr(struct sk_buff *skb) @@ -57,6 +57,21 @@ static inline __be16 skb_examine_untrusted_ip_hdr(struct sk_buff *skb) return 0; } +/* This function is racy, in the sense that next is unlocked, so it could return + * the same CPU twice. A race-free version of this would be to instead store an + * atomic sequence number, do an increment-and-return, and then iterate through + * every possible CPU until we get to that index -- choose_cpu. However that's + * a bit slower, and it doesn't seem like this potential race actually introduces + * any performance loss, so we live with it. 
*/ +static inline int cpumask_next_online(int *next) +{ + int cpu = *next; + while (unlikely(!cpumask_test_cpu(cpu, cpu_online_mask))) + cpu = cpumask_next(cpu, cpu_online_mask) % nr_cpumask_bits; + *next = cpumask_next(cpu, cpu_online_mask) % nr_cpumask_bits; + return cpu; +} + #ifdef DEBUG bool packet_counter_selftest(void); #endif @@ -14,18 +14,6 @@ static atomic64_t peer_counter = ATOMIC64_INIT(0); -static int choose_cpu(u64 id) -{ - unsigned int cpu, cpu_index, i; - - cpu_index = id % cpumask_weight(cpu_online_mask); - cpu = cpumask_first(cpu_online_mask); - for (i = 0; i < cpu_index; i += 1) - cpu = cpumask_next(cpu, cpu_online_mask); - - return cpu; -} - struct wireguard_peer *peer_create(struct wireguard_device *wg, const u8 public_key[NOISE_PUBLIC_KEY_LEN], const u8 preshared_key[NOISE_SYMMETRIC_KEY_LEN]) { struct wireguard_peer *peer; @@ -44,6 +32,7 @@ struct wireguard_peer *peer_create(struct wireguard_device *wg, const u8 public_ } peer->internal_id = atomic64_inc_return(&peer_counter); + peer->serial_work_cpu = nr_cpumask_bits; peer->device = wg; cookie_init(&peer->latest_cookie); if (!noise_handshake_init(&peer->handshake, &wg->static_identity, public_key, preshared_key, peer)) { @@ -57,7 +46,6 @@ struct wireguard_peer *peer_create(struct wireguard_device *wg, const u8 public_ kref_init(&peer->refcount); pubkey_hashtable_add(&wg->peer_hashtable, peer); list_add_tail(&peer->peer_list, &wg->peer_list); - peer->work_cpu = choose_cpu(peer->internal_id); INIT_LIST_HEAD(&peer->init_queue.list); INIT_WORK(&peer->init_queue.work, packet_init_worker); INIT_LIST_HEAD(&peer->send_queue.list); @@ -99,10 +87,11 @@ void peer_remove(struct wireguard_peer *peer) timers_uninit_peer(peer); routing_table_remove_by_peer(&peer->device->peer_routing_table, peer); pubkey_hashtable_remove(&peer->device->peer_hashtable, peer); - flush_workqueue(peer->device->crypt_wq); - if (peer->device->peer_wq) - flush_workqueue(peer->device->peer_wq); - peer_purge_queues(peer); + atomic_set(&peer->is_draining, true); + packet_purge_init_queue(peer); + flush_workqueue(peer->device->packet_crypt_wq); /* The first flush is for encrypt/decrypt step. */ + flush_workqueue(peer->device->packet_crypt_wq); /* The second flush is for send/receive step. */ + flush_workqueue(peer->device->handshake_send_wq); peer_put(peer); } @@ -53,9 +53,10 @@ struct wireguard_peer { struct rcu_head rcu; struct list_head peer_list; u64 internal_id; - int work_cpu; struct crypt_queue init_queue, send_queue, receive_queue; spinlock_t init_queue_lock; + atomic_t is_draining; + int serial_work_cpu; }; struct wireguard_peer *peer_create(struct wireguard_device *wg, const u8 public_key[NOISE_PUBLIC_KEY_LEN], const u8 preshared_key[NOISE_SYMMETRIC_KEY_LEN]); diff --git a/src/queue.h b/src/queue.h deleted file mode 100644 index 537bacf..0000000 --- a/src/queue.h +++ /dev/null @@ -1,139 +0,0 @@ -/* Copyright (C) 2017 Samuel Holland <samuel@sholland.org>. All Rights Reserved. 
*/ - -#ifndef WGQUEUE_H -#define WGQUEUE_H - -#include <linux/kernel.h> -#include <linux/skbuff.h> - -#include "device.h" -#include "peer.h" - -#define QUEUE_MAX_LEN 1000 - -enum { - CTX_NEW, - CTX_FINISHED, - CTX_FREEING, -}; - -struct crypt_ctx { - struct list_head peer_list; - struct list_head shared_list; - union { - struct sk_buff_head packets; - struct sk_buff *skb; - }; - struct wireguard_peer *peer; - struct noise_keypair *keypair; - struct endpoint endpoint; - atomic_t state; -}; - -static inline int next_cpu(int *next) -{ - int cpu = *next; - - if (cpu >= nr_cpumask_bits || !cpumask_test_cpu(cpu, cpu_online_mask)) - cpu = cpumask_first(cpu_online_mask); - *next = cpumask_next(cpu, cpu_online_mask); - return cpu; -} - -/** - * __queue_dequeue - Atomically remove the first item in a queue. - * - * @return The address of the dequeued item, or NULL if the queue is empty. - * - * This function is safe to execute concurrently with any number of - * __queue_enqueue() calls, but *not* with another __queue_dequeue() call - * operating on the same queue. - */ -static inline struct list_head *__queue_dequeue(struct list_head *queue) -{ - struct list_head *first, *second; - - first = READ_ONCE(queue->next); - if (first == queue) - return NULL; - do { - second = READ_ONCE(first->next); - WRITE_ONCE(queue->next, second); - } while (cmpxchg(&second->prev, first, queue) != first); - return first; -} - -static inline struct list_head *queue_dequeue(struct crypt_queue *queue) -{ - struct list_head *head = __queue_dequeue(&queue->list); - if (head) - atomic_dec(&queue->qlen); - return head; -} - -#define queue_dequeue_peer(queue) ({ \ - struct list_head *__head = queue_dequeue(queue); \ - __head ? list_entry(__head, struct crypt_ctx, peer_list) : NULL; \ -}) - -#define queue_dequeue_shared(queue) ({ \ - struct list_head *__head = queue_dequeue(queue); \ - __head ? list_entry(__head, struct crypt_ctx, shared_list) : NULL; \ -}) - -#define queue_empty(queue) \ - list_empty(&(queue)->list) - -/** - * __queue_enqueue - Atomically append an item to the tail of a queue. - * - * This function is safe to execute concurrently with any number of other - * __queue_enqueue() calls, as well as with one __queue_dequeue() call - * operating on the same queue. 
- */ -static inline void __queue_enqueue(struct list_head *queue, - struct list_head *head) -{ - struct list_head *last; - - WRITE_ONCE(head->next, queue); - do { - last = READ_ONCE(queue->prev); - WRITE_ONCE(head->prev, last); - } while (cmpxchg(&queue->prev, last, head) != last); - WRITE_ONCE(last->next, head); -} - -static inline bool queue_enqueue(struct crypt_queue *queue, - struct list_head *head, - int limit) -{ - bool have_space = !limit || atomic_inc_return(&queue->qlen) <= limit; - if (have_space) - __queue_enqueue(&queue->list, head); - else - atomic_dec(&queue->qlen); - return have_space; -} - -#define queue_enqueue_peer(queue, ctx) \ - queue_enqueue(queue, &(ctx)->peer_list, QUEUE_MAX_LEN) - -#define queue_enqueue_shared(queue, ctx, wq, cpu) ({ \ - int __cpu = next_cpu(cpu); \ - struct crypt_queue *__queue = per_cpu_ptr(queue, __cpu); \ - queue_enqueue(__queue, &(ctx)->shared_list, 0); \ - queue_work_on(__cpu, wq, &__queue->work); \ - true; \ -}) - -#define queue_first_peer(queue) \ - list_first_entry_or_null(&(queue)->list, struct crypt_ctx, peer_list) - -#define queue_first_shared(queue) \ - list_first_entry_or_null(&(queue)->list, struct crypt_ctx, shared_list) - -#define queue_full(queue) \ - (atomic_read(&(queue)->qlen) == QUEUE_MAX_LEN) - -#endif /* WGQUEUE_H */ diff --git a/src/receive.c b/src/receive.c index 1ebbe89..86d522b 100644 --- a/src/receive.c +++ b/src/receive.c @@ -187,7 +187,7 @@ void packet_consume_data_done(struct sk_buff *skb, struct wireguard_peer *peer, if (unlikely(used_new_key)) { timers_handshake_complete(peer); - queue_work(peer->device->crypt_wq, &peer->init_queue.work); + queue_work(peer->device->packet_crypt_wq, &peer->init_queue.work); } keep_key_fresh(peer); @@ -234,7 +234,7 @@ void packet_consume_data_done(struct sk_buff *skb, struct wireguard_peer *peer, goto dishonest_packet_peer; len = skb->len; - if (likely(netif_rx_ni(skb) == NET_RX_SUCCESS)) + if (likely(netif_rx(skb) == NET_RX_SUCCESS)) rx_stats(peer, len); else { ++dev->stats.rx_dropped; @@ -273,19 +273,15 @@ void packet_receive(struct wireguard_device *wg, struct sk_buff *skb) case MESSAGE_HANDSHAKE_INITIATION: case MESSAGE_HANDSHAKE_RESPONSE: case MESSAGE_HANDSHAKE_COOKIE: { - int cpu_index, cpu, target_cpu; + int cpu; if (skb_queue_len(&wg->incoming_handshakes) > MAX_QUEUED_INCOMING_HANDSHAKES) { net_dbg_skb_ratelimited("%s: Too many handshakes queued, dropping packet from %pISpfsc\n", wg->dev->name, skb); goto err; } skb_queue_tail(&wg->incoming_handshakes, skb); - /* Select the CPU in a round-robin */ - cpu_index = ((unsigned int)atomic_inc_return(&wg->incoming_handshake_seqnr)) % cpumask_weight(cpu_online_mask); - target_cpu = cpumask_first(cpu_online_mask); - for (cpu = 0; cpu < cpu_index; ++cpu) - target_cpu = cpumask_next(target_cpu, cpu_online_mask); /* Queues up a call to packet_process_queued_handshake_packets(skb): */ - queue_work_on(target_cpu, wg->incoming_handshake_wq, &per_cpu_ptr(wg->incoming_handshakes_worker, target_cpu)->work); + cpu = cpumask_next_online(&wg->incoming_handshake_cpu); + queue_work_on(cpu, wg->handshake_receive_wq, &per_cpu_ptr(wg->incoming_handshakes_worker, cpu)->work); break; } case MESSAGE_DATA: @@ -4,7 +4,6 @@ #include "timers.h" #include "device.h" #include "peer.h" -#include "queue.h" #include "socket.h" #include "messages.h" #include "cookie.h" @@ -60,7 +59,7 @@ void packet_queue_handshake_initiation(struct wireguard_peer *peer, bool is_retr return; /* Queues up calling packet_send_queued_handshakes(peer), where we do a peer_put(peer) after: 
*/ - if (!queue_work(peer->device->peer_wq, &peer->transmit_handshake_work)) + if (!queue_work(peer->device->handshake_send_wq, &peer->transmit_handshake_work)) peer_put(peer); /* If the work was already queued, we want to drop the extra reference */ } @@ -90,7 +89,7 @@ void packet_send_handshake_cookie(struct wireguard_device *wg, struct sk_buff *i socket_send_buffer_as_reply_to_skb(wg, initiating_skb, &packet, sizeof(packet)); } -void keep_key_fresh_send(struct wireguard_peer *peer) +static inline void keep_key_fresh(struct wireguard_peer *peer) { struct noise_keypair *keypair; bool send = false; @@ -112,7 +111,7 @@ void packet_send_keepalive(struct wireguard_peer *peer) struct sk_buff *skb; struct sk_buff_head queue; - if (queue_empty(&peer->init_queue)) { + if (list_empty(&peer->init_queue.list)) { skb = alloc_skb(DATA_PACKET_HEAD_ROOM + MESSAGE_MINIMUM_LENGTH, GFP_ATOMIC); if (unlikely(!skb)) return; @@ -124,6 +123,26 @@ void packet_send_keepalive(struct wireguard_peer *peer) net_dbg_ratelimited("%s: Sending keepalive packet to peer %Lu (%pISpfsc)\n", peer->device->dev->name, peer->internal_id, &peer->endpoint.addr); } else { /* There are packets pending which need to be initialized with the new keypair. */ - queue_work(peer->device->crypt_wq, &peer->init_queue.work); + queue_work(peer->device->packet_crypt_wq, &peer->init_queue.work); } } + +void packet_create_data_done(struct sk_buff_head *queue, struct wireguard_peer *peer) +{ + struct sk_buff *skb, *tmp; + bool is_keepalive, data_sent = false; + + if (unlikely(skb_queue_empty(queue))) + return; + + timers_any_authenticated_packet_traversal(peer); + skb_queue_walk_safe (queue, skb, tmp) { + is_keepalive = skb->len == message_data_len(0); + if (likely(!socket_send_skb_to_peer(peer, skb, PACKET_CB(skb)->ds) && !is_keepalive)) + data_sent = true; + } + if (likely(data_sent)) + timers_data_sent(peer); + + keep_key_fresh(peer); +} diff --git a/src/timers.c b/src/timers.c index b507aa3..07e9297 100644 --- a/src/timers.c +++ b/src/timers.c @@ -33,7 +33,7 @@ static void expired_retransmit_handshake(unsigned long ptr) del_timer(&peer->timer_send_keepalive); /* We drop all packets without a keypair and don't try again, * if we try unsuccessfully for too long to make a handshake. */ - peer_purge_queues(peer); + packet_purge_init_queue(peer); /* We set a timer for destroying any residue that might be left * of a partial exchange. */ if (likely(peer->timers_enabled) && !timer_pending(&peer->timer_zero_key_material)) @@ -75,7 +75,7 @@ static void expired_new_handshake(unsigned long ptr) static void expired_zero_key_material(unsigned long ptr) { peer_get_from_ptr(ptr); - if (!queue_work(peer->device->peer_wq, &peer->clear_peer_work)) /* Takes our reference. */ + if (!queue_work(peer->device->handshake_send_wq, &peer->clear_peer_work)) /* Takes our reference. */ peer_put(peer); /* If the work was already on the queue, we want to drop the extra reference */ } static void queued_expired_zero_key_material(struct work_struct *work) |