aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
authorJason A. Donenfeld <Jason@zx2c4.com>2020-04-22 14:43:48 -0600
committerJason A. Donenfeld <Jason@zx2c4.com>2020-04-22 14:44:07 -0600
commit06eb9a6b9abd04481f2a3d561282453b9a1720c8 (patch)
tree1476ec78c67f63ad10dc8cecd397ec06b02754f0
parentcompat: include sch_generic.h header for skb_reset_tc (diff)
downloadwireguard-linux-compat-06eb9a6b9abd04481f2a3d561282453b9a1720c8.tar.xz
wireguard-linux-compat-06eb9a6b9abd04481f2a3d561282453b9a1720c8.zip
compat: import latest fixes for ptr_ring
Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
-rw-r--r--src/compat/ptr_ring/include/linux/ptr_ring.h106
1 files changed, 70 insertions, 36 deletions
diff --git a/src/compat/ptr_ring/include/linux/ptr_ring.h b/src/compat/ptr_ring/include/linux/ptr_ring.h
index 37b4bb2..417db0a 100644
--- a/src/compat/ptr_ring/include/linux/ptr_ring.h
+++ b/src/compat/ptr_ring/include/linux/ptr_ring.h
@@ -1,3 +1,4 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
* Definitions for the 'struct ptr_ring' datastructure.
*
@@ -6,11 +7,6 @@
*
* Copyright (C) 2016 Red Hat, Inc.
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.
- *
* This is a limited-size FIFO maintaining pointers in FIFO order, with
* one CPU producing entries and another consuming entries from a FIFO.
*
@@ -26,8 +22,8 @@
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
-#include <linux/cache.h>
#include <linux/slab.h>
+#include <linux/mm.h>
#include <asm/errno.h>
#endif
@@ -45,9 +41,10 @@ struct ptr_ring {
};
/* Note: callers invoking this in a loop must use a compiler barrier,
- * for example cpu_relax(). If ring is ever resized, callers must hold
- * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
- * producer_lock, the next call to __ptr_ring_produce may fail.
+ * for example cpu_relax().
+ *
+ * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
+ * see e.g. ptr_ring_full.
*/
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
@@ -101,13 +98,19 @@ static inline bool ptr_ring_full_bh(struct ptr_ring *r)
/* Note: callers invoking this in a loop must use a compiler barrier,
* for example cpu_relax(). Callers must hold producer_lock.
+ * Callers are responsible for making sure pointer that is being queued
+ * points to a valid data.
*/
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
if (unlikely(!r->size) || r->queue[r->producer])
return -ENOSPC;
- r->queue[r->producer++] = ptr;
+ /* Make sure the pointer we are storing points to a valid data. */
+ /* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
+ smp_wmb();
+
+ WRITE_ONCE(r->queue[r->producer++], ptr);
if (unlikely(r->producer >= r->size))
r->producer = 0;
return 0;
@@ -163,26 +166,36 @@ static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
return ret;
}
-/* Note: callers invoking this in a loop must use a compiler barrier,
- * for example cpu_relax(). Callers must take consumer_lock
- * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
- * If ring is never resized, and if the pointer is merely
- * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
- */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
if (likely(r->size))
- return r->queue[r->consumer_head];
+ return READ_ONCE(r->queue[r->consumer_head]);
return NULL;
}
-/* Note: callers invoking this in a loop must use a compiler barrier,
- * for example cpu_relax(). Callers must take consumer_lock
- * if the ring is ever resized - see e.g. ptr_ring_empty.
+/*
+ * Test ring empty status without taking any locks.
+ *
+ * NB: This is only safe to call if ring is never resized.
+ *
+ * However, if some other CPU consumes ring entries at the same time, the value
+ * returned is not guaranteed to be correct.
+ *
+ * In this case - to avoid incorrectly detecting the ring
+ * as empty - the CPU consuming the ring entries is responsible
+ * for either consuming all ring entries until the ring is empty,
+ * or synchronizing with some other CPU and causing it to
+ * re-test __ptr_ring_empty and/or consume the ring enteries
+ * after the synchronization point.
+ *
+ * Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
*/
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
- return !__ptr_ring_peek(r);
+ if (likely(r->size))
+ return !r->queue[READ_ONCE(r->consumer_head)];
+ return true;
}
static inline bool ptr_ring_empty(struct ptr_ring *r)
@@ -236,22 +249,28 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
/* Fundamentally, what we want to do is update consumer
* index and zero out the entry so producer can reuse it.
* Doing it naively at each consume would be as simple as:
- * r->queue[r->consumer++] = NULL;
- * if (unlikely(r->consumer >= r->size))
- * r->consumer = 0;
+ * consumer = r->consumer;
+ * r->queue[consumer++] = NULL;
+ * if (unlikely(consumer >= r->size))
+ * consumer = 0;
+ * r->consumer = consumer;
* but that is suboptimal when the ring is full as producer is writing
* out new entries in the same cache line. Defer these updates until a
* batch of entries has been consumed.
*/
- int head = r->consumer_head++;
+ /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
+ * to work correctly.
+ */
+ int consumer_head = r->consumer_head;
+ int head = consumer_head++;
/* Once we have processed enough entries invalidate them in
* the ring all at once so producer can reuse their space in the ring.
* We also do this when we reach end of the ring - not mandatory
* but helps keep the implementation simple.
*/
- if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
- r->consumer_head >= r->size)) {
+ if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
+ consumer_head >= r->size)) {
/* Zero out entries in the reverse order: this way we touch the
* cache line that producer might currently be reading the last;
* producer won't make progress and touch other cache lines
@@ -259,18 +278,24 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
*/
while (likely(head >= r->consumer_tail))
r->queue[head--] = NULL;
- r->consumer_tail = r->consumer_head;
+ r->consumer_tail = consumer_head;
}
- if (unlikely(r->consumer_head >= r->size)) {
- r->consumer_head = 0;
+ if (unlikely(consumer_head >= r->size)) {
+ consumer_head = 0;
r->consumer_tail = 0;
}
+ /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
+ WRITE_ONCE(r->consumer_head, consumer_head);
}
static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
void *ptr;
+ /* The READ_ONCE in __ptr_ring_peek guarantees that anyone
+ * accessing data through the pointer is up to date. Pairs
+ * with smp_wmb in __ptr_ring_produce.
+ */
ptr = __ptr_ring_peek(r);
if (ptr)
__ptr_ring_discard_one(r);
@@ -436,9 +461,14 @@ static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
__PTR_RING_PEEK_CALL_v; \
})
+/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
+ * documentation for vmalloc for which of them are legal.
+ */
static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
- return kcalloc(size, sizeof(void *), gfp);
+ if (size > KMALLOC_MAX_SIZE / sizeof(void *))
+ return NULL;
+ return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
}
static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
@@ -512,7 +542,9 @@ static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
goto done;
}
r->queue[head] = batch[--n];
- r->consumer_tail = r->consumer_head = head;
+ r->consumer_tail = head;
+ /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
+ WRITE_ONCE(r->consumer_head, head);
}
done:
@@ -537,6 +569,8 @@ static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
else if (destroy)
destroy(ptr);
+ if (producer >= size)
+ producer = 0;
__ptr_ring_set_size(r, size);
r->producer = producer;
r->consumer_head = 0;
@@ -571,7 +605,7 @@ static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
spin_unlock(&(r)->producer_lock);
spin_unlock_irqrestore(&(r)->consumer_lock, flags);
- kfree(old);
+ kvfree(old);
return 0;
}
@@ -611,7 +645,7 @@ static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
}
for (i = 0; i < nrings; ++i)
- kfree(queues[i]);
+ kvfree(queues[i]);
kfree(queues);
@@ -619,7 +653,7 @@ static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
nomem:
while (--i >= 0)
- kfree(queues[i]);
+ kvfree(queues[i]);
kfree(queues);
@@ -634,7 +668,7 @@ static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
if (destroy)
while ((ptr = ptr_ring_consume(r)))
destroy(ptr);
- kfree(r->queue);
+ kvfree(r->queue);
}
#endif /* _LINUX_PTR_RING_H */