From 06eb9a6b9abd04481f2a3d561282453b9a1720c8 Mon Sep 17 00:00:00 2001 From: "Jason A. Donenfeld" Date: Wed, 22 Apr 2020 14:43:48 -0600 Subject: compat: import latest fixes for ptr_ring Signed-off-by: Jason A. Donenfeld --- src/compat/ptr_ring/include/linux/ptr_ring.h | 106 ++++++++++++++++++--------- 1 file changed, 70 insertions(+), 36 deletions(-) diff --git a/src/compat/ptr_ring/include/linux/ptr_ring.h b/src/compat/ptr_ring/include/linux/ptr_ring.h index 37b4bb2..417db0a 100644 --- a/src/compat/ptr_ring/include/linux/ptr_ring.h +++ b/src/compat/ptr_ring/include/linux/ptr_ring.h @@ -1,3 +1,4 @@ +/* SPDX-License-Identifier: GPL-2.0-or-later */ /* * Definitions for the 'struct ptr_ring' datastructure. * @@ -6,11 +7,6 @@ * * Copyright (C) 2016 Red Hat, Inc. * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * * This is a limited-size FIFO maintaining pointers in FIFO order, with * one CPU producing entries and another consuming entries from a FIFO. * @@ -26,8 +22,8 @@ #include #include #include -#include #include +#include #include #endif @@ -45,9 +41,10 @@ struct ptr_ring { }; /* Note: callers invoking this in a loop must use a compiler barrier, - * for example cpu_relax(). If ring is ever resized, callers must hold - * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold - * producer_lock, the next call to __ptr_ring_produce may fail. + * for example cpu_relax(). + * + * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock: + * see e.g. ptr_ring_full. */ static inline bool __ptr_ring_full(struct ptr_ring *r) { @@ -101,13 +98,19 @@ static inline bool ptr_ring_full_bh(struct ptr_ring *r) /* Note: callers invoking this in a loop must use a compiler barrier, * for example cpu_relax(). Callers must hold producer_lock. + * Callers are responsible for making sure pointer that is being queued + * points to a valid data. */ static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr) { if (unlikely(!r->size) || r->queue[r->producer]) return -ENOSPC; - r->queue[r->producer++] = ptr; + /* Make sure the pointer we are storing points to a valid data. */ + /* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */ + smp_wmb(); + + WRITE_ONCE(r->queue[r->producer++], ptr); if (unlikely(r->producer >= r->size)) r->producer = 0; return 0; @@ -163,26 +166,36 @@ static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr) return ret; } -/* Note: callers invoking this in a loop must use a compiler barrier, - * for example cpu_relax(). Callers must take consumer_lock - * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL. - * If ring is never resized, and if the pointer is merely - * tested, there's no need to take the lock - see e.g. __ptr_ring_empty. - */ static inline void *__ptr_ring_peek(struct ptr_ring *r) { if (likely(r->size)) - return r->queue[r->consumer_head]; + return READ_ONCE(r->queue[r->consumer_head]); return NULL; } -/* Note: callers invoking this in a loop must use a compiler barrier, - * for example cpu_relax(). Callers must take consumer_lock - * if the ring is ever resized - see e.g. ptr_ring_empty. +/* + * Test ring empty status without taking any locks. + * + * NB: This is only safe to call if ring is never resized. + * + * However, if some other CPU consumes ring entries at the same time, the value + * returned is not guaranteed to be correct. + * + * In this case - to avoid incorrectly detecting the ring + * as empty - the CPU consuming the ring entries is responsible + * for either consuming all ring entries until the ring is empty, + * or synchronizing with some other CPU and causing it to + * re-test __ptr_ring_empty and/or consume the ring enteries + * after the synchronization point. + * + * Note: callers invoking this in a loop must use a compiler barrier, + * for example cpu_relax(). */ static inline bool __ptr_ring_empty(struct ptr_ring *r) { - return !__ptr_ring_peek(r); + if (likely(r->size)) + return !r->queue[READ_ONCE(r->consumer_head)]; + return true; } static inline bool ptr_ring_empty(struct ptr_ring *r) @@ -236,22 +249,28 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r) /* Fundamentally, what we want to do is update consumer * index and zero out the entry so producer can reuse it. * Doing it naively at each consume would be as simple as: - * r->queue[r->consumer++] = NULL; - * if (unlikely(r->consumer >= r->size)) - * r->consumer = 0; + * consumer = r->consumer; + * r->queue[consumer++] = NULL; + * if (unlikely(consumer >= r->size)) + * consumer = 0; + * r->consumer = consumer; * but that is suboptimal when the ring is full as producer is writing * out new entries in the same cache line. Defer these updates until a * batch of entries has been consumed. */ - int head = r->consumer_head++; + /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty + * to work correctly. + */ + int consumer_head = r->consumer_head; + int head = consumer_head++; /* Once we have processed enough entries invalidate them in * the ring all at once so producer can reuse their space in the ring. * We also do this when we reach end of the ring - not mandatory * but helps keep the implementation simple. */ - if (unlikely(r->consumer_head - r->consumer_tail >= r->batch || - r->consumer_head >= r->size)) { + if (unlikely(consumer_head - r->consumer_tail >= r->batch || + consumer_head >= r->size)) { /* Zero out entries in the reverse order: this way we touch the * cache line that producer might currently be reading the last; * producer won't make progress and touch other cache lines @@ -259,18 +278,24 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r) */ while (likely(head >= r->consumer_tail)) r->queue[head--] = NULL; - r->consumer_tail = r->consumer_head; + r->consumer_tail = consumer_head; } - if (unlikely(r->consumer_head >= r->size)) { - r->consumer_head = 0; + if (unlikely(consumer_head >= r->size)) { + consumer_head = 0; r->consumer_tail = 0; } + /* matching READ_ONCE in __ptr_ring_empty for lockless tests */ + WRITE_ONCE(r->consumer_head, consumer_head); } static inline void *__ptr_ring_consume(struct ptr_ring *r) { void *ptr; + /* The READ_ONCE in __ptr_ring_peek guarantees that anyone + * accessing data through the pointer is up to date. Pairs + * with smp_wmb in __ptr_ring_produce. + */ ptr = __ptr_ring_peek(r); if (ptr) __ptr_ring_discard_one(r); @@ -436,9 +461,14 @@ static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r, __PTR_RING_PEEK_CALL_v; \ }) +/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See + * documentation for vmalloc for which of them are legal. + */ static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp) { - return kcalloc(size, sizeof(void *), gfp); + if (size > KMALLOC_MAX_SIZE / sizeof(void *)) + return NULL; + return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO); } static inline void __ptr_ring_set_size(struct ptr_ring *r, int size) @@ -512,7 +542,9 @@ static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n, goto done; } r->queue[head] = batch[--n]; - r->consumer_tail = r->consumer_head = head; + r->consumer_tail = head; + /* matching READ_ONCE in __ptr_ring_empty for lockless tests */ + WRITE_ONCE(r->consumer_head, head); } done: @@ -537,6 +569,8 @@ static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue, else if (destroy) destroy(ptr); + if (producer >= size) + producer = 0; __ptr_ring_set_size(r, size); r->producer = producer; r->consumer_head = 0; @@ -571,7 +605,7 @@ static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp, spin_unlock(&(r)->producer_lock); spin_unlock_irqrestore(&(r)->consumer_lock, flags); - kfree(old); + kvfree(old); return 0; } @@ -611,7 +645,7 @@ static inline int ptr_ring_resize_multiple(struct ptr_ring **rings, } for (i = 0; i < nrings; ++i) - kfree(queues[i]); + kvfree(queues[i]); kfree(queues); @@ -619,7 +653,7 @@ static inline int ptr_ring_resize_multiple(struct ptr_ring **rings, nomem: while (--i >= 0) - kfree(queues[i]); + kvfree(queues[i]); kfree(queues); @@ -634,7 +668,7 @@ static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *)) if (destroy) while ((ptr = ptr_ring_consume(r))) destroy(ptr); - kfree(r->queue); + kvfree(r->queue); } #endif /* _LINUX_PTR_RING_H */ -- cgit v1.2.3-59-g8ed1b