aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--drivers/net/ethernet/intel/i40e/i40e_xsk.c4
-rw-r--r--drivers/net/ethernet/intel/ice/ice_xsk.c4
-rw-r--r--drivers/net/ethernet/intel/ixgbe/ixgbe_xsk.c4
-rw-r--r--drivers/net/ethernet/mellanox/mlx5/core/en/xsk/rx.c2
-rw-r--r--include/net/xdp_sock.h14
-rw-r--r--net/xdp/xsk.c62
-rw-r--r--net/xdp/xsk_queue.c15
-rw-r--r--net/xdp/xsk_queue.h371
8 files changed, 246 insertions, 230 deletions
diff --git a/drivers/net/ethernet/intel/i40e/i40e_xsk.c b/drivers/net/ethernet/intel/i40e/i40e_xsk.c
index d07e1a890428..20d6f11a3468 100644
--- a/drivers/net/ethernet/intel/i40e/i40e_xsk.c
+++ b/drivers/net/ethernet/intel/i40e/i40e_xsk.c
@@ -269,7 +269,7 @@ static bool i40e_alloc_buffer_zc(struct i40e_ring *rx_ring,
bi->handle = xsk_umem_adjust_offset(umem, handle, umem->headroom);
- xsk_umem_discard_addr(umem);
+ xsk_umem_release_addr(umem);
return true;
}
@@ -306,7 +306,7 @@ static bool i40e_alloc_buffer_slow_zc(struct i40e_ring *rx_ring,
bi->handle = xsk_umem_adjust_offset(umem, handle, umem->headroom);
- xsk_umem_discard_addr_rq(umem);
+ xsk_umem_release_addr_rq(umem);
return true;
}
diff --git a/drivers/net/ethernet/intel/ice/ice_xsk.c b/drivers/net/ethernet/intel/ice/ice_xsk.c
index cf9b8b22d24f..0af1f0b951c0 100644
--- a/drivers/net/ethernet/intel/ice/ice_xsk.c
+++ b/drivers/net/ethernet/intel/ice/ice_xsk.c
@@ -555,7 +555,7 @@ ice_alloc_buf_fast_zc(struct ice_ring *rx_ring, struct ice_rx_buf *rx_buf)
rx_buf->handle = handle + umem->headroom;
- xsk_umem_discard_addr(umem);
+ xsk_umem_release_addr(umem);
return true;
}
@@ -591,7 +591,7 @@ ice_alloc_buf_slow_zc(struct ice_ring *rx_ring, struct ice_rx_buf *rx_buf)
rx_buf->handle = handle + umem->headroom;
- xsk_umem_discard_addr_rq(umem);
+ xsk_umem_release_addr_rq(umem);
return true;
}
diff --git a/drivers/net/ethernet/intel/ixgbe/ixgbe_xsk.c b/drivers/net/ethernet/intel/ixgbe/ixgbe_xsk.c
index d6feaacfbf89..1501d0a22078 100644
--- a/drivers/net/ethernet/intel/ixgbe/ixgbe_xsk.c
+++ b/drivers/net/ethernet/intel/ixgbe/ixgbe_xsk.c
@@ -277,7 +277,7 @@ static bool ixgbe_alloc_buffer_zc(struct ixgbe_ring *rx_ring,
bi->handle = xsk_umem_adjust_offset(umem, handle, umem->headroom);
- xsk_umem_discard_addr(umem);
+ xsk_umem_release_addr(umem);
return true;
}
@@ -304,7 +304,7 @@ static bool ixgbe_alloc_buffer_slow_zc(struct ixgbe_ring *rx_ring,
bi->handle = xsk_umem_adjust_offset(umem, handle, umem->headroom);
- xsk_umem_discard_addr_rq(umem);
+ xsk_umem_release_addr_rq(umem);
return true;
}
diff --git a/drivers/net/ethernet/mellanox/mlx5/core/en/xsk/rx.c b/drivers/net/ethernet/mellanox/mlx5/core/en/xsk/rx.c
index 475b6bd5d29b..62fc8a128a8d 100644
--- a/drivers/net/ethernet/mellanox/mlx5/core/en/xsk/rx.c
+++ b/drivers/net/ethernet/mellanox/mlx5/core/en/xsk/rx.c
@@ -35,7 +35,7 @@ int mlx5e_xsk_page_alloc_umem(struct mlx5e_rq *rq,
*/
dma_info->addr = xdp_umem_get_dma(umem, handle);
- xsk_umem_discard_addr_rq(umem);
+ xsk_umem_release_addr_rq(umem);
dma_sync_single_for_device(rq->pdev, dma_info->addr, PAGE_SIZE,
DMA_BIDIRECTIONAL);
diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
index 48594740d67c..e86ec48ef627 100644
--- a/include/net/xdp_sock.h
+++ b/include/net/xdp_sock.h
@@ -118,8 +118,8 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp);
bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs);
/* Used from netdev driver */
bool xsk_umem_has_addrs(struct xdp_umem *umem, u32 cnt);
-u64 *xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr);
-void xsk_umem_discard_addr(struct xdp_umem *umem);
+bool xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr);
+void xsk_umem_release_addr(struct xdp_umem *umem);
void xsk_umem_complete_tx(struct xdp_umem *umem, u32 nb_entries);
bool xsk_umem_consume_tx(struct xdp_umem *umem, struct xdp_desc *desc);
void xsk_umem_consume_tx_done(struct xdp_umem *umem);
@@ -197,7 +197,7 @@ static inline bool xsk_umem_has_addrs_rq(struct xdp_umem *umem, u32 cnt)
return xsk_umem_has_addrs(umem, cnt - rq->length);
}
-static inline u64 *xsk_umem_peek_addr_rq(struct xdp_umem *umem, u64 *addr)
+static inline bool xsk_umem_peek_addr_rq(struct xdp_umem *umem, u64 *addr)
{
struct xdp_umem_fq_reuse *rq = umem->fq_reuse;
@@ -208,12 +208,12 @@ static inline u64 *xsk_umem_peek_addr_rq(struct xdp_umem *umem, u64 *addr)
return addr;
}
-static inline void xsk_umem_discard_addr_rq(struct xdp_umem *umem)
+static inline void xsk_umem_release_addr_rq(struct xdp_umem *umem)
{
struct xdp_umem_fq_reuse *rq = umem->fq_reuse;
if (!rq->length)
- xsk_umem_discard_addr(umem);
+ xsk_umem_release_addr(umem);
else
rq->length--;
}
@@ -258,7 +258,7 @@ static inline u64 *xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr)
return NULL;
}
-static inline void xsk_umem_discard_addr(struct xdp_umem *umem)
+static inline void xsk_umem_release_addr(struct xdp_umem *umem)
{
}
@@ -332,7 +332,7 @@ static inline u64 *xsk_umem_peek_addr_rq(struct xdp_umem *umem, u64 *addr)
return NULL;
}
-static inline void xsk_umem_discard_addr_rq(struct xdp_umem *umem)
+static inline void xsk_umem_release_addr_rq(struct xdp_umem *umem)
{
}
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index e45c27f5cfca..5560d60c1ff5 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -41,21 +41,21 @@ bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs)
bool xsk_umem_has_addrs(struct xdp_umem *umem, u32 cnt)
{
- return xskq_has_addrs(umem->fq, cnt);
+ return xskq_cons_has_entries(umem->fq, cnt);
}
EXPORT_SYMBOL(xsk_umem_has_addrs);
-u64 *xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr)
+bool xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr)
{
- return xskq_peek_addr(umem->fq, addr, umem);
+ return xskq_cons_peek_addr(umem->fq, addr, umem);
}
EXPORT_SYMBOL(xsk_umem_peek_addr);
-void xsk_umem_discard_addr(struct xdp_umem *umem)
+void xsk_umem_release_addr(struct xdp_umem *umem)
{
- xskq_discard_addr(umem->fq);
+ xskq_cons_release(umem->fq);
}
-EXPORT_SYMBOL(xsk_umem_discard_addr);
+EXPORT_SYMBOL(xsk_umem_release_addr);
void xsk_set_rx_need_wakeup(struct xdp_umem *umem)
{
@@ -126,7 +126,7 @@ static void __xsk_rcv_memcpy(struct xdp_umem *umem, u64 addr, void *from_buf,
void *to_buf = xdp_umem_get_data(umem, addr);
addr = xsk_umem_add_offset_to_addr(addr);
- if (xskq_crosses_non_contig_pg(umem, addr, len + metalen)) {
+ if (xskq_cons_crosses_non_contig_pg(umem, addr, len + metalen)) {
void *next_pg_addr = umem->pages[(addr >> PAGE_SHIFT) + 1].addr;
u64 page_start = addr & ~(PAGE_SIZE - 1);
u64 first_len = PAGE_SIZE - (addr - page_start);
@@ -148,7 +148,7 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
u32 metalen;
int err;
- if (!xskq_peek_addr(xs->umem->fq, &addr, xs->umem) ||
+ if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) ||
len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) {
xs->rx_dropped++;
return -ENOSPC;
@@ -167,9 +167,9 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
offset += metalen;
addr = xsk_umem_adjust_offset(xs->umem, addr, offset);
- err = xskq_produce_batch_desc(xs->rx, addr, len);
+ err = xskq_prod_reserve_desc(xs->rx, addr, len);
if (!err) {
- xskq_discard_addr(xs->umem->fq);
+ xskq_cons_release(xs->umem->fq);
xdp_return_buff(xdp);
return 0;
}
@@ -180,7 +180,7 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
{
- int err = xskq_produce_batch_desc(xs->rx, (u64)xdp->handle, len);
+ int err = xskq_prod_reserve_desc(xs->rx, xdp->handle, len);
if (err)
xs->rx_dropped++;
@@ -216,7 +216,7 @@ static int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
static void xsk_flush(struct xdp_sock *xs)
{
- xskq_produce_flush_desc(xs->rx);
+ xskq_prod_submit(xs->rx);
xs->sk.sk_data_ready(&xs->sk);
}
@@ -236,7 +236,7 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
goto out_unlock;
}
- if (!xskq_peek_addr(xs->umem->fq, &addr, xs->umem) ||
+ if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) ||
len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) {
err = -ENOSPC;
goto out_drop;
@@ -247,12 +247,12 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
memcpy(buffer, xdp->data_meta, len + metalen);
addr = xsk_umem_adjust_offset(xs->umem, addr, metalen);
- err = xskq_produce_batch_desc(xs->rx, addr, len);
+ err = xskq_prod_reserve_desc(xs->rx, addr, len);
if (err)
goto out_drop;
- xskq_discard_addr(xs->umem->fq);
- xskq_produce_flush_desc(xs->rx);
+ xskq_cons_release(xs->umem->fq);
+ xskq_prod_submit(xs->rx);
spin_unlock_bh(&xs->rx_lock);
@@ -294,7 +294,7 @@ void __xsk_map_flush(void)
void xsk_umem_complete_tx(struct xdp_umem *umem, u32 nb_entries)
{
- xskq_produce_flush_addr_n(umem->cq, nb_entries);
+ xskq_prod_submit_n(umem->cq, nb_entries);
}
EXPORT_SYMBOL(xsk_umem_complete_tx);
@@ -316,13 +316,18 @@ bool xsk_umem_consume_tx(struct xdp_umem *umem, struct xdp_desc *desc)
rcu_read_lock();
list_for_each_entry_rcu(xs, &umem->xsk_list, list) {
- if (!xskq_peek_desc(xs->tx, desc, umem))
+ if (!xskq_cons_peek_desc(xs->tx, desc, umem))
continue;
- if (xskq_produce_addr_lazy(umem->cq, desc->addr))
+ /* This is the backpreassure mechanism for the Tx path.
+ * Reserve space in the completion queue and only proceed
+ * if there is space in it. This avoids having to implement
+ * any buffering in the Tx path.
+ */
+ if (xskq_prod_reserve_addr(umem->cq, desc->addr))
goto out;
- xskq_discard_desc(xs->tx);
+ xskq_cons_release(xs->tx);
rcu_read_unlock();
return true;
}
@@ -348,7 +353,7 @@ static void xsk_destruct_skb(struct sk_buff *skb)
unsigned long flags;
spin_lock_irqsave(&xs->tx_completion_lock, flags);
- WARN_ON_ONCE(xskq_produce_addr(xs->umem->cq, addr));
+ xskq_prod_submit_addr(xs->umem->cq, addr);
spin_unlock_irqrestore(&xs->tx_completion_lock, flags);
sock_wfree(skb);
@@ -368,7 +373,7 @@ static int xsk_generic_xmit(struct sock *sk)
if (xs->queue_id >= xs->dev->real_num_tx_queues)
goto out;
- while (xskq_peek_desc(xs->tx, &desc, xs->umem)) {
+ while (xskq_cons_peek_desc(xs->tx, &desc, xs->umem)) {
char *buffer;
u64 addr;
u32 len;
@@ -389,7 +394,12 @@ static int xsk_generic_xmit(struct sock *sk)
addr = desc.addr;
buffer = xdp_umem_get_data(xs->umem, addr);
err = skb_store_bits(skb, 0, buffer, len);
- if (unlikely(err) || xskq_reserve_addr(xs->umem->cq)) {
+ /* This is the backpreassure mechanism for the Tx path.
+ * Reserve space in the completion queue and only proceed
+ * if there is space in it. This avoids having to implement
+ * any buffering in the Tx path.
+ */
+ if (unlikely(err) || xskq_prod_reserve(xs->umem->cq)) {
kfree_skb(skb);
goto out;
}
@@ -401,7 +411,7 @@ static int xsk_generic_xmit(struct sock *sk)
skb->destructor = xsk_destruct_skb;
err = dev_direct_xmit(skb, xs->queue_id);
- xskq_discard_desc(xs->tx);
+ xskq_cons_release(xs->tx);
/* Ignore NET_XMIT_CN as packet might have been sent */
if (err == NET_XMIT_DROP || err == NETDEV_TX_BUSY) {
/* SKB completed but not sent */
@@ -470,9 +480,9 @@ static __poll_t xsk_poll(struct file *file, struct socket *sock,
__xsk_sendmsg(sk);
}
- if (xs->rx && !xskq_empty_desc(xs->rx))
+ if (xs->rx && !xskq_prod_is_empty(xs->rx))
mask |= EPOLLIN | EPOLLRDNORM;
- if (xs->tx && !xskq_full_desc(xs->tx))
+ if (xs->tx && !xskq_cons_is_full(xs->tx))
mask |= EPOLLOUT | EPOLLWRNORM;
return mask;
diff --git a/net/xdp/xsk_queue.c b/net/xdp/xsk_queue.c
index b66504592d9b..c90e9c1e3c63 100644
--- a/net/xdp/xsk_queue.c
+++ b/net/xdp/xsk_queue.c
@@ -18,14 +18,14 @@ void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask)
q->chunk_mask = chunk_mask;
}
-static u32 xskq_umem_get_ring_size(struct xsk_queue *q)
+static size_t xskq_get_ring_size(struct xsk_queue *q, bool umem_queue)
{
- return sizeof(struct xdp_umem_ring) + q->nentries * sizeof(u64);
-}
+ struct xdp_umem_ring *umem_ring;
+ struct xdp_rxtx_ring *rxtx_ring;
-static u32 xskq_rxtx_get_ring_size(struct xsk_queue *q)
-{
- return sizeof(struct xdp_ring) + q->nentries * sizeof(struct xdp_desc);
+ if (umem_queue)
+ return struct_size(umem_ring, desc, q->nentries);
+ return struct_size(rxtx_ring, desc, q->nentries);
}
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue)
@@ -43,8 +43,7 @@ struct xsk_queue *xskq_create(u32 nentries, bool umem_queue)
gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN |
__GFP_COMP | __GFP_NORETRY;
- size = umem_queue ? xskq_umem_get_ring_size(q) :
- xskq_rxtx_get_ring_size(q);
+ size = xskq_get_ring_size(q, umem_queue);
q->ring = (struct xdp_ring *)__get_free_pages(gfp_flags,
get_order(size));
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index eddae4688862..bec2af11853a 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -10,9 +10,6 @@
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>
-#define RX_BATCH_SIZE 16
-#define LAZY_UPDATE_THRESHOLD 128
-
struct xdp_ring {
u32 producer ____cacheline_aligned_in_smp;
u32 consumer ____cacheline_aligned_in_smp;
@@ -36,10 +33,8 @@ struct xsk_queue {
u64 size;
u32 ring_mask;
u32 nentries;
- u32 prod_head;
- u32 prod_tail;
- u32 cons_head;
- u32 cons_tail;
+ u32 cached_prod;
+ u32 cached_cons;
struct xdp_ring *ring;
u64 invalid_descs;
};
@@ -86,56 +81,31 @@ struct xsk_queue {
* now and again after circling through the ring.
*/
-/* Common functions operating for both RXTX and umem queues */
-
-static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
-{
- return q ? q->invalid_descs : 0;
-}
-
-static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
-{
- u32 entries = q->prod_tail - q->cons_tail;
-
- if (entries == 0) {
- /* Refresh the local pointer */
- q->prod_tail = READ_ONCE(q->ring->producer);
- entries = q->prod_tail - q->cons_tail;
- }
-
- return (entries > dcnt) ? dcnt : entries;
-}
-
-static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
-{
- u32 free_entries = q->nentries - (producer - q->cons_tail);
-
- if (free_entries >= dcnt)
- return free_entries;
-
- /* Refresh the local tail pointer */
- q->cons_tail = READ_ONCE(q->ring->consumer);
- return q->nentries - (producer - q->cons_tail);
-}
-
-static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
-{
- u32 entries = q->prod_tail - q->cons_tail;
-
- if (entries >= cnt)
- return true;
-
- /* Refresh the local pointer. */
- q->prod_tail = READ_ONCE(q->ring->producer);
- entries = q->prod_tail - q->cons_tail;
-
- return entries >= cnt;
-}
+/* The operations on the rings are the following:
+ *
+ * producer consumer
+ *
+ * RESERVE entries PEEK in the ring for entries
+ * WRITE data into the ring READ data from the ring
+ * SUBMIT entries RELEASE entries
+ *
+ * The producer reserves one or more entries in the ring. It can then
+ * fill in these entries and finally submit them so that they can be
+ * seen and read by the consumer.
+ *
+ * The consumer peeks into the ring to see if the producer has written
+ * any new entries. If so, the producer can then read these entries
+ * and when it is done reading them release them back to the producer
+ * so that the producer can use these slots to fill in new entries.
+ *
+ * The function names below reflect these operations.
+ */
-/* UMEM queue */
+/* Functions that read and validate content from consumer rings. */
-static inline bool xskq_crosses_non_contig_pg(struct xdp_umem *umem, u64 addr,
- u64 length)
+static inline bool xskq_cons_crosses_non_contig_pg(struct xdp_umem *umem,
+ u64 addr,
+ u64 length)
{
bool cross_pg = (addr & (PAGE_SIZE - 1)) + length > PAGE_SIZE;
bool next_pg_contig =
@@ -145,9 +115,16 @@ static inline bool xskq_crosses_non_contig_pg(struct xdp_umem *umem, u64 addr,
return cross_pg && !next_pg_contig;
}
-static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
+static inline bool xskq_cons_is_valid_unaligned(struct xsk_queue *q,
+ u64 addr,
+ u64 length,
+ struct xdp_umem *umem)
{
- if (addr >= q->size) {
+ u64 base_addr = xsk_umem_extract_addr(addr);
+
+ addr = xsk_umem_add_offset_to_addr(addr);
+ if (base_addr >= q->size || addr >= q->size ||
+ xskq_cons_crosses_non_contig_pg(umem, addr, length)) {
q->invalid_descs++;
return false;
}
@@ -155,15 +132,9 @@ static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
return true;
}
-static inline bool xskq_is_valid_addr_unaligned(struct xsk_queue *q, u64 addr,
- u64 length,
- struct xdp_umem *umem)
+static inline bool xskq_cons_is_valid_addr(struct xsk_queue *q, u64 addr)
{
- u64 base_addr = xsk_umem_extract_addr(addr);
-
- addr = xsk_umem_add_offset_to_addr(addr);
- if (base_addr >= q->size || addr >= q->size ||
- xskq_crosses_non_contig_pg(umem, addr, length)) {
+ if (addr >= q->size) {
q->invalid_descs++;
return false;
}
@@ -171,204 +142,240 @@ static inline bool xskq_is_valid_addr_unaligned(struct xsk_queue *q, u64 addr,
return true;
}
-static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr,
- struct xdp_umem *umem)
+static inline bool xskq_cons_read_addr(struct xsk_queue *q, u64 *addr,
+ struct xdp_umem *umem)
{
- while (q->cons_tail != q->cons_head) {
- struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
- unsigned int idx = q->cons_tail & q->ring_mask;
+ struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
- *addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;
+ while (q->cached_cons != q->cached_prod) {
+ u32 idx = q->cached_cons & q->ring_mask;
+
+ *addr = ring->desc[idx] & q->chunk_mask;
if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
- if (xskq_is_valid_addr_unaligned(q, *addr,
+ if (xskq_cons_is_valid_unaligned(q, *addr,
umem->chunk_size_nohr,
umem))
- return addr;
+ return true;
goto out;
}
- if (xskq_is_valid_addr(q, *addr))
- return addr;
+ if (xskq_cons_is_valid_addr(q, *addr))
+ return true;
out:
- q->cons_tail++;
+ q->cached_cons++;
}
- return NULL;
+ return false;
}
-static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr,
- struct xdp_umem *umem)
+static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
+ struct xdp_desc *d,
+ struct xdp_umem *umem)
{
- if (q->cons_tail == q->cons_head) {
- smp_mb(); /* D, matches A */
- WRITE_ONCE(q->ring->consumer, q->cons_tail);
- q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
+ if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
+ if (!xskq_cons_is_valid_unaligned(q, d->addr, d->len, umem))
+ return false;
+
+ if (d->len > umem->chunk_size_nohr || d->options) {
+ q->invalid_descs++;
+ return false;
+ }
- /* Order consumer and data */
- smp_rmb();
+ return true;
}
- return xskq_validate_addr(q, addr, umem);
-}
+ if (!xskq_cons_is_valid_addr(q, d->addr))
+ return false;
-static inline void xskq_discard_addr(struct xsk_queue *q)
-{
- q->cons_tail++;
+ if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
+ d->options) {
+ q->invalid_descs++;
+ return false;
+ }
+
+ return true;
}
-static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
+static inline bool xskq_cons_read_desc(struct xsk_queue *q,
+ struct xdp_desc *desc,
+ struct xdp_umem *umem)
{
- struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
-
- if (xskq_nb_free(q, q->prod_tail, 1) == 0)
- return -ENOSPC;
+ while (q->cached_cons != q->cached_prod) {
+ struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
+ u32 idx = q->cached_cons & q->ring_mask;
- /* A, matches D */
- ring->desc[q->prod_tail++ & q->ring_mask] = addr;
+ *desc = ring->desc[idx];
+ if (xskq_cons_is_valid_desc(q, desc, umem))
+ return true;
- /* Order producer and data */
- smp_wmb(); /* B, matches C */
+ q->cached_cons++;
+ }
- WRITE_ONCE(q->ring->producer, q->prod_tail);
- return 0;
+ return false;
}
-static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
+/* Functions for consumers */
+
+static inline void __xskq_cons_release(struct xsk_queue *q)
{
- struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+ smp_mb(); /* D, matches A */
+ WRITE_ONCE(q->ring->consumer, q->cached_cons);
+}
- if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
- return -ENOSPC;
+static inline void __xskq_cons_peek(struct xsk_queue *q)
+{
+ /* Refresh the local pointer */
+ q->cached_prod = READ_ONCE(q->ring->producer);
+ smp_rmb(); /* C, matches B */
+}
- /* A, matches D */
- ring->desc[q->prod_head++ & q->ring_mask] = addr;
- return 0;
+static inline void xskq_cons_get_entries(struct xsk_queue *q)
+{
+ __xskq_cons_release(q);
+ __xskq_cons_peek(q);
}
-static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
- u32 nb_entries)
+static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
{
- /* Order producer and data */
- smp_wmb(); /* B, matches C */
+ u32 entries = q->cached_prod - q->cached_cons;
- q->prod_tail += nb_entries;
- WRITE_ONCE(q->ring->producer, q->prod_tail);
+ if (entries >= cnt)
+ return true;
+
+ __xskq_cons_peek(q);
+ entries = q->cached_prod - q->cached_cons;
+
+ return entries >= cnt;
}
-static inline int xskq_reserve_addr(struct xsk_queue *q)
+static inline bool xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr,
+ struct xdp_umem *umem)
{
- if (xskq_nb_free(q, q->prod_head, 1) == 0)
- return -ENOSPC;
+ if (q->cached_prod == q->cached_cons)
+ xskq_cons_get_entries(q);
+ return xskq_cons_read_addr(q, addr, umem);
+}
- /* A, matches D */
- q->prod_head++;
- return 0;
+static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
+ struct xdp_desc *desc,
+ struct xdp_umem *umem)
+{
+ if (q->cached_prod == q->cached_cons)
+ xskq_cons_get_entries(q);
+ return xskq_cons_read_desc(q, desc, umem);
}
-/* Rx/Tx queue */
+static inline void xskq_cons_release(struct xsk_queue *q)
+{
+ /* To improve performance, only update local state here.
+ * Reflect this to global state when we get new entries
+ * from the ring in xskq_cons_get_entries().
+ */
+ q->cached_cons++;
+}
-static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d,
- struct xdp_umem *umem)
+static inline bool xskq_cons_is_full(struct xsk_queue *q)
{
- if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
- if (!xskq_is_valid_addr_unaligned(q, d->addr, d->len, umem))
- return false;
+ /* No barriers needed since data is not accessed */
+ return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
+ q->nentries;
+}
- if (d->len > umem->chunk_size_nohr || d->options) {
- q->invalid_descs++;
- return false;
- }
+/* Functions for producers */
- return true;
- }
+static inline bool xskq_prod_is_full(struct xsk_queue *q)
+{
+ u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
- if (!xskq_is_valid_addr(q, d->addr))
+ if (free_entries)
return false;
- if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
- d->options) {
- q->invalid_descs++;
- return false;
- }
+ /* Refresh the local tail pointer */
+ q->cached_cons = READ_ONCE(q->ring->consumer);
+ free_entries = q->nentries - (q->cached_prod - q->cached_cons);
- return true;
+ return !free_entries;
}
-static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
- struct xdp_desc *desc,
- struct xdp_umem *umem)
+static inline int xskq_prod_reserve(struct xsk_queue *q)
{
- while (q->cons_tail != q->cons_head) {
- struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
- unsigned int idx = q->cons_tail & q->ring_mask;
-
- *desc = READ_ONCE(ring->desc[idx]);
- if (xskq_is_valid_desc(q, desc, umem))
- return desc;
-
- q->cons_tail++;
- }
+ if (xskq_prod_is_full(q))
+ return -ENOSPC;
- return NULL;
+ /* A, matches D */
+ q->cached_prod++;
+ return 0;
}
-static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
- struct xdp_desc *desc,
- struct xdp_umem *umem)
+static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
{
- if (q->cons_tail == q->cons_head) {
- smp_mb(); /* D, matches A */
- WRITE_ONCE(q->ring->consumer, q->cons_tail);
- q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
-
- /* Order consumer and data */
- smp_rmb(); /* C, matches B */
- }
+ struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
- return xskq_validate_desc(q, desc, umem);
-}
+ if (xskq_prod_is_full(q))
+ return -ENOSPC;
-static inline void xskq_discard_desc(struct xsk_queue *q)
-{
- q->cons_tail++;
+ /* A, matches D */
+ ring->desc[q->cached_prod++ & q->ring_mask] = addr;
+ return 0;
}
-static inline int xskq_produce_batch_desc(struct xsk_queue *q,
- u64 addr, u32 len)
+static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
+ u64 addr, u32 len)
{
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
- unsigned int idx;
+ u32 idx;
- if (xskq_nb_free(q, q->prod_head, 1) == 0)
+ if (xskq_prod_is_full(q))
return -ENOSPC;
/* A, matches D */
- idx = (q->prod_head++) & q->ring_mask;
+ idx = q->cached_prod++ & q->ring_mask;
ring->desc[idx].addr = addr;
ring->desc[idx].len = len;
return 0;
}
-static inline void xskq_produce_flush_desc(struct xsk_queue *q)
+static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
- /* Order producer and data */
smp_wmb(); /* B, matches C */
- q->prod_tail = q->prod_head;
- WRITE_ONCE(q->ring->producer, q->prod_tail);
+ WRITE_ONCE(q->ring->producer, idx);
+}
+
+static inline void xskq_prod_submit(struct xsk_queue *q)
+{
+ __xskq_prod_submit(q, q->cached_prod);
+}
+
+static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
+{
+ struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+ u32 idx = q->ring->producer;
+
+ ring->desc[idx++ & q->ring_mask] = addr;
+
+ __xskq_prod_submit(q, idx);
}
-static inline bool xskq_full_desc(struct xsk_queue *q)
+static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
{
- return xskq_nb_avail(q, q->nentries) == q->nentries;
+ __xskq_prod_submit(q, q->ring->producer + nb_entries);
}
-static inline bool xskq_empty_desc(struct xsk_queue *q)
+static inline bool xskq_prod_is_empty(struct xsk_queue *q)
{
- return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
+ /* No barriers needed since data is not accessed */
+ return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
+}
+
+/* For both producers and consumers */
+
+static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
+{
+ return q ? q->invalid_descs : 0;
}
void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);