From e1fead9769303cf160addfb9f16f8f9fda1ff617 Mon Sep 17 00:00:00 2001 From: Samuel Holland Date: Wed, 7 Jun 2017 01:39:08 -0500 Subject: data: entirely rework parallel system This removes our dependency on padata. Signed-off-by: Samuel Holland --- src/Kbuild | 3 - src/Kconfig | 12 - src/compat/Kbuild.include | 8 - src/compat/padata/padata.c | 903 ------------------------------------------- src/config.c | 2 +- src/data.c | 432 +++++++++++---------- src/device.c | 46 ++- src/device.h | 12 +- src/main.c | 12 +- src/packets.h | 21 +- src/peer.c | 28 +- src/peer.h | 9 +- src/queue.h | 139 +++++++ src/receive.c | 5 +- src/send.c | 96 +---- src/tests/qemu/kernel.config | 1 - src/timers.c | 4 +- 17 files changed, 445 insertions(+), 1288 deletions(-) delete mode 100644 src/compat/padata/padata.c create mode 100644 src/queue.h diff --git a/src/Kbuild b/src/Kbuild index c5b8718..0ab3277 100644 --- a/src/Kbuild +++ b/src/Kbuild @@ -26,9 +26,6 @@ endif ifneq ($(KBUILD_EXTMOD),) CONFIG_WIREGUARD := m -ifneq ($(CONFIG_SMP),) -ccflags-y += -DCONFIG_WIREGUARD_PARALLEL=y -endif endif include $(src)/compat/Kbuild.include diff --git a/src/Kconfig b/src/Kconfig index e86a935..e84aebb 100644 --- a/src/Kconfig +++ b/src/Kconfig @@ -16,18 +16,6 @@ config WIREGUARD It's safe to say Y or M here, as the driver is very lightweight and is only in use when an administrator chooses to add an interface. -config WIREGUARD_PARALLEL - bool "Enable parallel engine" - depends on SMP && WIREGUARD - select PADATA - default y - ---help--- - This will allow WireGuard to utilize all CPU cores when encrypting - and decrypting packets. - - It's safe to say Y here, and you probably should, as the performance - improvements are substantial. - config WIREGUARD_DEBUG bool "Debugging checks and verbose messages" depends on WIREGUARD diff --git a/src/compat/Kbuild.include b/src/compat/Kbuild.include index 688a573..aacc9f6 100644 --- a/src/compat/Kbuild.include +++ b/src/compat/Kbuild.include @@ -31,11 +31,3 @@ ifeq ($(shell grep -F "int crypto_memneq" "$(srctree)/include/crypto/algapi.h"), ccflags-y += -include $(src)/compat/memneq/include.h wireguard-y += compat/memneq/memneq.o endif - -ifneq ($(KBUILD_EXTMOD),) -ifneq ($(CONFIG_SMP),) -ifeq (,$(filter $(CONFIG_PADATA),y m)) -wireguard-y += compat/padata/padata.o -endif -endif -endif diff --git a/src/compat/padata/padata.c b/src/compat/padata/padata.c deleted file mode 100644 index fa6acac..0000000 --- a/src/compat/padata/padata.c +++ /dev/null @@ -1,903 +0,0 @@ -/* - * padata.c - generic interface to process data streams in parallel - * - * See Documentation/padata.txt for an api documentation. - * - * Copyright (C) 2008, 2009 secunet Security Networks AG - * Copyright (C) 2008, 2009 Steffen Klassert - * - * This program is free software; you can redistribute it and/or modify it - * under the terms and conditions of the GNU General Public License, - * version 2, as published by the Free Software Foundation. - * - * This program is distributed in the hope it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA. 
- */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define MAX_OBJ_NUM 1000 - -static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index) -{ - int cpu, target_cpu; - - target_cpu = cpumask_first(pd->cpumask.pcpu); - for (cpu = 0; cpu < cpu_index; cpu++) - target_cpu = cpumask_next(target_cpu, pd->cpumask.pcpu); - - return target_cpu; -} - -static int padata_cpu_hash(struct parallel_data *pd) -{ - int cpu_index; - /* - * Hash the sequence numbers to the cpus by taking - * seq_nr mod. number of cpus in use. - */ -#if LINUX_VERSION_CODE < KERNEL_VERSION(3, 13, 0) - spin_lock(&pd->seq_lock); - cpu_index = pd->seq_nr % cpumask_weight(pd->cpumask.pcpu); - pd->seq_nr++; - spin_unlock(&pd->seq_lock); -#else -#ifdef CONFIG_PAX_REFCOUNT - unsigned int seq_nr = atomic_inc_return_unchecked(&pd->seq_nr); -#else - unsigned int seq_nr = atomic_inc_return(&pd->seq_nr); -#endif - cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu); -#endif - - return padata_index_to_cpu(pd, cpu_index); -} - -static void padata_parallel_worker(struct work_struct *parallel_work) -{ - struct padata_parallel_queue *pqueue; - LIST_HEAD(local_list); - - local_bh_disable(); - pqueue = container_of(parallel_work, - struct padata_parallel_queue, work); - - spin_lock(&pqueue->parallel.lock); - list_replace_init(&pqueue->parallel.list, &local_list); - spin_unlock(&pqueue->parallel.lock); - - while (!list_empty(&local_list)) { - struct padata_priv *padata; - - padata = list_entry(local_list.next, - struct padata_priv, list); - - list_del_init(&padata->list); - - padata->parallel(padata); - } - - local_bh_enable(); -} - -/** - * padata_do_parallel - padata parallelization function - * - * @pinst: padata instance - * @padata: object to be parallelized - * @cb_cpu: cpu the serialization callback function will run on, - * must be in the serial cpumask of padata(i.e. cpumask.cbcpu). - * - * The parallelization callback function will run with BHs off. - * Note: Every object which is parallelized by padata_do_parallel - * must be seen by padata_do_serial. - */ -int padata_do_parallel(struct padata_instance *pinst, - struct padata_priv *padata, int cb_cpu) -{ - int target_cpu, err; - struct padata_parallel_queue *queue; - struct parallel_data *pd; - - rcu_read_lock_bh(); - - pd = rcu_dereference_bh(pinst->pd); - - err = -EINVAL; - if (!(pinst->flags & PADATA_INIT) || pinst->flags & PADATA_INVALID) - goto out; - - if (!cpumask_test_cpu(cb_cpu, pd->cpumask.cbcpu)) - goto out; - - err = -EBUSY; - if ((pinst->flags & PADATA_RESET)) - goto out; - - if (atomic_read(&pd->refcnt) >= MAX_OBJ_NUM) - goto out; - - err = 0; - atomic_inc(&pd->refcnt); - padata->pd = pd; - padata->cb_cpu = cb_cpu; - - target_cpu = padata_cpu_hash(pd); - queue = per_cpu_ptr(pd->pqueue, target_cpu); - - spin_lock(&queue->parallel.lock); - list_add_tail(&padata->list, &queue->parallel.list); - spin_unlock(&queue->parallel.lock); - - queue_work_on(target_cpu, pinst->wq, &queue->work); - -out: - rcu_read_unlock_bh(); - - return err; -} - -/* - * padata_get_next - Get the next object that needs serialization. - * - * Return values are: - * - * A pointer to the control struct of the next object that needs - * serialization, if present in one of the percpu reorder queues. - * - * -EINPROGRESS, if the next object that needs serialization will - * be parallel processed by another cpu and is not yet present in - * the cpu's reorder queue. 
- * - * -ENODATA, if this cpu has to do the parallel processing for - * the next object. - */ -static struct padata_priv *padata_get_next(struct parallel_data *pd) -{ - int cpu, num_cpus; - unsigned int next_nr, next_index; - struct padata_parallel_queue *next_queue; - struct padata_priv *padata; - struct padata_list *reorder; - - num_cpus = cpumask_weight(pd->cpumask.pcpu); - - /* - * Calculate the percpu reorder queue and the sequence - * number of the next object. - */ - next_nr = pd->processed; - next_index = next_nr % num_cpus; - cpu = padata_index_to_cpu(pd, next_index); - next_queue = per_cpu_ptr(pd->pqueue, cpu); - - reorder = &next_queue->reorder; - - spin_lock(&reorder->lock); - if (!list_empty(&reorder->list)) { - padata = list_entry(reorder->list.next, - struct padata_priv, list); - - list_del_init(&padata->list); - atomic_dec(&pd->reorder_objects); - - pd->processed++; - - spin_unlock(&reorder->lock); - goto out; - } - spin_unlock(&reorder->lock); - - if (__this_cpu_read(pd->pqueue->cpu_index) == next_queue->cpu_index) { - padata = ERR_PTR(-ENODATA); - goto out; - } - - padata = ERR_PTR(-EINPROGRESS); -out: - return padata; -} - -static void padata_reorder(struct parallel_data *pd) -{ - int cb_cpu; - struct padata_priv *padata; - struct padata_serial_queue *squeue; - struct padata_instance *pinst = pd->pinst; - - /* - * We need to ensure that only one cpu can work on dequeueing of - * the reorder queue the time. Calculating in which percpu reorder - * queue the next object will arrive takes some time. A spinlock - * would be highly contended. Also it is not clear in which order - * the objects arrive to the reorder queues. So a cpu could wait to - * get the lock just to notice that there is nothing to do at the - * moment. Therefore we use a trylock and let the holder of the lock - * care for all the objects enqueued during the holdtime of the lock. - */ - if (!spin_trylock_bh(&pd->lock)) - return; - - while (1) { - padata = padata_get_next(pd); - - /* - * If the next object that needs serialization is parallel - * processed by another cpu and is still on it's way to the - * cpu's reorder queue, nothing to do for now. - */ - if (PTR_ERR(padata) == -EINPROGRESS) - break; - - /* - * This cpu has to do the parallel processing of the next - * object. It's waiting in the cpu's parallelization queue, - * so exit immediately. - */ - if (PTR_ERR(padata) == -ENODATA) { - del_timer(&pd->timer); - spin_unlock_bh(&pd->lock); - return; - } - - cb_cpu = padata->cb_cpu; - squeue = per_cpu_ptr(pd->squeue, cb_cpu); - - spin_lock(&squeue->serial.lock); - list_add_tail(&padata->list, &squeue->serial.list); - spin_unlock(&squeue->serial.lock); - - queue_work_on(cb_cpu, pinst->wq, &squeue->work); - } - - spin_unlock_bh(&pd->lock); - - /* - * The next object that needs serialization might have arrived to - * the reorder queues in the meantime, we will be called again - * from the timer function if no one else cares for it. 
- */ - if (atomic_read(&pd->reorder_objects) - && !(pinst->flags & PADATA_RESET)) - mod_timer(&pd->timer, jiffies + HZ); - else - del_timer(&pd->timer); - - return; -} - -static void padata_reorder_timer(unsigned long arg) -{ - struct parallel_data *pd = (struct parallel_data *)arg; - - padata_reorder(pd); -} - -static void padata_serial_worker(struct work_struct *serial_work) -{ - struct padata_serial_queue *squeue; - struct parallel_data *pd; - LIST_HEAD(local_list); - - local_bh_disable(); - squeue = container_of(serial_work, struct padata_serial_queue, work); - pd = squeue->pd; - - spin_lock(&squeue->serial.lock); - list_replace_init(&squeue->serial.list, &local_list); - spin_unlock(&squeue->serial.lock); - - while (!list_empty(&local_list)) { - struct padata_priv *padata; - - padata = list_entry(local_list.next, - struct padata_priv, list); - - list_del_init(&padata->list); - - padata->serial(padata); - atomic_dec(&pd->refcnt); - } - local_bh_enable(); -} - -/** - * padata_do_serial - padata serialization function - * - * @padata: object to be serialized. - * - * padata_do_serial must be called for every parallelized object. - * The serialization callback function will run with BHs off. - */ -void padata_do_serial(struct padata_priv *padata) -{ - int cpu; - struct padata_parallel_queue *pqueue; - struct parallel_data *pd; - - pd = padata->pd; - - cpu = get_cpu(); - pqueue = per_cpu_ptr(pd->pqueue, cpu); - - spin_lock(&pqueue->reorder.lock); - atomic_inc(&pd->reorder_objects); - list_add_tail(&padata->list, &pqueue->reorder.list); - spin_unlock(&pqueue->reorder.lock); - - put_cpu(); - - padata_reorder(pd); -} - -static int padata_setup_cpumasks(struct parallel_data *pd, - const struct cpumask *pcpumask, - const struct cpumask *cbcpumask) -{ - if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL)) - return -ENOMEM; - - cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_online_mask); - if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) { - free_cpumask_var(pd->cpumask.pcpu); - return -ENOMEM; - } - - cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_online_mask); - return 0; -} - -static void __padata_list_init(struct padata_list *pd_list) -{ - INIT_LIST_HEAD(&pd_list->list); - spin_lock_init(&pd_list->lock); -} - -/* Initialize all percpu queues used by serial workers */ -static void padata_init_squeues(struct parallel_data *pd) -{ - int cpu; - struct padata_serial_queue *squeue; - - for_each_cpu(cpu, pd->cpumask.cbcpu) { - squeue = per_cpu_ptr(pd->squeue, cpu); - squeue->pd = pd; - __padata_list_init(&squeue->serial); - INIT_WORK(&squeue->work, padata_serial_worker); - } -} - -/* Initialize all percpu queues used by parallel workers */ -static void padata_init_pqueues(struct parallel_data *pd) -{ - int cpu_index, cpu; - struct padata_parallel_queue *pqueue; - - cpu_index = 0; - for_each_cpu(cpu, pd->cpumask.pcpu) { - pqueue = per_cpu_ptr(pd->pqueue, cpu); - pqueue->pd = pd; - pqueue->cpu_index = cpu_index; - cpu_index++; - - __padata_list_init(&pqueue->reorder); - __padata_list_init(&pqueue->parallel); - INIT_WORK(&pqueue->work, padata_parallel_worker); - atomic_set(&pqueue->num_obj, 0); - } -} - -/* Allocate and initialize the internal cpumask dependend resources. 
*/ -static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst, - const struct cpumask *pcpumask, - const struct cpumask *cbcpumask) -{ - struct parallel_data *pd; - - pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL); - if (!pd) - goto err; - - pd->pqueue = alloc_percpu(struct padata_parallel_queue); - if (!pd->pqueue) - goto err_free_pd; - - pd->squeue = alloc_percpu(struct padata_serial_queue); - if (!pd->squeue) - goto err_free_pqueue; - if (padata_setup_cpumasks(pd, pcpumask, cbcpumask) < 0) - goto err_free_squeue; - - padata_init_pqueues(pd); - padata_init_squeues(pd); - setup_timer(&pd->timer, padata_reorder_timer, (unsigned long)pd); -#if LINUX_VERSION_CODE < KERNEL_VERSION(3, 13, 0) - pd->seq_nr = 0; -#else -#ifdef CONFIG_PAX_REFCOUNT - atomic_set_unchecked(&pd->seq_nr, -1); -#else - atomic_set(&pd->seq_nr, -1); -#endif -#endif - atomic_set(&pd->reorder_objects, 0); - atomic_set(&pd->refcnt, 0); - pd->pinst = pinst; - spin_lock_init(&pd->lock); - - return pd; - -err_free_squeue: - free_percpu(pd->squeue); -err_free_pqueue: - free_percpu(pd->pqueue); -err_free_pd: - kfree(pd); -err: - return NULL; -} - -static void padata_free_pd(struct parallel_data *pd) -{ - free_cpumask_var(pd->cpumask.pcpu); - free_cpumask_var(pd->cpumask.cbcpu); - free_percpu(pd->pqueue); - free_percpu(pd->squeue); - kfree(pd); -} - -/* Flush all objects out of the padata queues. */ -static void padata_flush_queues(struct parallel_data *pd) -{ - int cpu; - struct padata_parallel_queue *pqueue; - struct padata_serial_queue *squeue; - - for_each_cpu(cpu, pd->cpumask.pcpu) { - pqueue = per_cpu_ptr(pd->pqueue, cpu); - flush_work(&pqueue->work); - } - - del_timer_sync(&pd->timer); - - if (atomic_read(&pd->reorder_objects)) - padata_reorder(pd); - - for_each_cpu(cpu, pd->cpumask.cbcpu) { - squeue = per_cpu_ptr(pd->squeue, cpu); - flush_work(&squeue->work); - } - - BUG_ON(atomic_read(&pd->refcnt) != 0); -} - -static void __padata_start(struct padata_instance *pinst) -{ - pinst->flags |= PADATA_INIT; -} - -static void __padata_stop(struct padata_instance *pinst) -{ - if (!(pinst->flags & PADATA_INIT)) - return; - - pinst->flags &= ~PADATA_INIT; - - synchronize_rcu(); - - get_online_cpus(); - padata_flush_queues(pinst->pd); - put_online_cpus(); -} - -/* Replace the internal control structure with a new one. */ -static void padata_replace(struct padata_instance *pinst, - struct parallel_data *pd_new) -{ - struct parallel_data *pd_old = pinst->pd; - int notification_mask = 0; - - pinst->flags |= PADATA_RESET; - - rcu_assign_pointer(pinst->pd, pd_new); - - synchronize_rcu(); - - if (!cpumask_equal(pd_old->cpumask.pcpu, pd_new->cpumask.pcpu)) - notification_mask |= PADATA_CPU_PARALLEL; - if (!cpumask_equal(pd_old->cpumask.cbcpu, pd_new->cpumask.cbcpu)) - notification_mask |= PADATA_CPU_SERIAL; - - padata_flush_queues(pd_old); - padata_free_pd(pd_old); - - if (notification_mask) - blocking_notifier_call_chain(&pinst->cpumask_change_notifier, - notification_mask, - &pd_new->cpumask); - - pinst->flags &= ~PADATA_RESET; -} - -/** - * padata_register_cpumask_notifier - Registers a notifier that will be called - * if either pcpu or cbcpu or both cpumasks change. - * - * @pinst: A poineter to padata instance - * @nblock: A pointer to notifier block. 
- */ -int padata_register_cpumask_notifier(struct padata_instance *pinst, - struct notifier_block *nblock) -{ - return blocking_notifier_chain_register(&pinst->cpumask_change_notifier, - nblock); -} - -/** - * padata_unregister_cpumask_notifier - Unregisters cpumask notifier - * registered earlier using padata_register_cpumask_notifier - * - * @pinst: A pointer to data instance. - * @nlock: A pointer to notifier block. - */ -int padata_unregister_cpumask_notifier(struct padata_instance *pinst, - struct notifier_block *nblock) -{ - return blocking_notifier_chain_unregister( - &pinst->cpumask_change_notifier, - nblock); -} - - -/* If cpumask contains no active cpu, we mark the instance as invalid. */ -static bool padata_validate_cpumask(struct padata_instance *pinst, - const struct cpumask *cpumask) -{ - if (!cpumask_intersects(cpumask, cpu_online_mask)) { - pinst->flags |= PADATA_INVALID; - return false; - } - - pinst->flags &= ~PADATA_INVALID; - return true; -} - -static int __padata_set_cpumasks(struct padata_instance *pinst, - cpumask_var_t pcpumask, - cpumask_var_t cbcpumask) -{ - int valid; - struct parallel_data *pd; - - valid = padata_validate_cpumask(pinst, pcpumask); - if (!valid) { - __padata_stop(pinst); - goto out_replace; - } - - valid = padata_validate_cpumask(pinst, cbcpumask); - if (!valid) - __padata_stop(pinst); - -out_replace: - pd = padata_alloc_pd(pinst, pcpumask, cbcpumask); - if (!pd) - return -ENOMEM; - - cpumask_copy(pinst->cpumask.pcpu, pcpumask); - cpumask_copy(pinst->cpumask.cbcpu, cbcpumask); - - padata_replace(pinst, pd); - - if (valid) - __padata_start(pinst); - - return 0; -} - -/** - * padata_set_cpumask: Sets specified by @cpumask_type cpumask to the value - * equivalent to @cpumask. - * - * @pinst: padata instance - * @cpumask_type: PADATA_CPU_SERIAL or PADATA_CPU_PARALLEL corresponding - * to parallel and serial cpumasks respectively. 
- * @cpumask: the cpumask to use - */ -int padata_set_cpumask(struct padata_instance *pinst, int cpumask_type, - cpumask_var_t cpumask) -{ - struct cpumask *serial_mask, *parallel_mask; - int err = -EINVAL; - - mutex_lock(&pinst->lock); - get_online_cpus(); - - switch (cpumask_type) { - case PADATA_CPU_PARALLEL: - serial_mask = pinst->cpumask.cbcpu; - parallel_mask = cpumask; - break; - case PADATA_CPU_SERIAL: - parallel_mask = pinst->cpumask.pcpu; - serial_mask = cpumask; - break; - default: - goto out; - } - - err = __padata_set_cpumasks(pinst, parallel_mask, serial_mask); - -out: - put_online_cpus(); - mutex_unlock(&pinst->lock); - - return err; -} - -/** - * padata_start - start the parallel processing - * - * @pinst: padata instance to start - */ -int padata_start(struct padata_instance *pinst) -{ - int err = 0; - - mutex_lock(&pinst->lock); - - if (pinst->flags & PADATA_INVALID) - err = -EINVAL; - - __padata_start(pinst); - - mutex_unlock(&pinst->lock); - - return err; -} - -/** - * padata_stop - stop the parallel processing - * - * @pinst: padata instance to stop - */ -void padata_stop(struct padata_instance *pinst) -{ - mutex_lock(&pinst->lock); - __padata_stop(pinst); - mutex_unlock(&pinst->lock); -} - -static void __padata_free(struct padata_instance *pinst) -{ - padata_stop(pinst); - padata_free_pd(pinst->pd); - free_cpumask_var(pinst->cpumask.pcpu); - free_cpumask_var(pinst->cpumask.cbcpu); - kfree(pinst); -} - -#define kobj2pinst(_kobj) \ - container_of(_kobj, struct padata_instance, kobj) -#define attr2pentry(_attr) \ - container_of(_attr, struct padata_sysfs_entry, attr) - -static void padata_sysfs_release(struct kobject *kobj) -{ - struct padata_instance *pinst = kobj2pinst(kobj); - __padata_free(pinst); -} - -struct padata_sysfs_entry { - struct attribute attr; - ssize_t (*show)(struct padata_instance *, struct attribute *, char *); - ssize_t (*store)(struct padata_instance *, struct attribute *, - const char *, size_t); -}; - -static ssize_t show_cpumask(struct padata_instance *pinst, - struct attribute *attr, char *buf) -{ - struct cpumask *cpumask; - ssize_t len; - - mutex_lock(&pinst->lock); - if (!strcmp(attr->name, "serial_cpumask")) - cpumask = pinst->cpumask.cbcpu; - else - cpumask = pinst->cpumask.pcpu; - - len = snprintf(buf, PAGE_SIZE, "%*pb\n", - nr_cpu_ids, cpumask_bits(cpumask)); - mutex_unlock(&pinst->lock); - return len < PAGE_SIZE ? len : -EINVAL; -} - -static ssize_t store_cpumask(struct padata_instance *pinst, - struct attribute *attr, - const char *buf, size_t count) -{ - cpumask_var_t new_cpumask; - ssize_t ret; - int mask_type; - - if (!alloc_cpumask_var(&new_cpumask, GFP_KERNEL)) - return -ENOMEM; - - ret = bitmap_parse(buf, count, cpumask_bits(new_cpumask), - nr_cpumask_bits); - if (ret < 0) - goto out; - - mask_type = !strcmp(attr->name, "serial_cpumask") ? 
- PADATA_CPU_SERIAL : PADATA_CPU_PARALLEL; - ret = padata_set_cpumask(pinst, mask_type, new_cpumask); - if (!ret) - ret = count; - -out: - free_cpumask_var(new_cpumask); - return ret; -} - -#define PADATA_ATTR_RW(_name, _show_name, _store_name) \ - static struct padata_sysfs_entry _name##_attr = \ - __ATTR(_name, 0644, _show_name, _store_name) -#define PADATA_ATTR_RO(_name, _show_name) \ - static struct padata_sysfs_entry _name##_attr = \ - __ATTR(_name, 0400, _show_name, NULL) - -PADATA_ATTR_RW(serial_cpumask, show_cpumask, store_cpumask); -PADATA_ATTR_RW(parallel_cpumask, show_cpumask, store_cpumask); - -/* - * Padata sysfs provides the following objects: - * serial_cpumask [RW] - cpumask for serial workers - * parallel_cpumask [RW] - cpumask for parallel workers - */ -static struct attribute *padata_default_attrs[] = { - &serial_cpumask_attr.attr, - &parallel_cpumask_attr.attr, - NULL, -}; - -static ssize_t padata_sysfs_show(struct kobject *kobj, - struct attribute *attr, char *buf) -{ - struct padata_instance *pinst; - struct padata_sysfs_entry *pentry; - ssize_t ret = -EIO; - - pinst = kobj2pinst(kobj); - pentry = attr2pentry(attr); - if (pentry->show) - ret = pentry->show(pinst, attr, buf); - - return ret; -} - -static ssize_t padata_sysfs_store(struct kobject *kobj, struct attribute *attr, - const char *buf, size_t count) -{ - struct padata_instance *pinst; - struct padata_sysfs_entry *pentry; - ssize_t ret = -EIO; - - pinst = kobj2pinst(kobj); - pentry = attr2pentry(attr); - if (pentry->show) - ret = pentry->store(pinst, attr, buf, count); - - return ret; -} - -static const struct sysfs_ops padata_sysfs_ops = { - .show = padata_sysfs_show, - .store = padata_sysfs_store, -}; - -static struct kobj_type padata_attr_type = { - .sysfs_ops = &padata_sysfs_ops, - .default_attrs = padata_default_attrs, - .release = padata_sysfs_release, -}; - -/** - * padata_alloc - allocate and initialize a padata instance and specify - * cpumasks for serial and parallel workers.
- * - * @wq: workqueue to use for the allocated padata instance - * @pcpumask: cpumask that will be used for padata parallelization - * @cbcpumask: cpumask that will be used for padata serialization - */ -struct padata_instance *padata_alloc(struct workqueue_struct *wq, - const struct cpumask *pcpumask, - const struct cpumask *cbcpumask) -{ - struct padata_instance *pinst; - struct parallel_data *pd = NULL; - - pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL); - if (!pinst) - goto err; - - get_online_cpus(); - if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL)) - goto err_free_inst; - if (!alloc_cpumask_var(&pinst->cpumask.cbcpu, GFP_KERNEL)) { - free_cpumask_var(pinst->cpumask.pcpu); - goto err_free_inst; - } - if (!padata_validate_cpumask(pinst, pcpumask) || - !padata_validate_cpumask(pinst, cbcpumask)) - goto err_free_masks; - - pd = padata_alloc_pd(pinst, pcpumask, cbcpumask); - if (!pd) - goto err_free_masks; - - rcu_assign_pointer(pinst->pd, pd); - - pinst->wq = wq; - - cpumask_copy(pinst->cpumask.pcpu, pcpumask); - cpumask_copy(pinst->cpumask.cbcpu, cbcpumask); - - pinst->flags = 0; - - put_online_cpus(); - - BLOCKING_INIT_NOTIFIER_HEAD(&pinst->cpumask_change_notifier); - kobject_init(&pinst->kobj, &padata_attr_type); - mutex_init(&pinst->lock); - - return pinst; - -err_free_masks: - free_cpumask_var(pinst->cpumask.pcpu); - free_cpumask_var(pinst->cpumask.cbcpu); -err_free_inst: - kfree(pinst); - put_online_cpus(); -err: - return NULL; -} - -/** - * padata_alloc_possible - Allocate and initialize padata instance. - * Use the cpu_possible_mask for serial and - * parallel workers. - * - * @wq: workqueue to use for the allocated padata instance - */ -struct padata_instance *padata_alloc_possible(struct workqueue_struct *wq) -{ - return padata_alloc(wq, cpu_possible_mask, cpu_possible_mask); -} - -/** - * padata_free - free a padata instance - * - * @padata_inst: padata instance to free - */ -void padata_free(struct padata_instance *pinst) -{ - kobject_put(&pinst->kobj); -} diff --git a/src/config.c b/src/config.c index 7ffc529..a964f27 100644 --- a/src/config.c +++ b/src/config.c @@ -114,7 +114,7 @@ static int set_peer(struct wireguard_device *wg, void __user *user_peer, size_t } if (wg->dev->flags & IFF_UP) - packet_send_queue(peer); + queue_work(wg->crypt_wq, &peer->init_queue.work); peer_put(peer); diff --git a/src/data.c b/src/data.c index fb91861..b5569c7 100644 --- a/src/data.c +++ b/src/data.c @@ -5,6 +5,8 @@ #include "peer.h" #include "messages.h" #include "packets.h" +#include "queue.h" +#include "timers.h" #include "hashtables.h" #include @@ -15,43 +17,42 @@ #include #include -struct encryption_ctx { - struct padata_priv padata; - struct sk_buff_head queue; - struct wireguard_peer *peer; - struct noise_keypair *keypair; -}; - -struct decryption_ctx { - struct padata_priv padata; - struct endpoint endpoint; - struct sk_buff *skb; - struct noise_keypair *keypair; -}; +static struct kmem_cache *crypt_ctx_cache __read_mostly; -#ifdef CONFIG_WIREGUARD_PARALLEL -static struct kmem_cache *encryption_ctx_cache __read_mostly; -static struct kmem_cache *decryption_ctx_cache __read_mostly; - -int __init packet_init_data_caches(void) +int __init init_crypt_cache(void) { - encryption_ctx_cache = KMEM_CACHE(encryption_ctx, 0); - if (!encryption_ctx_cache) - return -ENOMEM; - decryption_ctx_cache = KMEM_CACHE(decryption_ctx, 0); - if (!decryption_ctx_cache) { - kmem_cache_destroy(encryption_ctx_cache); + crypt_ctx_cache = KMEM_CACHE(crypt_ctx, 0); + if (!crypt_ctx_cache) 
return -ENOMEM; - } return 0; } -void packet_deinit_data_caches(void) +void deinit_crypt_cache(void) { - kmem_cache_destroy(encryption_ctx_cache); - kmem_cache_destroy(decryption_ctx_cache); + kmem_cache_destroy(crypt_ctx_cache); } -#endif + +static void drop_ctx(struct crypt_ctx *ctx, bool sending) +{ + if (ctx->keypair) + noise_keypair_put(ctx->keypair); + peer_put(ctx->peer); + if (sending) + skb_queue_purge(&ctx->packets); + else + dev_kfree_skb(ctx->skb); + kmem_cache_free(crypt_ctx_cache, ctx); +} + +#define drop_ctx_and_continue(ctx, sending) ({ \ + drop_ctx(ctx, sending); \ + continue; \ +}) + +#define drop_ctx_and_return(ctx, sending) ({ \ + drop_ctx(ctx, sending); \ + return; \ +}) /* This is RFC6479, a replay detection bitmap algorithm that avoids bitshifts */ static inline bool counter_validate(union noise_counter *counter, u64 their_counter) @@ -195,236 +196,243 @@ static inline bool skb_decrypt(struct sk_buff *skb, struct noise_symmetric_key * return !pskb_trim(skb, skb->len - noise_encrypted_len(0)); } -static inline bool get_encryption_nonce(u64 *nonce, struct noise_symmetric_key *key) +static inline bool packet_initialize_ctx(struct crypt_ctx *ctx) { - if (unlikely(!key)) - return false; - - if (unlikely(!key->is_valid || time_is_before_eq_jiffies64(key->birthdate + REJECT_AFTER_TIME))) { - key->is_valid = false; - return false; - } + struct noise_symmetric_key *key; + struct sk_buff *skb; - *nonce = atomic64_inc_return(&key->counter.counter) - 1; - if (*nonce >= REJECT_AFTER_MESSAGES) { - key->is_valid = false; + rcu_read_lock_bh(); + ctx->keypair = noise_keypair_get(rcu_dereference_bh(ctx->peer->keypairs.current_keypair)); + rcu_read_unlock_bh(); + if (unlikely(!ctx->keypair)) return false; + key = &ctx->keypair->sending; + if (unlikely(!key || !key->is_valid)) + goto out_nokey; + if (unlikely(time_is_before_eq_jiffies64(key->birthdate + REJECT_AFTER_TIME))) + goto out_invalid; + + skb_queue_walk(&ctx->packets, skb) { + PACKET_CB(skb)->nonce = atomic64_inc_return(&key->counter.counter) - 1; + if (unlikely(PACKET_CB(skb)->nonce >= REJECT_AFTER_MESSAGES)) + goto out_invalid; } return true; + +out_invalid: + key->is_valid = false; +out_nokey: + noise_keypair_put(ctx->keypair); + ctx->keypair = NULL; + return false; } -static inline void queue_encrypt_reset(struct sk_buff_head *queue, struct noise_keypair *keypair) +void packet_send_worker(struct work_struct *work) { + struct crypt_ctx *ctx; + struct crypt_queue *queue = container_of(work, struct crypt_queue, work); struct sk_buff *skb, *tmp; - bool have_simd = chacha20poly1305_init_simd(); - skb_queue_walk_safe (queue, skb, tmp) { - if (unlikely(!skb_encrypt(skb, keypair, have_simd))) { - __skb_unlink(skb, queue); - kfree_skb(skb); - continue; + struct wireguard_peer *peer = container_of(queue, struct wireguard_peer, send_queue); + bool data_sent = false; + + timers_any_authenticated_packet_traversal(peer); + while ((ctx = queue_first_peer(queue)) != NULL && atomic_read(&ctx->state) == CTX_FINISHED) { + queue_dequeue(queue); + skb_queue_walk_safe(&ctx->packets, skb, tmp) { + bool is_keepalive = skb->len == message_data_len(0); + if (likely(!socket_send_skb_to_peer(peer, skb, PACKET_CB(skb)->ds) && !is_keepalive)) + data_sent = true; } - skb_reset(skb); + noise_keypair_put(ctx->keypair); + peer_put(ctx->peer); + kmem_cache_free(crypt_ctx_cache, ctx); } - chacha20poly1305_deinit_simd(have_simd); - noise_keypair_put(keypair); + if (likely(data_sent)) + timers_data_sent(peer); + keep_key_fresh_send(peer); } -#ifdef 
CONFIG_WIREGUARD_PARALLEL -static void begin_parallel_encryption(struct padata_priv *padata) +void packet_encrypt_worker(struct work_struct *work) { - struct encryption_ctx *ctx = container_of(padata, struct encryption_ctx, padata); -#if IS_ENABLED(CONFIG_KERNEL_MODE_NEON) && defined(CONFIG_ARM) - local_bh_enable(); -#endif - queue_encrypt_reset(&ctx->queue, ctx->keypair); -#if IS_ENABLED(CONFIG_KERNEL_MODE_NEON) && defined(CONFIG_ARM) - local_bh_disable(); -#endif - padata_do_serial(padata); -} + struct crypt_ctx *ctx; + struct crypt_queue *queue = container_of(work, struct crypt_queue, work); + struct sk_buff *skb, *tmp; + struct wireguard_peer *peer; + bool have_simd = chacha20poly1305_init_simd(); -static void finish_parallel_encryption(struct padata_priv *padata) -{ - struct encryption_ctx *ctx = container_of(padata, struct encryption_ctx, padata); - packet_create_data_done(&ctx->queue, ctx->peer); - atomic_dec(&ctx->peer->parallel_encryption_inflight); - peer_put(ctx->peer); - kmem_cache_free(encryption_ctx_cache, ctx); + while ((ctx = queue_dequeue_shared(queue)) != NULL) { + skb_queue_walk_safe(&ctx->packets, skb, tmp) { + if (likely(skb_encrypt(skb, ctx->keypair, have_simd))) { + skb_reset(skb); + } else { + __skb_unlink(skb, &ctx->packets); + dev_kfree_skb(skb); + } + } + /* Dereferencing ctx is unsafe once ctx->state == CTX_FINISHED. */ + peer = peer_rcu_get(ctx->peer); + atomic_set(&ctx->state, CTX_FINISHED); + queue_work_on(peer->work_cpu, peer->device->crypt_wq, &peer->send_queue.work); + peer_put(peer); + } + chacha20poly1305_deinit_simd(have_simd); } -static inline unsigned int choose_cpu(__le32 key) +void packet_init_worker(struct work_struct *work) { - unsigned int cpu_index, cpu, cb_cpu; - - /* This ensures that packets encrypted to the same key are sent in-order. */ - cpu_index = ((__force unsigned int)key) % cpumask_weight(cpu_online_mask); - cb_cpu = cpumask_first(cpu_online_mask); - for (cpu = 0; cpu < cpu_index; ++cpu) - cb_cpu = cpumask_next(cb_cpu, cpu_online_mask); - - return cb_cpu; + struct crypt_ctx *ctx; + struct crypt_queue *queue = container_of(work, struct crypt_queue, work); + struct wireguard_peer *peer = container_of(queue, struct wireguard_peer, init_queue); + + spin_lock(&peer->init_queue_lock); + while ((ctx = queue_first_peer(queue)) != NULL) { + if (unlikely(!packet_initialize_ctx(ctx))) { + packet_queue_handshake_initiation(peer, false); + break; + } + queue_dequeue(queue); + if (unlikely(!queue_enqueue_peer(&peer->send_queue, ctx))) + drop_ctx_and_continue(ctx, true); + queue_enqueue_shared(peer->device->encrypt_queue, ctx, peer->device->crypt_wq, &peer->device->encrypt_cpu); + } + spin_unlock(&peer->init_queue_lock); } -#endif -int packet_create_data(struct sk_buff_head *queue, struct wireguard_peer *peer) +void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packets) { - int ret = -ENOKEY; - struct noise_keypair *keypair; + struct crypt_ctx *ctx; struct sk_buff *skb; + struct wireguard_device *wg = peer->device; + bool need_handshake = false; - rcu_read_lock_bh(); - keypair = noise_keypair_get(rcu_dereference_bh(peer->keypairs.current_keypair)); - rcu_read_unlock_bh(); - if (unlikely(!keypair)) - return ret; - - skb_queue_walk (queue, skb) { - if (unlikely(!get_encryption_nonce(&PACKET_CB(skb)->nonce, &keypair->sending))) - goto err; - - /* After the first time through the loop, if we've suceeded with a legitimate nonce, - * then we don't want a -ENOKEY error if subsequent nonces fail. 
Rather, if this - * condition arises, we simply want error out hard, and drop the entire queue. This - * is partially lazy programming and TODO: this could be made to only requeue the - * ones that had no nonce. But I'm not sure it's worth the added complexity, given - * how rarely that condition should arise. */ - ret = -EPIPE; + ctx = kmem_cache_alloc(crypt_ctx_cache, GFP_ATOMIC); + if (unlikely(!ctx)) { + skb_queue_purge(packets); + return; } - -#ifdef CONFIG_WIREGUARD_PARALLEL - if ((skb_queue_len(queue) > 1 || queue->next->len > 256 || atomic_read(&peer->parallel_encryption_inflight) > 0) && cpumask_weight(cpu_online_mask) > 1) { - struct encryption_ctx *ctx = kmem_cache_alloc(encryption_ctx_cache, GFP_ATOMIC); - if (!ctx) - goto serial_encrypt; - skb_queue_head_init(&ctx->queue); - skb_queue_splice_init(queue, &ctx->queue); - memset(&ctx->padata, 0, sizeof(ctx->padata)); - ctx->padata.parallel = begin_parallel_encryption; - ctx->padata.serial = finish_parallel_encryption; - ctx->keypair = keypair; - ctx->peer = peer_rcu_get(peer); - ret = -EBUSY; - if (unlikely(!ctx->peer)) - goto err_parallel; - atomic_inc(&peer->parallel_encryption_inflight); - if (unlikely(padata_do_parallel(peer->device->encrypt_pd, &ctx->padata, choose_cpu(keypair->remote_index)))) { - atomic_dec(&peer->parallel_encryption_inflight); - peer_put(ctx->peer); -err_parallel: - skb_queue_splice(&ctx->queue, queue); - kmem_cache_free(encryption_ctx_cache, ctx); - goto err; + skb_queue_head_init(&ctx->packets); + skb_queue_splice_tail(packets, &ctx->packets); + ctx->peer = peer_rcu_get(peer); + ctx->keypair = NULL; + atomic_set(&ctx->state, CTX_NEW); + + /* If there are already packets on the init queue, these must go behind + * them to maintain the correct order, so we can only take the fast path + * when the queue is empty. */ + if (likely(queue_empty(&peer->init_queue))) { + if (likely(packet_initialize_ctx(ctx))) { + if (unlikely(!queue_enqueue_peer(&peer->send_queue, ctx))) + drop_ctx_and_return(ctx, true); + queue_enqueue_shared(wg->encrypt_queue, ctx, wg->crypt_wq, &wg->encrypt_cpu); + return; } - } else -serial_encrypt: -#endif - { - queue_encrypt_reset(queue, keypair); - packet_create_data_done(queue, peer); + /* Initialization failed, so we need a new keypair. */ + need_handshake = true; } - return 0; -err: - noise_keypair_put(keypair); - return ret; -} - -static void begin_decrypt_packet(struct decryption_ctx *ctx) -{ - if (unlikely(socket_endpoint_from_skb(&ctx->endpoint, ctx->skb) < 0 || !skb_decrypt(ctx->skb, &ctx->keypair->receiving))) { - peer_put(ctx->keypair->entry.peer); - noise_keypair_put(ctx->keypair); - dev_kfree_skb(ctx->skb); - ctx->skb = NULL; + /* Packets are kept around in the init queue as long as there is an + * ongoing handshake. Throw out the oldest packets instead of the new + * ones. If we cannot acquire the lock, packets are being dequeued on + * another thread. 
*/ + if (unlikely(queue_full(&peer->init_queue)) && spin_trylock(&peer->init_queue_lock)) { + struct crypt_ctx *tmp = queue_dequeue_peer(&peer->init_queue); + if (likely(tmp)) + drop_ctx(tmp, true); + spin_unlock(&peer->init_queue_lock); } + skb_queue_walk(&ctx->packets, skb) + skb_orphan(skb); + if (unlikely(!queue_enqueue_peer(&peer->init_queue, ctx))) + drop_ctx_and_return(ctx, true); + if (need_handshake) + packet_queue_handshake_initiation(peer, false); + /* If we have a valid keypair, but took the slow path because init_queue + * had packets on it, init_queue.worker() may have finished + * processing the existing packets and returned since we checked if the + * init_queue was empty. Run the worker again if this is the only ctx + * remaining on the queue. */ + else if (unlikely(queue_first_peer(&peer->init_queue) == ctx)) + queue_work(peer->device->crypt_wq, &peer->init_queue.work); } -static void finish_decrypt_packet(struct decryption_ctx *ctx) +void packet_receive_worker(struct work_struct *work) { - bool used_new_key; - - if (!ctx->skb) - return; + struct crypt_ctx *ctx; + struct crypt_queue *queue = container_of(work, struct crypt_queue, work); + struct sk_buff *skb; - if (unlikely(!counter_validate(&ctx->keypair->receiving.counter, PACKET_CB(ctx->skb)->nonce))) { - net_dbg_ratelimited("%s: Packet has invalid nonce %Lu (max %Lu)\n", ctx->keypair->entry.peer->device->dev->name, PACKET_CB(ctx->skb)->nonce, ctx->keypair->receiving.counter.receive.counter); - peer_put(ctx->keypair->entry.peer); + while ((ctx = queue_first_peer(queue)) != NULL && atomic_read(&ctx->state) == CTX_FINISHED) { + queue_dequeue(queue); + if (likely(skb = ctx->skb)) { + if (unlikely(!counter_validate(&ctx->keypair->receiving.counter, PACKET_CB(skb)->nonce))) { + net_dbg_ratelimited("%s: Packet has invalid nonce %Lu (max %Lu)\n", ctx->peer->device->dev->name, PACKET_CB(ctx->skb)->nonce, ctx->keypair->receiving.counter.receive.counter); + dev_kfree_skb(skb); + } else { + skb_reset(skb); + packet_consume_data_done(skb, ctx->peer, &ctx->endpoint, noise_received_with_keypair(&ctx->peer->keypairs, ctx->keypair)); + } + } noise_keypair_put(ctx->keypair); - dev_kfree_skb(ctx->skb); - return; + peer_put(ctx->peer); + kmem_cache_free(crypt_ctx_cache, ctx); } - - used_new_key = noise_received_with_keypair(&ctx->keypair->entry.peer->keypairs, ctx->keypair); - skb_reset(ctx->skb); - packet_consume_data_done(ctx->skb, ctx->keypair->entry.peer, &ctx->endpoint, used_new_key); - noise_keypair_put(ctx->keypair); } -#ifdef CONFIG_WIREGUARD_PARALLEL -static void begin_parallel_decryption(struct padata_priv *padata) +void packet_decrypt_worker(struct work_struct *work) { - struct decryption_ctx *ctx = container_of(padata, struct decryption_ctx, padata); -#if IS_ENABLED(CONFIG_KERNEL_MODE_NEON) && defined(CONFIG_ARM) - local_bh_enable(); -#endif - begin_decrypt_packet(ctx); -#if IS_ENABLED(CONFIG_KERNEL_MODE_NEON) && defined(CONFIG_ARM) - local_bh_disable(); -#endif - padata_do_serial(padata); -} + struct crypt_ctx *ctx; + struct crypt_queue *queue = container_of(work, struct crypt_queue, work); + struct wireguard_peer *peer; -static void finish_parallel_decryption(struct padata_priv *padata) -{ - struct decryption_ctx *ctx = container_of(padata, struct decryption_ctx, padata); - finish_decrypt_packet(ctx); - kmem_cache_free(decryption_ctx_cache, ctx); + while ((ctx = queue_dequeue_shared(queue)) != NULL) { + if (unlikely(socket_endpoint_from_skb(&ctx->endpoint, ctx->skb) < 0 || !skb_decrypt(ctx->skb, &ctx->keypair->receiving))) 
{ + dev_kfree_skb(ctx->skb); + ctx->skb = NULL; + } + /* Dereferencing ctx is unsafe once ctx->state == CTX_FINISHED. */ + peer = peer_rcu_get(ctx->peer); + atomic_set(&ctx->state, CTX_FINISHED); + queue_work_on(peer->work_cpu, peer->device->crypt_wq, &peer->receive_queue.work); + peer_put(peer); + } } -#endif void packet_consume_data(struct sk_buff *skb, struct wireguard_device *wg) { - struct noise_keypair *keypair; + struct crypt_ctx *ctx; __le32 idx = ((struct message_data *)skb->data)->key_idx; + ctx = kmem_cache_alloc(crypt_ctx_cache, GFP_ATOMIC); + if (unlikely(!ctx)) { + dev_kfree_skb(skb); + return; + } rcu_read_lock_bh(); - keypair = noise_keypair_get((struct noise_keypair *)index_hashtable_lookup(&wg->index_hashtable, INDEX_HASHTABLE_KEYPAIR, idx)); + ctx->keypair = noise_keypair_get((struct noise_keypair *)index_hashtable_lookup(&wg->index_hashtable, INDEX_HASHTABLE_KEYPAIR, idx)); rcu_read_unlock_bh(); - if (unlikely(!keypair)) - goto err; - -#ifdef CONFIG_WIREGUARD_PARALLEL - if (cpumask_weight(cpu_online_mask) > 1) { - struct decryption_ctx *ctx = kmem_cache_alloc(decryption_ctx_cache, GFP_ATOMIC); - if (unlikely(!ctx)) - goto err_peer; - ctx->skb = skb; - ctx->keypair = keypair; - memset(&ctx->padata, 0, sizeof(ctx->padata)); - ctx->padata.parallel = begin_parallel_decryption; - ctx->padata.serial = finish_parallel_decryption; - if (unlikely(padata_do_parallel(wg->decrypt_pd, &ctx->padata, choose_cpu(idx)))) { - kmem_cache_free(decryption_ctx_cache, ctx); - goto err_peer; - } - } else -#endif - { - struct decryption_ctx ctx = { - .skb = skb, - .keypair = keypair - }; - begin_decrypt_packet(&ctx); - finish_decrypt_packet(&ctx); + if (unlikely(!ctx->keypair)) { + kmem_cache_free(crypt_ctx_cache, ctx); + dev_kfree_skb(skb); + return; } - return; + ctx->skb = skb; + /* index_hashtable_lookup() already gets a reference to peer. */ + ctx->peer = ctx->keypair->entry.peer; + atomic_set(&ctx->state, CTX_NEW); + + if (unlikely(!queue_enqueue_peer(&ctx->peer->receive_queue, ctx))) + drop_ctx_and_return(ctx, false); + queue_enqueue_shared(wg->decrypt_queue, ctx, wg->crypt_wq, &wg->decrypt_cpu); +} -#ifdef CONFIG_WIREGUARD_PARALLEL -err_peer: - peer_put(keypair->entry.peer); - noise_keypair_put(keypair); -#endif -err: - dev_kfree_skb(skb); +void peer_purge_queues(struct wireguard_peer *peer) +{ + struct crypt_ctx *ctx; + + if (!spin_trylock(&peer->init_queue_lock)) + return; + while ((ctx = queue_dequeue_peer(&peer->init_queue)) != NULL) + drop_ctx(ctx, true); + spin_unlock(&peer->init_queue_lock); } diff --git a/src/device.c b/src/device.c index 2514822..d72736e 100644 --- a/src/device.c +++ b/src/device.c @@ -57,7 +57,7 @@ static int open(struct net_device *dev) return ret; peer_for_each (wg, peer, temp, true) { timers_init_peer(peer); - packet_send_queue(peer); + queue_work(wg->crypt_wq, &peer->init_queue.work); if (peer->persistent_keepalive_interval) packet_send_keepalive(peer); } @@ -111,6 +111,7 @@ static netdev_tx_t xmit(struct sk_buff *skb, struct net_device *dev) struct wireguard_device *wg = netdev_priv(dev); struct wireguard_peer *peer; struct sk_buff *next; + struct sk_buff_head queue; int ret; if (unlikely(dev_recursion_level() > 4)) { @@ -141,11 +142,7 @@ static netdev_tx_t xmit(struct sk_buff *skb, struct net_device *dev) goto err_peer; } - /* If the queue is getting too big, we start removing the oldest packets until it's small again. - * We do this before adding the new packet, so we don't remove GSO segments that are in excess. 
*/ - while (skb_queue_len(&peer->tx_packet_queue) > MAX_QUEUED_OUTGOING_PACKETS) - dev_kfree_skb(skb_dequeue(&peer->tx_packet_queue)); - + __skb_queue_head_init(&queue); if (!skb_is_gso(skb)) skb->next = NULL; else { @@ -169,10 +166,11 @@ static netdev_tx_t xmit(struct sk_buff *skb, struct net_device *dev) * so at this point we're in a position to drop it. */ skb_dst_drop(skb); - skb_queue_tail(&peer->tx_packet_queue, skb); + __skb_queue_tail(&queue, skb); } while ((skb = next) != NULL); - packet_send_queue(peer); + packet_create_data(peer, &queue); + peer_put(peer); return NETDEV_TX_OK; @@ -224,11 +222,9 @@ static void destruct(struct net_device *dev) wg->incoming_port = 0; destroy_workqueue(wg->incoming_handshake_wq); destroy_workqueue(wg->peer_wq); -#ifdef CONFIG_WIREGUARD_PARALLEL - padata_free(wg->encrypt_pd); - padata_free(wg->decrypt_pd); + free_percpu(wg->decrypt_queue); + free_percpu(wg->encrypt_queue); destroy_workqueue(wg->crypt_wq); -#endif routing_table_free(&wg->peer_routing_table); ratelimiter_uninit(); memzero_explicit(&wg->static_identity, sizeof(struct noise_static_identity)); @@ -310,21 +306,25 @@ static int newlink(struct net *src_net, struct net_device *dev, struct nlattr *t if (!wg->peer_wq) goto error_4; -#ifdef CONFIG_WIREGUARD_PARALLEL wg->crypt_wq = alloc_workqueue("wg-crypt-%s", WQ_CPU_INTENSIVE | WQ_MEM_RECLAIM, 2, dev->name); if (!wg->crypt_wq) goto error_5; - wg->encrypt_pd = padata_alloc_possible(wg->crypt_wq); - if (!wg->encrypt_pd) + wg->encrypt_queue = alloc_percpu(struct crypt_queue); + if (!wg->encrypt_queue) goto error_6; - padata_start(wg->encrypt_pd); + for_each_possible_cpu (cpu) { + INIT_LIST_HEAD(&per_cpu_ptr(wg->encrypt_queue, cpu)->list); + INIT_WORK(&per_cpu_ptr(wg->encrypt_queue, cpu)->work, packet_encrypt_worker); + } - wg->decrypt_pd = padata_alloc_possible(wg->crypt_wq); - if (!wg->decrypt_pd) + wg->decrypt_queue = alloc_percpu(struct crypt_queue); + if (!wg->decrypt_queue) goto error_7; - padata_start(wg->decrypt_pd); -#endif + for_each_possible_cpu (cpu) { + INIT_LIST_HEAD(&per_cpu_ptr(wg->decrypt_queue, cpu)->list); + INIT_WORK(&per_cpu_ptr(wg->decrypt_queue, cpu)->work, packet_decrypt_worker); + } ret = ratelimiter_init(); if (ret < 0) @@ -346,14 +346,12 @@ static int newlink(struct net *src_net, struct net_device *dev, struct nlattr *t error_9: ratelimiter_uninit(); error_8: -#ifdef CONFIG_WIREGUARD_PARALLEL - padata_free(wg->decrypt_pd); + free_percpu(wg->decrypt_queue); error_7: - padata_free(wg->encrypt_pd); + free_percpu(wg->encrypt_queue); error_6: destroy_workqueue(wg->crypt_wq); error_5: -#endif destroy_workqueue(wg->peer_wq); error_4: destroy_workqueue(wg->incoming_handshake_wq); diff --git a/src/device.h b/src/device.h index 77f1b2e..1a75261 100644 --- a/src/device.h +++ b/src/device.h @@ -13,7 +13,6 @@ #include #include #include -#include struct wireguard_device; struct handshake_worker { @@ -21,6 +20,12 @@ struct handshake_worker { struct work_struct work; }; +struct crypt_queue { + struct list_head list; + struct work_struct work; + atomic_t qlen; +}; + struct wireguard_device { struct net_device *dev; struct list_head device_list; @@ -40,10 +45,9 @@ struct wireguard_device { struct list_head peer_list; struct mutex device_update_lock; struct mutex socket_update_lock; -#ifdef CONFIG_WIREGUARD_PARALLEL struct workqueue_struct *crypt_wq; - struct padata_instance *encrypt_pd, *decrypt_pd; -#endif + int encrypt_cpu, decrypt_cpu; + struct crypt_queue __percpu *encrypt_queue, *decrypt_queue; }; int device_init(void); diff --git 
a/src/main.c b/src/main.c index 0697741..9ed60d0 100644 --- a/src/main.c +++ b/src/main.c @@ -27,11 +27,9 @@ static int __init mod_init(void) #endif noise_init(); -#ifdef CONFIG_WIREGUARD_PARALLEL - ret = packet_init_data_caches(); + ret = init_crypt_cache(); if (ret < 0) goto err_packet; -#endif ret = device_init(); if (ret < 0) @@ -43,19 +41,15 @@ static int __init mod_init(void) return 0; err_device: -#ifdef CONFIG_WIREGUARD_PARALLEL - packet_deinit_data_caches(); + deinit_crypt_cache(); err_packet: -#endif return ret; } static void __exit mod_exit(void) { device_uninit(); -#ifdef CONFIG_WIREGUARD_PARALLEL - packet_deinit_data_caches(); -#endif + deinit_crypt_cache(); pr_debug("WireGuard unloaded\n"); } diff --git a/src/packets.h b/src/packets.h index c956c7a..6601ef6 100644 --- a/src/packets.h +++ b/src/packets.h @@ -8,7 +8,6 @@ #include "socket.h" #include -#include #include #include #include @@ -27,19 +26,26 @@ struct packet_cb { void packet_receive(struct wireguard_device *wg, struct sk_buff *skb); void packet_process_queued_handshake_packets(struct work_struct *work); void packet_consume_data_done(struct sk_buff *skb, struct wireguard_peer *peer, struct endpoint *endpoint, bool used_new_key); +void packet_receive_worker(struct work_struct *work); +void packet_decrypt_worker(struct work_struct *work); +void packet_consume_data(struct sk_buff *skb, struct wireguard_device *wg); /* send.c */ -void packet_send_queue(struct wireguard_peer *peer); +void keep_key_fresh_send(struct wireguard_peer *peer); void packet_send_keepalive(struct wireguard_peer *peer); void packet_queue_handshake_initiation(struct wireguard_peer *peer, bool is_retry); void packet_send_queued_handshakes(struct work_struct *work); void packet_send_handshake_response(struct wireguard_peer *peer); void packet_send_handshake_cookie(struct wireguard_device *wg, struct sk_buff *initiating_skb, __le32 sender_index); -void packet_create_data_done(struct sk_buff_head *queue, struct wireguard_peer *peer); +void packet_send_worker(struct work_struct *work); +void packet_encrypt_worker(struct work_struct *work); +void packet_init_worker(struct work_struct *work); +void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packets); /* data.c */ -int packet_create_data(struct sk_buff_head *queue, struct wireguard_peer *peer); -void packet_consume_data(struct sk_buff *skb, struct wireguard_device *wg); +int init_crypt_cache(void); +void deinit_crypt_cache(void); +void peer_purge_queues(struct wireguard_peer *peer); /* Returns either the correct skb->protocol value, or 0 if invalid. 
*/ static inline __be16 skb_examine_untrusted_ip_hdr(struct sk_buff *skb) @@ -51,11 +57,6 @@ static inline __be16 skb_examine_untrusted_ip_hdr(struct sk_buff *skb) return 0; } -#ifdef CONFIG_WIREGUARD_PARALLEL -int packet_init_data_caches(void); -void packet_deinit_data_caches(void); -#endif - #ifdef DEBUG bool packet_counter_selftest(void); #endif diff --git a/src/peer.c b/src/peer.c index f539d99..20fd6cb 100644 --- a/src/peer.c +++ b/src/peer.c @@ -14,6 +14,18 @@ static atomic64_t peer_counter = ATOMIC64_INIT(0); +static int choose_cpu(u64 id) +{ + unsigned int cpu, cpu_index, i; + + cpu_index = id % cpumask_weight(cpu_online_mask); + cpu = cpumask_first(cpu_online_mask); + for (i = 0; i < cpu_index; i += 1) + cpu = cpumask_next(cpu, cpu_online_mask); + + return cpu; +} + struct wireguard_peer *peer_create(struct wireguard_device *wg, const u8 public_key[NOISE_PUBLIC_KEY_LEN], const u8 preshared_key[NOISE_SYMMETRIC_KEY_LEN]) { struct wireguard_peer *peer; @@ -42,13 +54,17 @@ struct wireguard_peer *peer_create(struct wireguard_device *wg, const u8 public_ mutex_init(&peer->keypairs.keypair_update_lock); INIT_WORK(&peer->transmit_handshake_work, packet_send_queued_handshakes); rwlock_init(&peer->endpoint_lock); - skb_queue_head_init(&peer->tx_packet_queue); kref_init(&peer->refcount); pubkey_hashtable_add(&wg->peer_hashtable, peer); list_add_tail(&peer->peer_list, &wg->peer_list); -#ifdef CONFIG_WIREGUARD_PARALLEL - atomic_set(&peer->parallel_encryption_inflight, 0); -#endif + peer->work_cpu = choose_cpu(peer->internal_id); + INIT_LIST_HEAD(&peer->init_queue.list); + INIT_WORK(&peer->init_queue.work, packet_init_worker); + INIT_LIST_HEAD(&peer->send_queue.list); + INIT_WORK(&peer->send_queue.work, packet_send_worker); + INIT_LIST_HEAD(&peer->receive_queue.list); + INIT_WORK(&peer->receive_queue.work, packet_receive_worker); + spin_lock_init(&peer->init_queue_lock); pr_debug("%s: Peer %Lu created\n", wg->dev->name, peer->internal_id); return peer; } @@ -83,9 +99,10 @@ void peer_remove(struct wireguard_peer *peer) timers_uninit_peer(peer); routing_table_remove_by_peer(&peer->device->peer_routing_table, peer); pubkey_hashtable_remove(&peer->device->peer_hashtable, peer); + flush_workqueue(peer->device->crypt_wq); if (peer->device->peer_wq) flush_workqueue(peer->device->peer_wq); - skb_queue_purge(&peer->tx_packet_queue); + peer_purge_queues(peer); peer_put(peer); } @@ -93,7 +110,6 @@ static void rcu_release(struct rcu_head *rcu) { struct wireguard_peer *peer = container_of(rcu, struct wireguard_peer, rcu); pr_debug("%s: Peer %Lu (%pISpfsc) destroyed\n", peer->device->dev->name, peer->internal_id, &peer->endpoint.addr); - skb_queue_purge(&peer->tx_packet_queue); dst_cache_destroy(&peer->endpoint_cache); kzfree(peer); } diff --git a/src/peer.h b/src/peer.h index c10406b..90a394f 100644 --- a/src/peer.h +++ b/src/peer.h @@ -3,6 +3,7 @@ #ifndef PEER_H #define PEER_H +#include "device.h" #include "noise.h" #include "cookie.h" @@ -46,17 +47,15 @@ struct wireguard_peer { unsigned long persistent_keepalive_interval; bool timers_enabled; bool timer_need_another_keepalive; - bool need_resend_queue; bool sent_lastminute_handshake; struct timeval walltime_last_handshake; - struct sk_buff_head tx_packet_queue; struct kref refcount; struct rcu_head rcu; struct list_head peer_list; u64 internal_id; -#ifdef CONFIG_WIREGUARD_PARALLEL - atomic_t parallel_encryption_inflight; -#endif + int work_cpu; + struct crypt_queue init_queue, send_queue, receive_queue; + spinlock_t init_queue_lock; }; struct 
wireguard_peer *peer_create(struct wireguard_device *wg, const u8 public_key[NOISE_PUBLIC_KEY_LEN], const u8 preshared_key[NOISE_SYMMETRIC_KEY_LEN]); diff --git a/src/queue.h b/src/queue.h new file mode 100644 index 0000000..537bacf --- /dev/null +++ b/src/queue.h @@ -0,0 +1,139 @@ +/* Copyright (C) 2017 Samuel Holland . All Rights Reserved. */ + +#ifndef WGQUEUE_H +#define WGQUEUE_H + +#include +#include + +#include "device.h" +#include "peer.h" + +#define QUEUE_MAX_LEN 1000 + +enum { + CTX_NEW, + CTX_FINISHED, + CTX_FREEING, +}; + +struct crypt_ctx { + struct list_head peer_list; + struct list_head shared_list; + union { + struct sk_buff_head packets; + struct sk_buff *skb; + }; + struct wireguard_peer *peer; + struct noise_keypair *keypair; + struct endpoint endpoint; + atomic_t state; +}; + +static inline int next_cpu(int *next) +{ + int cpu = *next; + + if (cpu >= nr_cpumask_bits || !cpumask_test_cpu(cpu, cpu_online_mask)) + cpu = cpumask_first(cpu_online_mask); + *next = cpumask_next(cpu, cpu_online_mask); + return cpu; +} + +/** + * __queue_dequeue - Atomically remove the first item in a queue. + * + * @return The address of the dequeued item, or NULL if the queue is empty. + * + * This function is safe to execute concurrently with any number of + * __queue_enqueue() calls, but *not* with another __queue_dequeue() call + * operating on the same queue. + */ +static inline struct list_head *__queue_dequeue(struct list_head *queue) +{ + struct list_head *first, *second; + + first = READ_ONCE(queue->next); + if (first == queue) + return NULL; + do { + second = READ_ONCE(first->next); + WRITE_ONCE(queue->next, second); + } while (cmpxchg(&second->prev, first, queue) != first); + return first; +} + +static inline struct list_head *queue_dequeue(struct crypt_queue *queue) +{ + struct list_head *head = __queue_dequeue(&queue->list); + if (head) + atomic_dec(&queue->qlen); + return head; +} + +#define queue_dequeue_peer(queue) ({ \ + struct list_head *__head = queue_dequeue(queue); \ + __head ? list_entry(__head, struct crypt_ctx, peer_list) : NULL; \ +}) + +#define queue_dequeue_shared(queue) ({ \ + struct list_head *__head = queue_dequeue(queue); \ + __head ? list_entry(__head, struct crypt_ctx, shared_list) : NULL; \ +}) + +#define queue_empty(queue) \ + list_empty(&(queue)->list) + +/** + * __queue_enqueue - Atomically append an item to the tail of a queue. + * + * This function is safe to execute concurrently with any number of other + * __queue_enqueue() calls, as well as with one __queue_dequeue() call + * operating on the same queue. 
+ */ +static inline void __queue_enqueue(struct list_head *queue, + struct list_head *head) +{ + struct list_head *last; + + WRITE_ONCE(head->next, queue); + do { + last = READ_ONCE(queue->prev); + WRITE_ONCE(head->prev, last); + } while (cmpxchg(&queue->prev, last, head) != last); + WRITE_ONCE(last->next, head); +} + +static inline bool queue_enqueue(struct crypt_queue *queue, + struct list_head *head, + int limit) +{ + bool have_space = !limit || atomic_inc_return(&queue->qlen) <= limit; + if (have_space) + __queue_enqueue(&queue->list, head); + else + atomic_dec(&queue->qlen); + return have_space; +} + +#define queue_enqueue_peer(queue, ctx) \ + queue_enqueue(queue, &(ctx)->peer_list, QUEUE_MAX_LEN) + +#define queue_enqueue_shared(queue, ctx, wq, cpu) ({ \ + int __cpu = next_cpu(cpu); \ + struct crypt_queue *__queue = per_cpu_ptr(queue, __cpu); \ + queue_enqueue(__queue, &(ctx)->shared_list, 0); \ + queue_work_on(__cpu, wq, &__queue->work); \ + true; \ +}) + +#define queue_first_peer(queue) \ + list_first_entry_or_null(&(queue)->list, struct crypt_ctx, peer_list) + +#define queue_first_shared(queue) \ + list_first_entry_or_null(&(queue)->list, struct crypt_ctx, shared_list) + +#define queue_full(queue) \ + (atomic_read(&(queue)->qlen) == QUEUE_MAX_LEN) + +#endif /* WGQUEUE_H */ diff --git a/src/receive.c b/src/receive.c index 08b2fe2..1ebbe89 100644 --- a/src/receive.c +++ b/src/receive.c @@ -187,7 +187,7 @@ void packet_consume_data_done(struct sk_buff *skb, struct wireguard_peer *peer, if (unlikely(used_new_key)) { timers_handshake_complete(peer); - packet_send_queue(peer); + queue_work(peer->device->crypt_wq, &peer->init_queue.work); } keep_key_fresh(peer); @@ -234,7 +234,7 @@ void packet_consume_data_done(struct sk_buff *skb, struct wireguard_peer *peer, goto dishonest_packet_peer; len = skb->len; - if (likely(netif_rx(skb) == NET_RX_SUCCESS)) + if (likely(netif_rx_ni(skb) == NET_RX_SUCCESS)) rx_stats(peer, len); else { ++dev->stats.rx_dropped; @@ -262,7 +262,6 @@ packet_processed: continue_processing: timers_any_authenticated_packet_received(peer); timers_any_authenticated_packet_traversal(peer); - peer_put(peer); } void packet_receive(struct wireguard_device *wg, struct sk_buff *skb) diff --git a/src/send.c b/src/send.c index 3a3e544..1dc42e4 100644 --- a/src/send.c +++ b/src/send.c @@ -4,6 +4,7 @@ #include "timers.h" #include "device.h" #include "peer.h" +#include "queue.h" #include "socket.h" #include "messages.h" #include "cookie.h" @@ -89,7 +90,7 @@ void packet_send_handshake_cookie(struct wireguard_device *wg, struct sk_buff *i socket_send_buffer_as_reply_to_skb(wg, initiating_skb, &packet, sizeof(packet)); } -static inline void keep_key_fresh(struct wireguard_peer *peer) +void keep_key_fresh_send(struct wireguard_peer *peer) { struct noise_keypair *keypair; bool send = false; @@ -109,95 +110,20 @@ static inline void keep_key_fresh(struct wireguard_peer *peer) void packet_send_keepalive(struct wireguard_peer *peer) { struct sk_buff *skb; - if (skb_queue_empty(&peer->tx_packet_queue)) { + struct sk_buff_head queue; + + if (queue_empty(&peer->init_queue)) { skb = alloc_skb(DATA_PACKET_HEAD_ROOM + MESSAGE_MINIMUM_LENGTH, GFP_ATOMIC); if (unlikely(!skb)) return; skb_reserve(skb, DATA_PACKET_HEAD_ROOM); skb->dev = peer->device->dev; - skb_queue_tail(&peer->tx_packet_queue, skb); + __skb_queue_head_init(&queue); + __skb_queue_tail(&queue, skb); + packet_create_data(peer, &queue); net_dbg_ratelimited("%s: Sending keepalive packet to peer %Lu (%pISpfsc)\n", peer->device->dev->name, 
peer->internal_id, &peer->endpoint.addr); - } - packet_send_queue(peer); -} - -void packet_create_data_done(struct sk_buff_head *queue, struct wireguard_peer *peer) -{ - struct sk_buff *skb, *tmp; - bool is_keepalive, data_sent = false; - - if (unlikely(skb_queue_empty(queue))) - return; - - timers_any_authenticated_packet_traversal(peer); - skb_queue_walk_safe (queue, skb, tmp) { - is_keepalive = skb->len == message_data_len(0); - if (likely(!socket_send_skb_to_peer(peer, skb, PACKET_CB(skb)->ds) && !is_keepalive)) - data_sent = true; - } - if (likely(data_sent)) - timers_data_sent(peer); - - keep_key_fresh(peer); - - if (unlikely(peer->need_resend_queue)) - packet_send_queue(peer); -} - -void packet_send_queue(struct wireguard_peer *peer) -{ - struct sk_buff_head queue; - struct sk_buff *skb; - - peer->need_resend_queue = false; - - /* Steal the current queue into our local one. */ - skb_queue_head_init(&queue); - spin_lock_bh(&peer->tx_packet_queue.lock); - skb_queue_splice_init(&peer->tx_packet_queue, &queue); - spin_unlock_bh(&peer->tx_packet_queue.lock); - - if (unlikely(skb_queue_empty(&queue))) - return; - - /* We submit it for encryption and sending. */ - switch (packet_create_data(&queue, peer)) { - case 0: - break; - case -EBUSY: - /* EBUSY happens when the parallel workers are all filled up, in which - * case we should requeue everything. */ - - /* First, we mark that we should try to do this later, when existing - * jobs are done. */ - peer->need_resend_queue = true; - - /* We stick the remaining skbs from local_queue at the top of the peer's - * queue again, setting the top of local_queue to be the skb that begins - * the requeueing. */ - spin_lock_bh(&peer->tx_packet_queue.lock); - skb_queue_splice(&queue, &peer->tx_packet_queue); - spin_unlock_bh(&peer->tx_packet_queue.lock); - break; - case -ENOKEY: - /* ENOKEY means that we don't have a valid session for the peer, which - * means we should initiate a session, but after requeuing like above. - * Since we'll be queuing these up for potentially a little while, we - * first make sure they're no longer using up a socket's write buffer. */ - - skb_queue_walk (&queue, skb) - skb_orphan(skb); - - spin_lock_bh(&peer->tx_packet_queue.lock); - skb_queue_splice(&queue, &peer->tx_packet_queue); - spin_unlock_bh(&peer->tx_packet_queue.lock); - - packet_queue_handshake_initiation(peer, false); - break; - default: - /* If we failed for any other reason, we want to just free the packets and - * forget about them. We do this unlocked, since we're the only ones with - * a reference to the local queue. */ - __skb_queue_purge(&queue); + } else { + /* There are packets pending which need to be initialized with the new keypair. 
*/ + queue_work(peer->device->crypt_wq, &peer->init_queue.work); } } diff --git a/src/tests/qemu/kernel.config b/src/tests/qemu/kernel.config index 84a9875..e6016d0 100644 --- a/src/tests/qemu/kernel.config +++ b/src/tests/qemu/kernel.config @@ -71,5 +71,4 @@ CONFIG_BLK_DEV_INITRD=y CONFIG_LEGACY_VSYSCALL_NONE=y CONFIG_KERNEL_GZIP=y CONFIG_WIREGUARD=y -CONFIG_WIREGUARD_PARALLEL=y CONFIG_WIREGUARD_DEBUG=y diff --git a/src/timers.c b/src/timers.c index 9712c9e..b507aa3 100644 --- a/src/timers.c +++ b/src/timers.c @@ -31,9 +31,9 @@ static void expired_retransmit_handshake(unsigned long ptr) pr_debug("%s: Handshake for peer %Lu (%pISpfsc) did not complete after %d attempts, giving up\n", peer->device->dev->name, peer->internal_id, &peer->endpoint.addr, MAX_TIMER_HANDSHAKES + 2); del_timer(&peer->timer_send_keepalive); - /* We remove all existing packets and don't try again, + /* We drop all packets without a keypair and don't try again, * if we try unsuccessfully for too long to make a handshake. */ - skb_queue_purge(&peer->tx_packet_queue); + peer_purge_queues(peer); /* We set a timer for destroying any residue that might be left * of a partial exchange. */ if (likely(peer->timers_enabled) && !timer_pending(&peer->timer_zero_key_material)) -- cgit v1.2.3-59-g8ed1b
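The comments in the new queue.h describe __queue_enqueue() and __queue_dequeue() as safe for any number of concurrent enqueuers but only one dequeuer at a time on a given queue. As a rough illustration of that multi-producer, single-consumer contract (not the patch's algorithm, and not kernel code), the following is a minimal userspace sketch built on a Vyukov-style intrusive MPSC queue with C11 atomics; every name in it is invented for the example.

/*
 * Illustrative only: a userspace analogue of a queue that tolerates many
 * concurrent enqueue() callers but assumes a single dequeue() caller,
 * mirroring the contract documented for __queue_enqueue()/__queue_dequeue().
 * Built with C11 atomics; this is not the implementation from queue.h.
 */
#include <stdatomic.h>
#include <stdio.h>

struct node {
	_Atomic(struct node *) next;
	int value;
};

struct mpsc_queue {
	struct node *head;            /* touched only by the single consumer */
	_Atomic(struct node *) tail;  /* swung atomically by any producer */
};

static void mpsc_init(struct mpsc_queue *q, struct node *stub)
{
	atomic_store(&stub->next, NULL);
	q->head = stub;
	atomic_store(&q->tail, stub);
}

/* Many producers: atomically take over the tail, then link the old tail in. */
static void mpsc_enqueue(struct mpsc_queue *q, struct node *n)
{
	struct node *prev;

	atomic_store(&n->next, NULL);
	prev = atomic_exchange(&q->tail, n);
	atomic_store(&prev->next, n);
}

/* Single consumer: returns NULL when the queue looks empty. The node handed
 * back is the previous stub node, carrying the value of the element just
 * consumed (the usual trick for intrusive MPSC queues). */
static struct node *mpsc_dequeue(struct mpsc_queue *q)
{
	struct node *head = q->head;
	struct node *next = atomic_load(&head->next);

	if (!next)
		return NULL;
	head->value = next->value;
	q->head = next;
	return head;
}

int main(void)
{
	struct node stub, a = { .value = 1 }, b = { .value = 2 };
	struct mpsc_queue q;
	struct node *n;

	mpsc_init(&q, &stub);
	mpsc_enqueue(&q, &a);
	mpsc_enqueue(&q, &b);
	while ((n = mpsc_dequeue(&q)) != NULL)
		printf("dequeued %d\n", n->value);
	return 0;
}

In the patch itself, the single-dequeuer side appears to be provided by running each queue's work item on a fixed CPU (peer->work_cpu for the per-peer queues, the owning CPU for the per-CPU queues) and, for the init queue, by init_queue_lock.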