aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorAlexei Starovoitov <ast@kernel.org>2019-04-16 20:13:10 -0700
committerAlexei Starovoitov <ast@kernel.org>2019-04-16 20:13:11 -0700
commit00967e84f742f87603e769529628e32076ade188 (patch)
tree851eb8e92dd057e389e6a7b733dbda56f8a1d666 /net
parenttools/bpftool: show btf_id in map listing (diff)
parentlibbpf: optimize barrier for XDP socket rings (diff)
downloadlinux-dev-00967e84f742f87603e769529628e32076ade188.tar.xz
linux-dev-00967e84f742f87603e769529628e32076ade188.zip
Merge branch 'af_xdp-smp_mb-fixes'
Magnus Karlsson says: ==================== This patch set fixes one bug and removes two dependencies on Linux kernel headers from the XDP socket code in libbpf. A number of people have pointed out that these two dependencies make it hard to build the XDP socket part of libbpf without any kernel header dependencies. The two removed dependecies are: * Remove the usage of likely and unlikely (compiler.h) in xsk.h. It has been reported that the use of these actually decreases the performance of the ring access code due to an increase in instruction cache misses, so let us just remove these. * Remove the dependency on barrier.h as it brings in a lot of kernel headers. As the XDP socket code only uses two simple functions from it, we can reimplement these. As a bonus, the new implementation is faster as it uses the same barrier primitives as the kernel does when the same code is compiled there. Without this patch, the user land code uses lfence and sfence on x86, which are unnecessarily harsh/thorough. In the process of removing these dependencies a missing barrier function for at least PPC64 was discovered. For a full explanation on the missing barrier, please refer to patch 1. So the patch set now starts with two patches fixing this. I have also added a patch at the end removing this full memory barrier for x86 only, as it is not needed there. Structure of the patch set: Patch 1-2: Adds the missing barrier function in kernel and user space. Patch 3-4: Removes the dependencies Patch 5: Optimizes the added barrier from patch 2 so that it does not do unnecessary work on x86. v2 -> v3: * Added missing memory barrier in ring code * Added an explanation on the three barriers we use in the code * Moved barrier functions from xsk.h to libbpf_util.h * Added comment on why we have these functions in libbpf_util.h * Added a new barrier function in user space that makes it possible to remove the full memory barrier on x86. v1 -> v2: * Added comment about validity of ARM 32-bit barriers. Only armv7 and above. /Magnus ==================== Acked-by: Song Liu <songliubraving@fb.com> Signed-off-by: Alexei Starovoitov <ast@kernel.org>
Diffstat (limited to 'net')
-rw-r--r--net/xdp/xsk_queue.h56
1 files changed, 52 insertions, 4 deletions
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 610c0bdc0c2b..88b9ae24658d 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -43,6 +43,48 @@ struct xsk_queue {
u64 invalid_descs;
};
+/* The structure of the shared state of the rings are the same as the
+ * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
+ * ring, the kernel is the producer and user space is the consumer. For
+ * the Tx and fill rings, the kernel is the consumer and user space is
+ * the producer.
+ *
+ * producer consumer
+ *
+ * if (LOAD ->consumer) { LOAD ->producer
+ * (A) smp_rmb() (C)
+ * STORE $data LOAD $data
+ * smp_wmb() (B) smp_mb() (D)
+ * STORE ->producer STORE ->consumer
+ * }
+ *
+ * (A) pairs with (D), and (B) pairs with (C).
+ *
+ * Starting with (B), it protects the data from being written after
+ * the producer pointer. If this barrier was missing, the consumer
+ * could observe the producer pointer being set and thus load the data
+ * before the producer has written the new data. The consumer would in
+ * this case load the old data.
+ *
+ * (C) protects the consumer from speculatively loading the data before
+ * the producer pointer actually has been read. If we do not have this
+ * barrier, some architectures could load old data as speculative loads
+ * are not discarded as the CPU does not know there is a dependency
+ * between ->producer and data.
+ *
+ * (A) is a control dependency that separates the load of ->consumer
+ * from the stores of $data. In case ->consumer indicates there is no
+ * room in the buffer to store $data we do not. So no barrier is needed.
+ *
+ * (D) protects the load of the data to be observed to happen after the
+ * store of the consumer pointer. If we did not have this memory
+ * barrier, the producer could observe the consumer pointer being set
+ * and overwrite the data with a new value before the consumer got the
+ * chance to read the old value. The consumer would thus miss reading
+ * the old entry and very likely read the new entry twice, once right
+ * now and again after circling through the ring.
+ */
+
/* Common functions operating for both RXTX and umem queues */
static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
@@ -106,6 +148,7 @@ static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
{
if (q->cons_tail == q->cons_head) {
+ smp_mb(); /* D, matches A */
WRITE_ONCE(q->ring->consumer, q->cons_tail);
q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
@@ -128,10 +171,11 @@ static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
if (xskq_nb_free(q, q->prod_tail, 1) == 0)
return -ENOSPC;
+ /* A, matches D */
ring->desc[q->prod_tail++ & q->ring_mask] = addr;
/* Order producer and data */
- smp_wmb();
+ smp_wmb(); /* B, matches C */
WRITE_ONCE(q->ring->producer, q->prod_tail);
return 0;
@@ -144,6 +188,7 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
return -ENOSPC;
+ /* A, matches D */
ring->desc[q->prod_head++ & q->ring_mask] = addr;
return 0;
}
@@ -152,7 +197,7 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
u32 nb_entries)
{
/* Order producer and data */
- smp_wmb();
+ smp_wmb(); /* B, matches C */
q->prod_tail += nb_entries;
WRITE_ONCE(q->ring->producer, q->prod_tail);
@@ -163,6 +208,7 @@ static inline int xskq_reserve_addr(struct xsk_queue *q)
if (xskq_nb_free(q, q->prod_head, 1) == 0)
return -ENOSPC;
+ /* A, matches D */
q->prod_head++;
return 0;
}
@@ -204,11 +250,12 @@ static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
struct xdp_desc *desc)
{
if (q->cons_tail == q->cons_head) {
+ smp_mb(); /* D, matches A */
WRITE_ONCE(q->ring->consumer, q->cons_tail);
q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
/* Order consumer and data */
- smp_rmb();
+ smp_rmb(); /* C, matches B */
}
return xskq_validate_desc(q, desc);
@@ -228,6 +275,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
if (xskq_nb_free(q, q->prod_head, 1) == 0)
return -ENOSPC;
+ /* A, matches D */
idx = (q->prod_head++) & q->ring_mask;
ring->desc[idx].addr = addr;
ring->desc[idx].len = len;
@@ -238,7 +286,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
static inline void xskq_produce_flush_desc(struct xsk_queue *q)
{
/* Order producer and data */
- smp_wmb();
+ smp_wmb(); /* B, matches C */
q->prod_tail = q->prod_head,
WRITE_ONCE(q->ring->producer, q->prod_tail);