author     Jason A. Donenfeld <Jason@zx2c4.com>  2017-09-15 03:45:41 +0200
committer  Jason A. Donenfeld <Jason@zx2c4.com>  2017-09-15 16:40:56 +0200
commit     9c1a628121770c6d16a500cd2a1886076e4d8e88 (patch)
tree       cecee0da0d16a59285daa7401b7d86f46255b6c3
parent     data: reorganize and edit new queuing code (diff)
download   wireguard-monolithic-historical-9c1a628121770c6d16a500cd2a1886076e4d8e88.tar.xz
           wireguard-monolithic-historical-9c1a628121770c6d16a500cd2a1886076e4d8e88.zip
Try out spinlocks for multiconsumer
-rw-r--r--   src/data.c           74
-rw-r--r--   src/device.c         30
-rw-r--r--   src/device.h         24
-rw-r--r--   src/peer.c            3
-rw-r--r--   src/receive.c         2
-rwxr-xr-x   src/tests/netns.sh    8
6 files changed, 69 insertions(+), 72 deletions(-)
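
This change replaces the lock-free cmpxchg-based queue, which was only safe with a single consumer, with an ordinary list_head guarded by a spinlock, so that any number of producers and consumers may operate on the same queue concurrently. The per-cpu crypt_queue instances become one shared queue per direction, drained by per-cpu workers (struct multicore_worker) dispatched round-robin across online CPUs. A minimal sketch of the resulting queue pattern, using only standard kernel list and spinlock primitives (the sketch_ names are illustrative, not the module's own):

/* Sketch: a bounded multi-producer/multi-consumer queue guarded by a
 * spinlock, in the style this commit introduces. */
#include <linux/list.h>
#include <linux/spinlock.h>

struct sketch_queue {
	spinlock_t lock;
	struct list_head list;
	int qlen;
};

static bool sketch_enqueue(struct sketch_queue *q, struct list_head *node, int limit)
{
	spin_lock_bh(&q->lock);
	if (limit && q->qlen >= limit) { /* bounded queue is full: reject */
		spin_unlock_bh(&q->lock);
		return false;
	}
	++q->qlen;
	list_add_tail(node, &q->list);
	spin_unlock_bh(&q->lock);
	return true;
}

static struct list_head *sketch_dequeue(struct sketch_queue *q)
{
	struct list_head *node;

	spin_lock_bh(&q->lock);
	if (list_empty(&q->list)) { /* nothing queued */
		spin_unlock_bh(&q->lock);
		return NULL;
	}
	node = q->list.next;
	list_del(node);
	--q->qlen;
	spin_unlock_bh(&q->lock);
	return node;
}

The _bh lock variants matter here: the queues are touched from both process context and softirq (network receive) context, and disabling bottom halves while the lock is held prevents a softirq on the same CPU from deadlocking against the holder.
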
diff --git a/src/data.c b/src/data.c
index 000f035..fc48d88 100644
--- a/src/data.c
+++ b/src/data.c
@@ -29,51 +29,32 @@ struct crypt_ctx {
atomic_t is_finished;
};
-/**
- * queue_dequeue - Atomically remove the first item in a queue.
- *
- * @return The address of the dequeued item, or NULL if the queue is empty.
- *
- * This function is safe to execute concurrently with any number of
- * queue_enqueue() calls, but *not* with another queue_dequeue() call
- * operating on the same queue.
- */
static struct list_head *queue_dequeue(struct crypt_queue *queue)
{
- struct list_head *first, *second;
- first = READ_ONCE(queue->list.next);
- if (first == &queue->list)
+ struct list_head *head;
+ spin_lock_bh(&queue->lock);
+ head = READ_ONCE(queue->list.next);
+ if (&queue->list == head) {
+ spin_unlock_bh(&queue->lock);
return NULL;
- do {
- second = READ_ONCE(first->next);
- WRITE_ONCE(queue->list.next, second);
- } while (cmpxchg(&second->prev, first, &queue->list) != first);
- if (first)
- atomic_dec(&queue->qlen);
- return first;
+ }
+ list_del(head);
+ --queue->qlen;
+ spin_unlock_bh(&queue->lock);
+ return head;
}
-/**
- * queue_enqueue - Atomically append an item to the tail of a queue.
- *
- * This function is safe to execute concurrently with any number of other
- * queue_enqueue() calls, as well as with one queue_dequeue() call
- * operating on the same queue.
- */
static bool queue_enqueue(struct crypt_queue *queue, struct list_head *head, int limit)
{
- bool have_space = !limit || atomic_inc_return(&queue->qlen) <= limit;
- if (have_space) {
- struct list_head *last;
- WRITE_ONCE(head->next, &queue->list);
- do {
- last = READ_ONCE(queue->list.prev);
- WRITE_ONCE(head->prev, last);
- } while (cmpxchg(&queue->list.prev, last, head) != last);
- WRITE_ONCE(last->next, head);
- } else
- atomic_dec(&queue->qlen);
- return have_space;
+ spin_lock_bh(&queue->lock);
+ if (limit && queue->qlen >= limit) {
+ spin_unlock_bh(&queue->lock);
+ return false;
+ }
+ ++queue->qlen;
+ list_add_tail(head, &queue->list);
+ spin_unlock_bh(&queue->lock);
+ return true;
}
static inline struct crypt_ctx *queue_dequeue_per_peer(struct crypt_queue *queue)
@@ -96,12 +77,11 @@ static inline bool queue_enqueue_per_peer(struct crypt_queue *queue, struct cryp
return queue_enqueue(queue, &(ctx)->per_peer_head, MAX_QUEUED_PACKETS);
}
-static inline void queue_enqueue_per_device(struct crypt_queue __percpu *queue, struct crypt_ctx *ctx, struct workqueue_struct *wq, int *next_cpu)
+static inline void queue_enqueue_per_device(struct crypt_queue *queue, struct crypt_ctx *ctx, struct workqueue_struct *wq, int *next_cpu)
{
int cpu = cpumask_next_online(next_cpu);
- struct crypt_queue *cpu_queue = per_cpu_ptr(queue, cpu);
- queue_enqueue(cpu_queue, &ctx->per_device_head, 0);
- queue_work_on(cpu, wq, &cpu_queue->work);
+ queue_enqueue(queue, &ctx->per_device_head, 0);
+ queue_work_on(cpu, wq, &per_cpu_ptr(queue->worker, cpu)->work);
}
static inline struct crypt_ctx *queue_first_per_peer(struct crypt_queue *queue)
@@ -336,7 +316,7 @@ void packet_send_worker(struct work_struct *work)
void packet_encrypt_worker(struct work_struct *work)
{
struct crypt_ctx *ctx;
- struct crypt_queue *queue = container_of(work, struct crypt_queue, work);
+ struct crypt_queue *queue = container_of(work, struct multicore_worker, work)->queue;
struct sk_buff *skb, *tmp;
struct wireguard_peer *peer;
bool have_simd = chacha20poly1305_init_simd();
@@ -375,7 +355,7 @@ void packet_init_worker(struct work_struct *work)
}
queue_dequeue(queue);
if (likely(queue_enqueue_per_peer(&peer->send_queue, ctx)))
- queue_enqueue_per_device(wg->send_queue, ctx, wg->packet_crypt_wq, &wg->encrypt_cpu);
+ queue_enqueue_per_device(&wg->send_queue, ctx, wg->packet_crypt_wq, &wg->send_queue.last_cpu);
else
free_ctx(ctx);
}
@@ -404,7 +384,7 @@ void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packet
if (likely(list_empty(&peer->init_queue.list))) {
if (likely(populate_sending_ctx(ctx))) {
if (likely(queue_enqueue_per_peer(&peer->send_queue, ctx)))
- queue_enqueue_per_device(wg->send_queue, ctx, wg->packet_crypt_wq, &wg->encrypt_cpu);
+ queue_enqueue_per_device(&wg->send_queue, ctx, wg->packet_crypt_wq, &wg->send_queue.last_cpu);
else
free_ctx(ctx);
return;
@@ -475,7 +455,7 @@ void packet_receive_worker(struct work_struct *work)
void packet_decrypt_worker(struct work_struct *work)
{
struct crypt_ctx *ctx;
- struct crypt_queue *queue = container_of(work, struct crypt_queue, work);
+ struct crypt_queue *queue = container_of(work, struct multicore_worker, work)->queue;
struct wireguard_peer *peer;
while ((ctx = queue_dequeue_per_device(queue)) != NULL) {
@@ -519,7 +499,7 @@ void packet_consume_data(struct sk_buff *skb, struct wireguard_device *wg)
ctx->peer = ctx->keypair->entry.peer;
if (likely(queue_enqueue_per_peer(&ctx->peer->receive_queue, ctx)))
- queue_enqueue_per_device(wg->receive_queue, ctx, wg->packet_crypt_wq, &wg->decrypt_cpu);
+ queue_enqueue_per_device(&wg->receive_queue, ctx, wg->packet_crypt_wq, &wg->receive_queue.last_cpu);
else {
/* TODO: replace this with a call to free_ctx when receiving uses skb_queues as well. */
noise_keypair_put(ctx->keypair);
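
With the queues now shared rather than per-cpu, the encrypt and decrypt workers can no longer be embedded in the queue they drain; instead each per-cpu multicore_worker carries a pointer back to the shared queue, recovered via container_of() on the embedded work_struct. A sketch of that lookup, with illustrative names (the real workers then loop on queue_dequeue_per_device() as above):

/* Sketch: recovering a worker's shared-queue context from the
 * work_struct that the workqueue subsystem hands back to us. */
#include <linux/kernel.h>
#include <linux/workqueue.h>

struct sketch_worker {
	void *queue;             /* shared queue this worker drains */
	struct work_struct work; /* embedded, scheduled with queue_work_on() */
};

static void sketch_worker_fn(struct work_struct *work)
{
	struct sketch_worker *w = container_of(work, struct sketch_worker, work);

	/* ... dequeue items from w->queue until it is empty ... */
	(void)w;
}
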
diff --git a/src/device.c b/src/device.c
index 3615125..070e5e5 100644
--- a/src/device.c
+++ b/src/device.c
@@ -222,8 +222,8 @@ static void destruct(struct net_device *dev)
wg->incoming_port = 0;
destroy_workqueue(wg->handshake_receive_wq);
destroy_workqueue(wg->handshake_send_wq);
- free_percpu(wg->receive_queue);
- free_percpu(wg->send_queue);
+ free_percpu(wg->receive_queue.worker);
+ free_percpu(wg->send_queue.worker);
destroy_workqueue(wg->packet_crypt_wq);
routing_table_free(&wg->peer_routing_table);
ratelimiter_uninit();
@@ -289,7 +289,7 @@ static int newlink(struct net *src_net, struct net_device *dev, struct nlattr *t
if (!dev->tstats)
goto error_1;
- wg->incoming_handshakes_worker = alloc_percpu(struct handshake_worker);
+ wg->incoming_handshakes_worker = alloc_percpu(struct multicore_worker);
if (!wg->incoming_handshakes_worker)
goto error_2;
for_each_possible_cpu (cpu) {
@@ -309,20 +309,24 @@ static int newlink(struct net *src_net, struct net_device *dev, struct nlattr *t
if (!wg->packet_crypt_wq)
goto error_5;
- wg->send_queue = alloc_percpu(struct crypt_queue);
- if (!wg->send_queue)
+ INIT_LIST_HEAD(&wg->send_queue.list);
+ spin_lock_init(&wg->send_queue.lock);
+ wg->send_queue.worker = alloc_percpu(struct multicore_worker);
+ if (!wg->send_queue.worker)
goto error_6;
for_each_possible_cpu (cpu) {
- INIT_LIST_HEAD(&per_cpu_ptr(wg->send_queue, cpu)->list);
- INIT_WORK(&per_cpu_ptr(wg->send_queue, cpu)->work, packet_encrypt_worker);
+ per_cpu_ptr(wg->send_queue.worker, cpu)->queue = &wg->send_queue;
+ INIT_WORK(&per_cpu_ptr(wg->send_queue.worker, cpu)->work, packet_encrypt_worker);
}
- wg->receive_queue = alloc_percpu(struct crypt_queue);
- if (!wg->receive_queue)
+ INIT_LIST_HEAD(&wg->receive_queue.list);
+ spin_lock_init(&wg->receive_queue.lock);
+ wg->receive_queue.worker = alloc_percpu(struct multicore_worker);
+ if (!wg->receive_queue.worker)
goto error_7;
for_each_possible_cpu (cpu) {
- INIT_LIST_HEAD(&per_cpu_ptr(wg->receive_queue, cpu)->list);
- INIT_WORK(&per_cpu_ptr(wg->receive_queue, cpu)->work, packet_decrypt_worker);
+ per_cpu_ptr(wg->receive_queue.worker, cpu)->queue = &wg->receive_queue;
+ INIT_WORK(&per_cpu_ptr(wg->receive_queue.worker, cpu)->work, packet_decrypt_worker);
}
ret = ratelimiter_init();
@@ -345,9 +349,9 @@ static int newlink(struct net *src_net, struct net_device *dev, struct nlattr *t
error_9:
ratelimiter_uninit();
error_8:
- free_percpu(wg->receive_queue);
+ free_percpu(wg->receive_queue.worker);
error_7:
- free_percpu(wg->send_queue);
+ free_percpu(wg->send_queue.worker);
error_6:
destroy_workqueue(wg->packet_crypt_wq);
error_5:
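
The setup in newlink() follows the same pattern for both directions: initialize the shared queue, allocate one multicore_worker per possible CPU, and point every worker at that queue. Factored out, it looks roughly like this sketch, assuming the crypt_queue and multicore_worker layouts from device.h below (sketch_queue_init is a hypothetical helper, not in the tree):

/* Sketch: per-cpu worker setup for one shared crypt_queue. */
#include <linux/cpumask.h>
#include <linux/errno.h>
#include <linux/percpu.h>
#include <linux/workqueue.h>

static int sketch_queue_init(struct crypt_queue *queue, work_func_t function)
{
	int cpu;

	INIT_LIST_HEAD(&queue->list);
	spin_lock_init(&queue->lock);
	queue->qlen = 0;
	queue->worker = alloc_percpu(struct multicore_worker);
	if (!queue->worker)
		return -ENOMEM;
	for_each_possible_cpu(cpu) {
		per_cpu_ptr(queue->worker, cpu)->queue = queue;
		INIT_WORK(&per_cpu_ptr(queue->worker, cpu)->work, function);
	}
	return 0;
}
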
diff --git a/src/device.h b/src/device.h
index 047fdf5..9378b8b 100644
--- a/src/device.h
+++ b/src/device.h
@@ -16,15 +16,25 @@
struct wireguard_device;
-struct handshake_worker {
- struct wireguard_device *wg;
+struct multicore_worker {
+ union {
+ struct wireguard_device *wg;
+ struct crypt_queue *queue;
+ };
struct work_struct work;
};
struct crypt_queue {
+ spinlock_t lock;
struct list_head list;
- struct work_struct work;
- atomic_t qlen;
+ union {
+ struct {
+ struct multicore_worker __percpu *worker;
+ int last_cpu;
+ };
+ struct work_struct work;
+ };
+ int qlen;
};
struct wireguard_device {
@@ -37,9 +47,9 @@ struct wireguard_device {
struct noise_static_identity static_identity;
struct workqueue_struct *handshake_receive_wq, *handshake_send_wq, *packet_crypt_wq;
struct sk_buff_head incoming_handshakes;
- struct crypt_queue __percpu *send_queue, *receive_queue;
- int incoming_handshake_cpu, encrypt_cpu, decrypt_cpu;
- struct handshake_worker __percpu *incoming_handshakes_worker;
+ struct crypt_queue send_queue, receive_queue;
+ int incoming_handshake_cpu;
+ struct multicore_worker __percpu *incoming_handshakes_worker;
struct cookie_checker cookie_checker;
struct pubkey_hashtable peer_hashtable;
struct index_hashtable index_hashtable;
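
The unions keep both structs dual-purpose: a device-level crypt_queue uses the worker/last_cpu arm (per-cpu workers, round-robin dispatch), while a peer-level queue uses the single embedded work_struct; likewise a multicore_worker's context pointer is a device for handshake workers and a queue for crypt workers. Each instance only ever uses one arm, so the overlay is safe. Dispatch round-robins over online CPUs through last_cpu and cpumask_next_online(), a helper defined elsewhere in the tree; its effect is roughly the following (an illustrative sketch, not the tree's definition):

/* Sketch: advance *last to the next online CPU, wrapping around. */
#include <linux/cpumask.h>

static int sketch_next_online_cpu(int *last)
{
	int cpu = cpumask_next(*last, cpu_online_mask);

	if (cpu >= nr_cpu_ids)
		cpu = cpumask_first(cpu_online_mask);
	*last = cpu;
	return cpu;
}
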
diff --git a/src/peer.c b/src/peer.c
index 62d3259..9d88da7 100644
--- a/src/peer.c
+++ b/src/peer.c
@@ -48,10 +48,13 @@ struct wireguard_peer *peer_create(struct wireguard_device *wg, const u8 public_
list_add_tail(&peer->peer_list, &wg->peer_list);
INIT_LIST_HEAD(&peer->init_queue.list);
INIT_WORK(&peer->init_queue.work, packet_init_worker);
+ spin_lock_init(&peer->init_queue.lock);
INIT_LIST_HEAD(&peer->send_queue.list);
INIT_WORK(&peer->send_queue.work, packet_send_worker);
+ spin_lock_init(&peer->send_queue.lock);
INIT_LIST_HEAD(&peer->receive_queue.list);
INIT_WORK(&peer->receive_queue.work, packet_receive_worker);
+ spin_lock_init(&peer->receive_queue.lock);
spin_lock_init(&peer->init_queue_lock);
pr_debug("%s: Peer %Lu created\n", wg->dev->name, peer->internal_id);
return peer;
diff --git a/src/receive.c b/src/receive.c
index 86d522b..0a91065 100644
--- a/src/receive.c
+++ b/src/receive.c
@@ -147,7 +147,7 @@ static void receive_handshake_packet(struct wireguard_device *wg, struct sk_buff
void packet_process_queued_handshake_packets(struct work_struct *work)
{
- struct wireguard_device *wg = container_of(work, struct handshake_worker, work)->wg;
+ struct wireguard_device *wg = container_of(work, struct multicore_worker, work)->wg;
struct sk_buff *skb;
while ((skb = skb_dequeue(&wg->incoming_handshakes)) != NULL) {
diff --git a/src/tests/netns.sh b/src/tests/netns.sh
index ea70fb5..166f8a2 100755
--- a/src/tests/netns.sh
+++ b/src/tests/netns.sh
@@ -116,22 +116,22 @@ tests() {
# TCP over IPv4
n2 iperf3 -s -1 -B 192.168.241.2 &
waitiperf $netns2
- n1 iperf3 -Z -n 1G -c 192.168.241.2
+ n1 iperf3 -Z -n 100000G -c 192.168.241.2
# TCP over IPv6
n1 iperf3 -s -1 -B fd00::1 &
waitiperf $netns1
- n2 iperf3 -Z -n 1G -c fd00::1
+ n2 iperf3 -Z -n 100000G -c fd00::1
# UDP over IPv4
n1 iperf3 -s -1 -B 192.168.241.1 &
waitiperf $netns1
- n2 iperf3 -Z -n 1G -b 0 -u -c 192.168.241.1
+ n2 iperf3 -Z -n 100000G -b 0 -u -c 192.168.241.1
# UDP over IPv6
n2 iperf3 -s -1 -B fd00::2 &
waitiperf $netns2
- n1 iperf3 -Z -n 1G -b 0 -u -c fd00::2
+ n1 iperf3 -Z -n 100000G -b 0 -u -c fd00::2
}
[[ $(ip1 link show dev wg0) =~ mtu\ ([0-9]+) ]] && orig_mtu="${BASH_REMATCH[1]}"