path: root/src/data.c
Diffstat (limited to 'src/data.c')
-rw-r--r--  src/data.c  166
1 file changed, 80 insertions(+), 86 deletions(-)
diff --git a/src/data.c b/src/data.c
index f30b777..0d01c64 100644
--- a/src/data.c
+++ b/src/data.c
@@ -17,8 +17,7 @@
#include <crypto/algapi.h>
struct crypt_ctx {
- struct list_head per_peer_head;
- struct list_head per_device_head;
+ struct list_head per_peer_node, per_device_node;
union {
struct sk_buff_head packets;
struct sk_buff *skb;
@@ -29,91 +28,97 @@ struct crypt_ctx {
atomic_t is_finished;
};
-/**
- * queue_dequeue - Atomically remove the first item in a queue.
- *
- * @return The address of the dequeued item, or NULL if the queue is empty.
- *
- * This function is safe to execute concurrently with any number of
- * queue_enqueue() calls, but *not* with another queue_dequeue() call
- * operating on the same queue.
- */
-static struct list_head *queue_dequeue(struct crypt_queue *queue)
+struct multicore_worker __percpu *packet_alloc_percpu_multicore_worker(work_func_t function, void *ptr)
{
- struct list_head *first, *second;
- first = READ_ONCE(queue->list.next);
- if (first == &queue->list)
+ int cpu;
+ struct multicore_worker __percpu *worker = alloc_percpu(struct multicore_worker);
+ if (!worker)
return NULL;
- do {
- second = READ_ONCE(first->next);
- WRITE_ONCE(queue->list.next, second);
- } while (cmpxchg(&second->prev, first, &queue->list) != first);
- if (first)
- atomic_dec(&queue->qlen);
- return first;
+ for_each_possible_cpu (cpu) {
+ per_cpu_ptr(worker, cpu)->ptr = ptr;
+ INIT_WORK(&per_cpu_ptr(worker, cpu)->work, function);
+ }
+ return worker;
}
-/**
- * queue_enqueue - Atomically append an item to the tail of a queue.
- *
- * This function is safe to execute concurrently with any number of other
- * queue_enqueue() calls, as well as with one queue_dequeue() call
- * operating on the same queue.
- */
-static bool queue_enqueue(struct crypt_queue *queue, struct list_head *head, int limit)
+int packet_queue_init(struct crypt_queue *queue, work_func_t function, bool multicore)
{
- bool have_space = !limit || atomic_inc_return(&queue->qlen) <= limit;
- if (have_space) {
- struct list_head *last;
- WRITE_ONCE(head->next, &queue->list);
- do {
- last = READ_ONCE(queue->list.prev);
- WRITE_ONCE(head->prev, last);
- } while (cmpxchg(&queue->list.prev, last, head) != last);
- WRITE_ONCE(last->next, head);
+ INIT_LIST_HEAD(&queue->queue);
+ queue->len = 0;
+ spin_lock_init(&queue->lock);
+ if (multicore) {
+ queue->worker = packet_alloc_percpu_multicore_worker(function, queue);
+ if (!queue->worker)
+ return -ENOMEM;
} else
- atomic_dec(&queue->qlen);
- return have_space;
+ INIT_WORK(&queue->work, function);
+ return 0;
+}
+
+static struct list_head *queue_dequeue(struct crypt_queue *queue)
+{
+ struct list_head *node;
+ spin_lock_bh(&queue->lock);
+ node = queue->queue.next;
+ if (&queue->queue == node) {
+ spin_unlock_bh(&queue->lock);
+ return NULL;
+ }
+ list_del(node);
+ --queue->len;
+ spin_unlock_bh(&queue->lock);
+ return node;
+}
+
+static bool queue_enqueue(struct crypt_queue *queue, struct list_head *node, int limit)
+{
+ spin_lock_bh(&queue->lock);
+ if (limit && queue->len >= limit) {
+ spin_unlock_bh(&queue->lock);
+ return false;
+ }
+ list_add_tail(node, &queue->queue);
+ ++queue->len;
+ spin_unlock_bh(&queue->lock);
+ return true;
}
static inline struct crypt_ctx *queue_dequeue_per_peer(struct crypt_queue *queue)
{
- struct list_head *head = queue_dequeue(queue);
- return head ? list_entry(head, struct crypt_ctx, per_peer_head) : NULL;
+ struct list_head *node = queue_dequeue(queue);
+ return node ? list_entry(node, struct crypt_ctx, per_peer_node) : NULL;
}
static inline struct crypt_ctx *queue_dequeue_per_device(struct crypt_queue *queue)
{
- struct list_head *head = queue_dequeue(queue);
- return head ? list_entry(head, struct crypt_ctx, per_device_head) : NULL;
+ struct list_head *node = queue_dequeue(queue);
+ return node ? list_entry(node, struct crypt_ctx, per_device_node) : NULL;
}
-static inline bool queue_enqueue_per_peer(struct crypt_queue *queue, struct crypt_ctx *ctx)
+static inline struct crypt_ctx *queue_first_per_peer(struct crypt_queue *queue)
{
- /* TODO: While using MAX_QUEUED_PACKETS makes sense for the init_queue, it's
- * not ideal to be using this for the encrypt/decrypt queues or the send/receive
- * queues, where dynamic_queue_limit (dql) should be used instead. */
- return queue_enqueue(queue, &(ctx)->per_peer_head, MAX_QUEUED_PACKETS);
+ /* TODO: yikes, this isn't locked. But only called from single-consumer contexts.
+ * That should be okay, hopefully. Unless it isn't. */
+ return list_first_entry_or_null(&queue->queue, struct crypt_ctx, per_peer_node);
}
-static inline void queue_enqueue_per_device(struct crypt_queue __percpu *queue, struct crypt_ctx *ctx, struct workqueue_struct *wq, int *next_cpu)
+static inline bool queue_enqueue_per_peer(struct crypt_queue *peer_queue, struct crypt_ctx *ctx)
{
- struct crypt_queue *cpu_queue;
- int cpu = cpumask_next_online(next_cpu);
- /* Avoid running parallel work on the same CPU as the one handling all
- * of the serial work. This improves overall throughput and especially
- * throughput stability where we have at least two cores left for
- * parallel work. */
- if (cpu == ctx->peer->serial_work_cpu && num_online_cpus() >= 3)
- cpu = cpumask_next_online(next_cpu);
- cpu_queue = per_cpu_ptr(queue, cpu);
- queue_enqueue(cpu_queue, &ctx->per_device_head, 0);
- queue_work_on(cpu, wq, &cpu_queue->work);
+ /* TODO: While using MAX_QUEUED_PACKETS makes sense for the init_queue, it's
+ * not ideal to be using this for the encrypt/decrypt queues or the send/receive
+ * queues, where dynamic_queue_limit (dql) should be used instead. */
+ return queue_enqueue(peer_queue, &ctx->per_peer_node, MAX_QUEUED_PACKETS);
}
-static inline struct crypt_ctx *queue_first_per_peer(struct crypt_queue *queue)
+static inline bool queue_enqueue_per_device_and_peer(struct crypt_queue *device_queue, struct crypt_queue *peer_queue, struct crypt_ctx *ctx, struct workqueue_struct *wq, int *next_cpu)
{
- return list_first_entry_or_null(&(queue)->list, struct crypt_ctx, per_peer_head);
+ int cpu;
+ if (unlikely(!queue_enqueue_per_peer(peer_queue, ctx)))
+ return false;
+ cpu = cpumask_next_online(next_cpu);
+ queue_enqueue(device_queue, &ctx->per_device_node, 0);
+ queue_work_on(cpu, wq, &per_cpu_ptr(device_queue->worker, cpu)->work);
+ return true;
}
/* This is RFC6479, a replay detection bitmap algorithm that avoids bitshifts */
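The struct crypt_queue fields and the struct multicore_worker type used above are declared in a header that is not part of this diff. Judging only from how they are used here, the layouts are presumably along these lines (a sketch inferred from this change, not the actual declarations):

/* Sketch of the assumed definitions; inferred from usage in this diff,
 * the real declarations live in a header outside this change. */
struct multicore_worker {
	void *ptr;               /* back-pointer to the owning crypt_queue */
	struct work_struct work; /* queued on the packet crypt workqueue */
};

struct crypt_queue {
	spinlock_t lock;         /* protects queue and len */
	struct list_head queue;  /* list of struct crypt_ctx nodes */
	int len;                 /* current depth, checked against MAX_QUEUED_PACKETS */
	union {
		struct {             /* multicore == true: one worker per possible CPU */
			struct multicore_worker __percpu *worker;
			int last_cpu;    /* cursor for cpumask_next_online() round-robin */
		};
		struct work_struct work; /* multicore == false: a single work item */
	};
};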
@@ -343,7 +348,7 @@ void packet_send_worker(struct work_struct *work)
void packet_encrypt_worker(struct work_struct *work)
{
struct crypt_ctx *ctx;
- struct crypt_queue *queue = container_of(work, struct crypt_queue, work);
+ struct crypt_queue *queue = container_of(work, struct multicore_worker, work)->ptr;
struct sk_buff *skb, *tmp;
struct wireguard_peer *peer;
bool have_simd = chacha20poly1305_init_simd();
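This container_of() change is the consumer half of the multicore_worker indirection: the dispatch side queues a specific CPU's per-cpu work item, and the worker recovers the shared device queue through the ->ptr back-pointer set up in packet_alloc_percpu_multicore_worker(). A minimal sketch of a worker under this scheme, using the assumed layout above (example_worker is a hypothetical name):

/* Sketch: how a worker recovers its queue under the new scheme. The work
 * item is embedded in a multicore_worker whose ->ptr was pointed at the
 * shared crypt_queue, so container_of() plus one dereference replaces the
 * old container_of(work, struct crypt_queue, work). */
static void example_worker(struct work_struct *work)
{
	struct crypt_queue *queue = container_of(work, struct multicore_worker, work)->ptr;
	struct crypt_ctx *ctx;

	while ((ctx = queue_dequeue_per_device(queue)) != NULL) {
		/* process ctx, then hand it on to the peer's serial queue */
	}
}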
@@ -374,19 +379,16 @@ void packet_init_worker(struct work_struct *work)
struct wireguard_peer *peer = container_of(queue, struct wireguard_peer, init_queue);
struct wireguard_device *wg = peer->device;
- spin_lock(&peer->init_queue_lock);
+ /* TODO: does this race with packet_purge_init_queue and the other dequeuer in create_data, since it's unlocked? */
while ((ctx = queue_first_per_peer(queue)) != NULL) {
if (unlikely(!populate_sending_ctx(ctx))) {
packet_queue_handshake_initiation(peer, false);
break;
}
queue_dequeue(queue);
- if (likely(queue_enqueue_per_peer(&peer->send_queue, ctx)))
- queue_enqueue_per_device(wg->send_queue, ctx, wg->packet_crypt_wq, &wg->encrypt_cpu);
- else
+ if (unlikely(!queue_enqueue_per_device_and_peer(&wg->send_queue, &peer->send_queue, ctx, wg->packet_crypt_wq, &wg->send_queue.last_cpu)))
free_ctx(ctx);
}
- spin_unlock(&peer->init_queue_lock);
}
void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packets)
@@ -408,11 +410,9 @@ void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packet
/* If there are already packets on the init queue, these must go behind
* them to maintain the correct order, so we can only take the fast path
* when the init queue is empty. */
- if (likely(list_empty(&peer->init_queue.list))) {
+ if (likely(list_empty(&peer->init_queue.queue))) {
if (likely(populate_sending_ctx(ctx))) {
- if (likely(queue_enqueue_per_peer(&peer->send_queue, ctx)))
- queue_enqueue_per_device(wg->send_queue, ctx, wg->packet_crypt_wq, &wg->encrypt_cpu);
- else
+ if (unlikely(!queue_enqueue_per_device_and_peer(&wg->send_queue, &peer->send_queue, ctx, wg->packet_crypt_wq, &wg->send_queue.last_cpu)))
free_ctx(ctx);
return;
}
@@ -430,13 +430,11 @@ void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packet
* ones. If we cannot acquire the lock, packets are being dequeued on
* another thread, so race for the open slot. */
while (unlikely(!queue_enqueue_per_peer(&peer->init_queue, ctx))) {
- if (spin_trylock(&peer->init_queue_lock)) {
- struct crypt_ctx *tmp = queue_dequeue_per_peer(&peer->init_queue);
- if (likely(tmp))
- free_ctx(tmp);
- spin_unlock(&peer->init_queue_lock);
- }
+ struct crypt_ctx *tmp = queue_dequeue_per_peer(&peer->init_queue);
+ if (likely(tmp))
+ free_ctx(tmp);
}
+
/* Oops, we added something to the queue while removing the peer. */
if (unlikely(atomic_read(&peer->is_draining))) {
packet_purge_init_queue(peer);
@@ -482,7 +480,7 @@ void packet_receive_worker(struct work_struct *work)
void packet_decrypt_worker(struct work_struct *work)
{
struct crypt_ctx *ctx;
- struct crypt_queue *queue = container_of(work, struct crypt_queue, work);
+ struct crypt_queue *queue = container_of(work, struct multicore_worker, work)->ptr;
struct wireguard_peer *peer;
while ((ctx = queue_dequeue_per_device(queue)) != NULL) {
@@ -525,9 +523,7 @@ void packet_consume_data(struct sk_buff *skb, struct wireguard_device *wg)
/* index_hashtable_lookup() already gets a reference to peer. */
ctx->peer = ctx->keypair->entry.peer;
- if (likely(queue_enqueue_per_peer(&ctx->peer->receive_queue, ctx)))
- queue_enqueue_per_device(wg->receive_queue, ctx, wg->packet_crypt_wq, &wg->decrypt_cpu);
- else {
+ if (unlikely(!queue_enqueue_per_device_and_peer(&wg->receive_queue, &ctx->peer->receive_queue, ctx, wg->packet_crypt_wq, &wg->receive_queue.last_cpu))) {
/* TODO: replace this with a call to free_ctx when receiving uses skb_queues as well. */
noise_keypair_put(ctx->keypair);
peer_put(ctx->peer);
@@ -539,8 +535,6 @@ void packet_consume_data(struct sk_buff *skb, struct wireguard_device *wg)
void packet_purge_init_queue(struct wireguard_peer *peer)
{
struct crypt_ctx *ctx;
- spin_lock(&peer->init_queue_lock);
while ((ctx = queue_dequeue_per_peer(&peer->init_queue)) != NULL)
free_ctx(ctx);
- spin_unlock(&peer->init_queue_lock);
}
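cpumask_next_online() itself is not shown in this diff. From the way its cursor argument (previously wg->encrypt_cpu and wg->decrypt_cpu, now the queue's last_cpu field) is passed by pointer and advanced on every call, it presumably walks the online CPUs round-robin, roughly like the hypothetical sketch below; the real helper lives elsewhere in the tree and may differ in detail:

/* Hypothetical sketch of the round-robin CPU selection implied by the
 * last_cpu cursor; not the actual implementation. */
static inline int cpumask_next_online_sketch(int *last_cpu)
{
	/* Advance the cursor to the next online CPU, wrapping around. */
	int cpu = cpumask_next(*last_cpu, cpu_online_mask);

	if (cpu >= nr_cpu_ids)
		cpu = cpumask_first(cpu_online_mask);
	*last_cpu = cpu;
	return cpu;
}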