Diffstat (limited to 'net/xdp/xsk.c')
 -rw-r--r--  net/xdp/xsk.c | 809
 1 file changed, 546 insertions(+), 263 deletions(-)
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 356f90e4522b..9f0561b67c12 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -22,170 +22,186 @@
#include <linux/net.h>
#include <linux/netdevice.h>
#include <linux/rculist.h>
-#include <net/xdp_sock.h>
+#include <net/xdp_sock_drv.h>
+#include <net/busy_poll.h>
#include <net/xdp.h>
#include "xsk_queue.h"
#include "xdp_umem.h"
#include "xsk.h"
-#define TX_BATCH_SIZE 16
+#define TX_BATCH_SIZE 32
static DEFINE_PER_CPU(struct list_head, xskmap_flush_list);
-bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs)
+void xsk_set_rx_need_wakeup(struct xsk_buff_pool *pool)
{
- return READ_ONCE(xs->rx) && READ_ONCE(xs->umem) &&
- READ_ONCE(xs->umem->fq);
-}
-
-bool xsk_umem_has_addrs(struct xdp_umem *umem, u32 cnt)
-{
- return xskq_cons_has_entries(umem->fq, cnt);
-}
-EXPORT_SYMBOL(xsk_umem_has_addrs);
-
-bool xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr)
-{
- return xskq_cons_peek_addr(umem->fq, addr, umem);
-}
-EXPORT_SYMBOL(xsk_umem_peek_addr);
-
-void xsk_umem_release_addr(struct xdp_umem *umem)
-{
- xskq_cons_release(umem->fq);
-}
-EXPORT_SYMBOL(xsk_umem_release_addr);
-
-void xsk_set_rx_need_wakeup(struct xdp_umem *umem)
-{
- if (umem->need_wakeup & XDP_WAKEUP_RX)
+ if (pool->cached_need_wakeup & XDP_WAKEUP_RX)
return;
- umem->fq->ring->flags |= XDP_RING_NEED_WAKEUP;
- umem->need_wakeup |= XDP_WAKEUP_RX;
+ pool->fq->ring->flags |= XDP_RING_NEED_WAKEUP;
+ pool->cached_need_wakeup |= XDP_WAKEUP_RX;
}
EXPORT_SYMBOL(xsk_set_rx_need_wakeup);
-void xsk_set_tx_need_wakeup(struct xdp_umem *umem)
+void xsk_set_tx_need_wakeup(struct xsk_buff_pool *pool)
{
struct xdp_sock *xs;
- if (umem->need_wakeup & XDP_WAKEUP_TX)
+ if (pool->cached_need_wakeup & XDP_WAKEUP_TX)
return;
rcu_read_lock();
- list_for_each_entry_rcu(xs, &umem->xsk_list, list) {
+ list_for_each_entry_rcu(xs, &pool->xsk_tx_list, tx_list) {
xs->tx->ring->flags |= XDP_RING_NEED_WAKEUP;
}
rcu_read_unlock();
- umem->need_wakeup |= XDP_WAKEUP_TX;
+ pool->cached_need_wakeup |= XDP_WAKEUP_TX;
}
EXPORT_SYMBOL(xsk_set_tx_need_wakeup);
-void xsk_clear_rx_need_wakeup(struct xdp_umem *umem)
+void xsk_clear_rx_need_wakeup(struct xsk_buff_pool *pool)
{
- if (!(umem->need_wakeup & XDP_WAKEUP_RX))
+ if (!(pool->cached_need_wakeup & XDP_WAKEUP_RX))
return;
- umem->fq->ring->flags &= ~XDP_RING_NEED_WAKEUP;
- umem->need_wakeup &= ~XDP_WAKEUP_RX;
+ pool->fq->ring->flags &= ~XDP_RING_NEED_WAKEUP;
+ pool->cached_need_wakeup &= ~XDP_WAKEUP_RX;
}
EXPORT_SYMBOL(xsk_clear_rx_need_wakeup);
-void xsk_clear_tx_need_wakeup(struct xdp_umem *umem)
+void xsk_clear_tx_need_wakeup(struct xsk_buff_pool *pool)
{
struct xdp_sock *xs;
- if (!(umem->need_wakeup & XDP_WAKEUP_TX))
+ if (!(pool->cached_need_wakeup & XDP_WAKEUP_TX))
return;
rcu_read_lock();
- list_for_each_entry_rcu(xs, &umem->xsk_list, list) {
+ list_for_each_entry_rcu(xs, &pool->xsk_tx_list, tx_list) {
xs->tx->ring->flags &= ~XDP_RING_NEED_WAKEUP;
}
rcu_read_unlock();
- umem->need_wakeup &= ~XDP_WAKEUP_TX;
+ pool->cached_need_wakeup &= ~XDP_WAKEUP_TX;
}
EXPORT_SYMBOL(xsk_clear_tx_need_wakeup);
-bool xsk_umem_uses_need_wakeup(struct xdp_umem *umem)
+bool xsk_uses_need_wakeup(struct xsk_buff_pool *pool)
+{
+ return pool->uses_need_wakeup;
+}
+EXPORT_SYMBOL(xsk_uses_need_wakeup);
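
(Editorial aside, not part of the patch: a rough sketch of how a zero-copy driver is expected to use the pool-based need_wakeup helpers from its NAPI Rx poll loop. The mydrv_* structures and functions below are hypothetical placeholders, only the xsk_* calls come from this file.)

	/* Illustrative driver-side use of the need_wakeup helpers; mydrv_* names are made up. */
	static int mydrv_rx_poll(struct mydrv_ring *ring, int budget)
	{
		struct xsk_buff_pool *pool = ring->xsk_pool;
		int done = mydrv_clean_rx(ring, budget);

		if (xsk_uses_need_wakeup(pool)) {
			/* Ask user space to replenish the fill ring when we ran dry,
			 * otherwise clear the flag so it can skip the wakeup syscall.
			 */
			if (mydrv_fill_ring_empty(ring))
				xsk_set_rx_need_wakeup(pool);
			else
				xsk_clear_rx_need_wakeup(pool);
		}
		return done;
	}
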
+
+struct xsk_buff_pool *xsk_get_pool_from_qid(struct net_device *dev,
+ u16 queue_id)
+{
+ if (queue_id < dev->real_num_rx_queues)
+ return dev->_rx[queue_id].pool;
+ if (queue_id < dev->real_num_tx_queues)
+ return dev->_tx[queue_id].pool;
+
+ return NULL;
+}
+EXPORT_SYMBOL(xsk_get_pool_from_qid);
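
(Editorial aside, not part of the patch: the lookup side of this registration is what drivers call. A minimal sketch, with a hypothetical driver helper name, of checking whether AF_XDP zero-copy is enabled on a given queue id:)

	/* Hypothetical driver helper built on the exported lookup. */
	static bool mydrv_queue_has_xsk_pool(struct net_device *netdev, u16 qid)
	{
		struct xsk_buff_pool *pool = xsk_get_pool_from_qid(netdev, qid);

		return pool != NULL;
	}
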
+
+void xsk_clear_pool_at_qid(struct net_device *dev, u16 queue_id)
{
- return umem->flags & XDP_UMEM_USES_NEED_WAKEUP;
+ if (queue_id < dev->num_rx_queues)
+ dev->_rx[queue_id].pool = NULL;
+ if (queue_id < dev->num_tx_queues)
+ dev->_tx[queue_id].pool = NULL;
}
-EXPORT_SYMBOL(xsk_umem_uses_need_wakeup);
-/* If a buffer crosses a page boundary, we need to do 2 memcpy's, one for
- * each page. This is only required in copy mode.
+/* The buffer pool is stored both in the _rx struct and the _tx struct as we do
+ * not know if the device has more tx queues than rx, or the opposite.
+ * This might also change during run time.
*/
-static void __xsk_rcv_memcpy(struct xdp_umem *umem, u64 addr, void *from_buf,
- u32 len, u32 metalen)
+int xsk_reg_pool_at_qid(struct net_device *dev, struct xsk_buff_pool *pool,
+ u16 queue_id)
{
- void *to_buf = xdp_umem_get_data(umem, addr);
+ if (queue_id >= max_t(unsigned int,
+ dev->real_num_rx_queues,
+ dev->real_num_tx_queues))
+ return -EINVAL;
- addr = xsk_umem_add_offset_to_addr(addr);
- if (xskq_cons_crosses_non_contig_pg(umem, addr, len + metalen)) {
- void *next_pg_addr = umem->pages[(addr >> PAGE_SHIFT) + 1].addr;
- u64 page_start = addr & ~(PAGE_SIZE - 1);
- u64 first_len = PAGE_SIZE - (addr - page_start);
+ if (queue_id < dev->real_num_rx_queues)
+ dev->_rx[queue_id].pool = pool;
+ if (queue_id < dev->real_num_tx_queues)
+ dev->_tx[queue_id].pool = pool;
- memcpy(to_buf, from_buf, first_len + metalen);
- memcpy(next_pg_addr, from_buf + first_len, len - first_len);
+ return 0;
+}
- return;
+static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
+{
+ struct xdp_buff_xsk *xskb = container_of(xdp, struct xdp_buff_xsk, xdp);
+ u64 addr;
+ int err;
+
+ addr = xp_get_handle(xskb);
+ err = xskq_prod_reserve_desc(xs->rx, addr, len);
+ if (err) {
+ xs->rx_queue_full++;
+ return err;
}
- memcpy(to_buf, from_buf, len + metalen);
+ xp_release(xskb);
+ return 0;
}
-static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
+static void xsk_copy_xdp(struct xdp_buff *to, struct xdp_buff *from, u32 len)
{
- u64 offset = xs->umem->headroom;
- u64 addr, memcpy_addr;
- void *from_buf;
+ void *from_buf, *to_buf;
u32 metalen;
+
+ if (unlikely(xdp_data_meta_unsupported(from))) {
+ from_buf = from->data;
+ to_buf = to->data;
+ metalen = 0;
+ } else {
+ from_buf = from->data_meta;
+ metalen = from->data - from->data_meta;
+ to_buf = to->data - metalen;
+ }
+
+ memcpy(to_buf, from_buf, len + metalen);
+}
+
+static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+{
+ struct xdp_buff *xsk_xdp;
int err;
+ u32 len;
- if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) ||
- len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) {
+ len = xdp->data_end - xdp->data;
+ if (len > xsk_pool_get_rx_frame_size(xs->pool)) {
xs->rx_dropped++;
return -ENOSPC;
}
- if (unlikely(xdp_data_meta_unsupported(xdp))) {
- from_buf = xdp->data;
- metalen = 0;
- } else {
- from_buf = xdp->data_meta;
- metalen = xdp->data - xdp->data_meta;
+ xsk_xdp = xsk_buff_alloc(xs->pool);
+ if (!xsk_xdp) {
+ xs->rx_dropped++;
+ return -ENOMEM;
}
- memcpy_addr = xsk_umem_adjust_offset(xs->umem, addr, offset);
- __xsk_rcv_memcpy(xs->umem, memcpy_addr, from_buf, len, metalen);
-
- offset += metalen;
- addr = xsk_umem_adjust_offset(xs->umem, addr, offset);
- err = xskq_prod_reserve_desc(xs->rx, addr, len);
- if (!err) {
- xskq_cons_release(xs->umem->fq);
- xdp_return_buff(xdp);
- return 0;
+ xsk_copy_xdp(xsk_xdp, xdp, len);
+ err = __xsk_rcv_zc(xs, xsk_xdp, len);
+ if (err) {
+ xsk_buff_free(xsk_xdp);
+ return err;
}
-
- xs->rx_dropped++;
- return err;
+ return 0;
}
-static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
+static bool xsk_tx_writeable(struct xdp_sock *xs)
{
- int err = xskq_prod_reserve_desc(xs->rx, xdp->handle, len);
+ if (xskq_cons_present_entries(xs->tx) > xs->tx->nentries / 2)
+ return false;
- if (err)
- xs->rx_dropped++;
-
- return err;
+ return true;
}
static bool xsk_is_bound(struct xdp_sock *xs)
@@ -198,72 +214,56 @@ static bool xsk_is_bound(struct xdp_sock *xs)
return false;
}
-static int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+static int xsk_rcv_check(struct xdp_sock *xs, struct xdp_buff *xdp)
{
- u32 len;
-
if (!xsk_is_bound(xs))
- return -EINVAL;
+ return -ENXIO;
if (xs->dev != xdp->rxq->dev || xs->queue_id != xdp->rxq->queue_index)
return -EINVAL;
- len = xdp->data_end - xdp->data;
-
- return (xdp->rxq->mem.type == MEM_TYPE_ZERO_COPY) ?
- __xsk_rcv_zc(xs, xdp, len) : __xsk_rcv(xs, xdp, len);
+ sk_mark_napi_id_once_xdp(&xs->sk, xdp);
+ return 0;
}
static void xsk_flush(struct xdp_sock *xs)
{
xskq_prod_submit(xs->rx);
- __xskq_cons_release(xs->umem->fq);
+ __xskq_cons_release(xs->pool->fq);
sock_def_readable(&xs->sk);
}
int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
{
- u32 metalen = xdp->data - xdp->data_meta;
- u32 len = xdp->data_end - xdp->data;
- u64 offset = xs->umem->headroom;
- void *buffer;
- u64 addr;
int err;
spin_lock_bh(&xs->rx_lock);
-
- if (xs->dev != xdp->rxq->dev || xs->queue_id != xdp->rxq->queue_index) {
- err = -EINVAL;
- goto out_unlock;
- }
-
- if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) ||
- len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) {
- err = -ENOSPC;
- goto out_drop;
+ err = xsk_rcv_check(xs, xdp);
+ if (!err) {
+ err = __xsk_rcv(xs, xdp);
+ xsk_flush(xs);
}
+ spin_unlock_bh(&xs->rx_lock);
+ return err;
+}
- addr = xsk_umem_adjust_offset(xs->umem, addr, offset);
- buffer = xdp_umem_get_data(xs->umem, addr);
- memcpy(buffer, xdp->data_meta, len + metalen);
+static int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+{
+ int err;
+ u32 len;
- addr = xsk_umem_adjust_offset(xs->umem, addr, metalen);
- err = xskq_prod_reserve_desc(xs->rx, addr, len);
+ err = xsk_rcv_check(xs, xdp);
if (err)
- goto out_drop;
-
- xskq_cons_release(xs->umem->fq);
- xskq_prod_submit(xs->rx);
-
- spin_unlock_bh(&xs->rx_lock);
+ return err;
- xs->sk.sk_data_ready(&xs->sk);
- return 0;
+ if (xdp->rxq->mem.type == MEM_TYPE_XSK_BUFF_POOL) {
+ len = xdp->data_end - xdp->data;
+ return __xsk_rcv_zc(xs, xdp, len);
+ }
-out_drop:
- xs->rx_dropped++;
-out_unlock:
- spin_unlock_bh(&xs->rx_lock);
+ err = __xsk_rcv(xs, xdp);
+ if (!err)
+ xdp_return_buff(xdp);
return err;
}
@@ -293,40 +293,43 @@ void __xsk_map_flush(void)
}
}
-void xsk_umem_complete_tx(struct xdp_umem *umem, u32 nb_entries)
+void xsk_tx_completed(struct xsk_buff_pool *pool, u32 nb_entries)
{
- xskq_prod_submit_n(umem->cq, nb_entries);
+ xskq_prod_submit_n(pool->cq, nb_entries);
}
-EXPORT_SYMBOL(xsk_umem_complete_tx);
+EXPORT_SYMBOL(xsk_tx_completed);
-void xsk_umem_consume_tx_done(struct xdp_umem *umem)
+void xsk_tx_release(struct xsk_buff_pool *pool)
{
struct xdp_sock *xs;
rcu_read_lock();
- list_for_each_entry_rcu(xs, &umem->xsk_list, list) {
+ list_for_each_entry_rcu(xs, &pool->xsk_tx_list, tx_list) {
__xskq_cons_release(xs->tx);
- xs->sk.sk_write_space(&xs->sk);
+ if (xsk_tx_writeable(xs))
+ xs->sk.sk_write_space(&xs->sk);
}
rcu_read_unlock();
}
-EXPORT_SYMBOL(xsk_umem_consume_tx_done);
+EXPORT_SYMBOL(xsk_tx_release);
-bool xsk_umem_consume_tx(struct xdp_umem *umem, struct xdp_desc *desc)
+bool xsk_tx_peek_desc(struct xsk_buff_pool *pool, struct xdp_desc *desc)
{
struct xdp_sock *xs;
rcu_read_lock();
- list_for_each_entry_rcu(xs, &umem->xsk_list, list) {
- if (!xskq_cons_peek_desc(xs->tx, desc, umem))
+ list_for_each_entry_rcu(xs, &pool->xsk_tx_list, tx_list) {
+ if (!xskq_cons_peek_desc(xs->tx, desc, pool)) {
+ xs->tx->queue_empty_descs++;
continue;
+ }
- /* This is the backpreassure mechanism for the Tx path.
+ /* This is the backpressure mechanism for the Tx path.
* Reserve space in the completion queue and only proceed
* if there is space in it. This avoids having to implement
* any buffering in the Tx path.
*/
- if (xskq_prod_reserve_addr(umem->cq, desc->addr))
+ if (xskq_prod_reserve_addr(pool->cq, desc->addr))
goto out;
xskq_cons_release(xs->tx);
@@ -338,23 +341,70 @@ out:
rcu_read_unlock();
return false;
}
-EXPORT_SYMBOL(xsk_umem_consume_tx);
+EXPORT_SYMBOL(xsk_tx_peek_desc);
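
(Editorial aside, not part of the patch: a sketch of the per-descriptor zero-copy Tx pattern this API supports. The mydrv_* ring structures, the descriptor posting helper, and the DMA handling are simplified placeholders; only the xsk_* calls are real.)

	#include <net/xdp_sock_drv.h>

	/* Illustrative zero-copy Tx burst using the per-descriptor API. */
	static void mydrv_xsk_xmit(struct mydrv_tx_ring *ring, u32 budget)
	{
		struct xsk_buff_pool *pool = ring->xsk_pool;
		struct xdp_desc desc;
		u32 sent = 0;

		while (sent < budget && xsk_tx_peek_desc(pool, &desc)) {
			dma_addr_t dma = xsk_buff_raw_get_dma(pool, desc.addr);

			mydrv_post_tx_frame(ring, dma, desc.len);
			sent++;
		}
		/* Hand back the consumed Tx descriptors and wake the socket. */
		xsk_tx_release(pool);

		/* Later, once the HW has actually sent the frames:
		 * xsk_tx_completed(pool, completed);
		 */
	}
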
-static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
+static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, u32 max_entries)
{
- struct net_device *dev = xs->dev;
- int err;
+ struct xdp_desc *descs = pool->tx_descs;
+ u32 nb_pkts = 0;
+
+ while (nb_pkts < max_entries && xsk_tx_peek_desc(pool, &descs[nb_pkts]))
+ nb_pkts++;
+
+ xsk_tx_release(pool);
+ return nb_pkts;
+}
+
+u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_pkts)
+{
+ struct xdp_sock *xs;
rcu_read_lock();
- err = dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
- rcu_read_unlock();
+ if (!list_is_singular(&pool->xsk_tx_list)) {
+ /* Fallback to the non-batched version */
+ rcu_read_unlock();
+ return xsk_tx_peek_release_fallback(pool, nb_pkts);
+ }
- return err;
+ xs = list_first_or_null_rcu(&pool->xsk_tx_list, struct xdp_sock, tx_list);
+ if (!xs) {
+ nb_pkts = 0;
+ goto out;
+ }
+
+ nb_pkts = xskq_cons_nb_entries(xs->tx, nb_pkts);
+
+ /* This is the backpressure mechanism for the Tx path. Try to
+ * reserve space in the completion queue for all packets, but
+ * if there are fewer slots available, just process that many
+ * packets. This avoids having to implement any buffering in
+ * the Tx path.
+ */
+ nb_pkts = xskq_prod_nb_free(pool->cq, nb_pkts);
+ if (!nb_pkts)
+ goto out;
+
+ nb_pkts = xskq_cons_read_desc_batch(xs->tx, pool, nb_pkts);
+ if (!nb_pkts) {
+ xs->tx->queue_empty_descs++;
+ goto out;
+ }
+
+ __xskq_cons_release(xs->tx);
+ xskq_prod_write_addr_batch(pool->cq, pool->tx_descs, nb_pkts);
+ xs->sk.sk_write_space(&xs->sk);
+
+out:
+ rcu_read_unlock();
+ return nb_pkts;
}
+EXPORT_SYMBOL(xsk_tx_peek_release_desc_batch);
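
(Editorial aside, not part of the patch: a driver preferring the batched variant can pull a burst of descriptors into pool->tx_descs with one call. Again, the mydrv_* names are placeholders for illustration only.)

	/* Illustrative batched Tx burst; driver details omitted. */
	static void mydrv_xsk_xmit_batch(struct mydrv_tx_ring *ring, u32 budget)
	{
		struct xsk_buff_pool *pool = ring->xsk_pool;
		u32 i, nb_pkts;

		nb_pkts = xsk_tx_peek_release_desc_batch(pool, budget);
		for (i = 0; i < nb_pkts; i++) {
			struct xdp_desc *desc = &pool->tx_descs[i];

			mydrv_post_tx_frame(ring,
					    xsk_buff_raw_get_dma(pool, desc->addr),
					    desc->len);
		}
	}
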
-static int xsk_zc_xmit(struct xdp_sock *xs)
+static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
{
- return xsk_wakeup(xs, XDP_WAKEUP_TX);
+ struct net_device *dev = xs->dev;
+
+ return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
}
static void xsk_destruct_skb(struct sk_buff *skb)
@@ -363,13 +413,104 @@ static void xsk_destruct_skb(struct sk_buff *skb)
struct xdp_sock *xs = xdp_sk(skb->sk);
unsigned long flags;
- spin_lock_irqsave(&xs->tx_completion_lock, flags);
- xskq_prod_submit_addr(xs->umem->cq, addr);
- spin_unlock_irqrestore(&xs->tx_completion_lock, flags);
+ spin_lock_irqsave(&xs->pool->cq_lock, flags);
+ xskq_prod_submit_addr(xs->pool->cq, addr);
+ spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
sock_wfree(skb);
}
+static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
+ struct xdp_desc *desc)
+{
+ struct xsk_buff_pool *pool = xs->pool;
+ u32 hr, len, ts, offset, copy, copied;
+ struct sk_buff *skb;
+ struct page *page;
+ void *buffer;
+ int err, i;
+ u64 addr;
+
+ hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(xs->dev->needed_headroom));
+
+ skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err);
+ if (unlikely(!skb))
+ return ERR_PTR(err);
+
+ skb_reserve(skb, hr);
+
+ addr = desc->addr;
+ len = desc->len;
+ ts = pool->unaligned ? len : pool->chunk_size;
+
+ buffer = xsk_buff_raw_get_data(pool, addr);
+ offset = offset_in_page(buffer);
+ addr = buffer - pool->addrs;
+
+ for (copied = 0, i = 0; copied < len; i++) {
+ page = pool->umem->pgs[addr >> PAGE_SHIFT];
+ get_page(page);
+
+ copy = min_t(u32, PAGE_SIZE - offset, len - copied);
+ skb_fill_page_desc(skb, i, page, offset, copy);
+
+ copied += copy;
+ addr += copy;
+ offset = 0;
+ }
+
+ skb->len += len;
+ skb->data_len += len;
+ skb->truesize += ts;
+
+ refcount_add(ts, &xs->sk.sk_wmem_alloc);
+
+ return skb;
+}
+
+static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
+ struct xdp_desc *desc)
+{
+ struct net_device *dev = xs->dev;
+ struct sk_buff *skb;
+
+ if (dev->priv_flags & IFF_TX_SKB_NO_LINEAR) {
+ skb = xsk_build_skb_zerocopy(xs, desc);
+ if (IS_ERR(skb))
+ return skb;
+ } else {
+ u32 hr, tr, len;
+ void *buffer;
+ int err;
+
+ hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom));
+ tr = dev->needed_tailroom;
+ len = desc->len;
+
+ skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err);
+ if (unlikely(!skb))
+ return ERR_PTR(err);
+
+ skb_reserve(skb, hr);
+ skb_put(skb, len);
+
+ buffer = xsk_buff_raw_get_data(xs->pool, desc->addr);
+ err = skb_store_bits(skb, 0, buffer, len);
+ if (unlikely(err)) {
+ kfree_skb(skb);
+ return ERR_PTR(err);
+ }
+ }
+
+ skb->dev = dev;
+ skb->priority = xs->sk.sk_priority;
+ skb->mark = xs->sk.sk_mark;
+ skb_shinfo(skb)->destructor_arg = (void *)(long)desc->addr;
+ skb->destructor = xsk_destruct_skb;
+
+ return skb;
+}
+
static int xsk_generic_xmit(struct sock *sk)
{
struct xdp_sock *xs = xdp_sk(sk);
@@ -377,54 +518,63 @@ static int xsk_generic_xmit(struct sock *sk)
bool sent_frame = false;
struct xdp_desc desc;
struct sk_buff *skb;
+ unsigned long flags;
int err = 0;
mutex_lock(&xs->mutex);
- if (xs->queue_id >= xs->dev->real_num_tx_queues)
+ /* Since we dropped the RCU read lock, the socket state might have changed. */
+ if (unlikely(!xsk_is_bound(xs))) {
+ err = -ENXIO;
goto out;
+ }
- while (xskq_cons_peek_desc(xs->tx, &desc, xs->umem)) {
- char *buffer;
- u64 addr;
- u32 len;
+ if (xs->queue_id >= xs->dev->real_num_tx_queues)
+ goto out;
+ while (xskq_cons_peek_desc(xs->tx, &desc, xs->pool)) {
if (max_batch-- == 0) {
err = -EAGAIN;
goto out;
}
- len = desc.len;
- skb = sock_alloc_send_skb(sk, len, 1, &err);
- if (unlikely(!skb)) {
- err = -EAGAIN;
- goto out;
- }
-
- skb_put(skb, len);
- addr = desc.addr;
- buffer = xdp_umem_get_data(xs->umem, addr);
- err = skb_store_bits(skb, 0, buffer, len);
- /* This is the backpreassure mechanism for the Tx path.
+ /* This is the backpressure mechanism for the Tx path.
* Reserve space in the completion queue and only proceed
* if there is space in it. This avoids having to implement
* any buffering in the Tx path.
*/
- if (unlikely(err) || xskq_prod_reserve(xs->umem->cq)) {
- kfree_skb(skb);
+ spin_lock_irqsave(&xs->pool->cq_lock, flags);
+ if (xskq_prod_reserve(xs->pool->cq)) {
+ spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+ goto out;
+ }
+ spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+
+ skb = xsk_build_skb(xs, &desc);
+ if (IS_ERR(skb)) {
+ err = PTR_ERR(skb);
+ spin_lock_irqsave(&xs->pool->cq_lock, flags);
+ xskq_prod_cancel(xs->pool->cq);
+ spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
goto out;
}
- skb->dev = xs->dev;
- skb->priority = sk->sk_priority;
- skb->mark = sk->sk_mark;
- skb_shinfo(skb)->destructor_arg = (void *)(long)desc.addr;
- skb->destructor = xsk_destruct_skb;
+ err = __dev_direct_xmit(skb, xs->queue_id);
+ if (err == NETDEV_TX_BUSY) {
+ /* Tell user-space to retry the send */
+ skb->destructor = sock_wfree;
+ spin_lock_irqsave(&xs->pool->cq_lock, flags);
+ xskq_prod_cancel(xs->pool->cq);
+ spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+ /* Free skb without triggering the perf drop trace */
+ consume_skb(skb);
+ err = -EAGAIN;
+ goto out;
+ }
- err = dev_direct_xmit(skb, xs->queue_id);
xskq_cons_release(xs->tx);
/* Ignore NET_XMIT_CN as packet might have been sent */
- if (err == NET_XMIT_DROP || err == NETDEV_TX_BUSY) {
+ if (err == NET_XMIT_DROP) {
/* SKB completed but not sent */
err = -EBUSY;
goto out;
@@ -433,66 +583,157 @@ static int xsk_generic_xmit(struct sock *sk)
sent_frame = true;
}
+ xs->tx->queue_empty_descs++;
+
out:
if (sent_frame)
- sk->sk_write_space(sk);
+ if (xsk_tx_writeable(xs))
+ sk->sk_write_space(sk);
mutex_unlock(&xs->mutex);
return err;
}
-static int __xsk_sendmsg(struct sock *sk)
+static int xsk_xmit(struct sock *sk)
{
struct xdp_sock *xs = xdp_sk(sk);
+ int ret;
if (unlikely(!(xs->dev->flags & IFF_UP)))
return -ENETDOWN;
if (unlikely(!xs->tx))
return -ENOBUFS;
- return xs->zc ? xsk_zc_xmit(xs) : xsk_generic_xmit(sk);
+ if (xs->zc)
+ return xsk_wakeup(xs, XDP_WAKEUP_TX);
+
+ /* Drop the RCU lock since the SKB path might sleep. */
+ rcu_read_unlock();
+ ret = xsk_generic_xmit(sk);
+ /* Reacquire RCU lock before going into common code. */
+ rcu_read_lock();
+
+ return ret;
}
-static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
+static bool xsk_no_wakeup(struct sock *sk)
+{
+#ifdef CONFIG_NET_RX_BUSY_POLL
+ /* Prefer busy-polling, skip the wakeup. */
+ return READ_ONCE(sk->sk_prefer_busy_poll) && READ_ONCE(sk->sk_ll_usec) &&
+ READ_ONCE(sk->sk_napi_id) >= MIN_NAPI_ID;
+#else
+ return false;
+#endif
+}
+
+static int __xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
{
bool need_wait = !(m->msg_flags & MSG_DONTWAIT);
struct sock *sk = sock->sk;
struct xdp_sock *xs = xdp_sk(sk);
+ struct xsk_buff_pool *pool;
if (unlikely(!xsk_is_bound(xs)))
return -ENXIO;
if (unlikely(need_wait))
return -EOPNOTSUPP;
- return __xsk_sendmsg(sk);
+ if (sk_can_busy_loop(sk)) {
+ if (xs->zc)
+ __sk_mark_napi_id_once(sk, xsk_pool_get_napi_id(xs->pool));
+ sk_busy_loop(sk, 1); /* only support non-blocking sockets */
+ }
+
+ if (xs->zc && xsk_no_wakeup(sk))
+ return 0;
+
+ pool = xs->pool;
+ if (pool->cached_need_wakeup & XDP_WAKEUP_TX)
+ return xsk_xmit(sk);
+ return 0;
+}
+
+static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
+{
+ int ret;
+
+ rcu_read_lock();
+ ret = __xsk_sendmsg(sock, m, total_len);
+ rcu_read_unlock();
+
+ return ret;
+}
+
+static int __xsk_recvmsg(struct socket *sock, struct msghdr *m, size_t len, int flags)
+{
+ bool need_wait = !(flags & MSG_DONTWAIT);
+ struct sock *sk = sock->sk;
+ struct xdp_sock *xs = xdp_sk(sk);
+
+ if (unlikely(!xsk_is_bound(xs)))
+ return -ENXIO;
+ if (unlikely(!(xs->dev->flags & IFF_UP)))
+ return -ENETDOWN;
+ if (unlikely(!xs->rx))
+ return -ENOBUFS;
+ if (unlikely(need_wait))
+ return -EOPNOTSUPP;
+
+ if (sk_can_busy_loop(sk))
+ sk_busy_loop(sk, 1); /* only support non-blocking sockets */
+
+ if (xsk_no_wakeup(sk))
+ return 0;
+
+ if (xs->pool->cached_need_wakeup & XDP_WAKEUP_RX && xs->zc)
+ return xsk_wakeup(xs, XDP_WAKEUP_RX);
+ return 0;
+}
+
+static int xsk_recvmsg(struct socket *sock, struct msghdr *m, size_t len, int flags)
+{
+ int ret;
+
+ rcu_read_lock();
+ ret = __xsk_recvmsg(sock, m, len, flags);
+ rcu_read_unlock();
+
+ return ret;
}
static __poll_t xsk_poll(struct file *file, struct socket *sock,
struct poll_table_struct *wait)
{
- __poll_t mask = datagram_poll(file, sock, wait);
+ __poll_t mask = 0;
struct sock *sk = sock->sk;
struct xdp_sock *xs = xdp_sk(sk);
- struct xdp_umem *umem;
+ struct xsk_buff_pool *pool;
- if (unlikely(!xsk_is_bound(xs)))
+ sock_poll_wait(file, sock, wait);
+
+ rcu_read_lock();
+ if (unlikely(!xsk_is_bound(xs))) {
+ rcu_read_unlock();
return mask;
+ }
- umem = xs->umem;
+ pool = xs->pool;
- if (umem->need_wakeup) {
+ if (pool->cached_need_wakeup) {
if (xs->zc)
- xsk_wakeup(xs, umem->need_wakeup);
+ xsk_wakeup(xs, pool->cached_need_wakeup);
else
/* Poll needs to drive Tx also in copy mode */
- __xsk_sendmsg(sk);
+ xsk_xmit(sk);
}
if (xs->rx && !xskq_prod_is_empty(xs->rx))
mask |= EPOLLIN | EPOLLRDNORM;
- if (xs->tx && !xskq_cons_is_full(xs->tx))
+ if (xs->tx && xsk_tx_writeable(xs))
mask |= EPOLLOUT | EPOLLWRNORM;
+ rcu_read_unlock();
return mask;
}
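
(Editorial aside, not part of the patch: the user-space half of the need_wakeup protocol that xsk_poll() services here only issues a syscall when the kernel has set XDP_RING_NEED_WAKEUP. A minimal sketch, assuming the xsk_ring_* helpers from libbpf's/libxdp's xsk.h; error handling omitted.)

	#include <sys/socket.h>
	#include <bpf/xsk.h>	/* or <xdp/xsk.h> when using libxdp */

	/* Kick the kernel Tx path only if it asked to be woken up. */
	static void kick_tx_if_needed(struct xsk_socket *xsk, struct xsk_ring_prod *tx)
	{
		if (xsk_ring_prod__needs_wakeup(tx))
			sendto(xsk_socket__fd(xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
	}
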
@@ -523,14 +764,13 @@ static void xsk_unbind_dev(struct xdp_sock *xs)
WRITE_ONCE(xs->state, XSK_UNBOUND);
/* Wait for driver to stop using the xdp socket. */
- xdp_del_sk_umem(xs->umem, xs);
- xs->dev = NULL;
+ xp_del_xsk(xs->pool, xs);
synchronize_net();
dev_put(dev);
}
static struct xsk_map *xsk_get_map_list_entry(struct xdp_sock *xs,
- struct xdp_sock ***map_entry)
+ struct xdp_sock __rcu ***map_entry)
{
struct xsk_map *map = NULL;
struct xsk_map_node *node;
@@ -541,7 +781,7 @@ static struct xsk_map *xsk_get_map_list_entry(struct xdp_sock *xs,
node = list_first_entry_or_null(&xs->map_list, struct xsk_map_node,
node);
if (node) {
- WARN_ON(xsk_map_inc(node->map));
+ bpf_map_inc(&node->map->map);
map = node->map;
*map_entry = node->map_entry;
}
@@ -566,12 +806,12 @@ static void xsk_delete_from_maps(struct xdp_sock *xs)
* might be updates to the map between
* xsk_get_map_list_entry() and xsk_map_try_sock_delete().
*/
- struct xdp_sock **map_entry = NULL;
+ struct xdp_sock __rcu **map_entry = NULL;
struct xsk_map *map;
while ((map = xsk_get_map_list_entry(xs, &map_entry))) {
xsk_map_try_sock_delete(map, xs, map_entry);
- xsk_map_put(map);
+ bpf_map_put(&map->map);
}
}
@@ -590,9 +830,7 @@ static int xsk_release(struct socket *sock)
sk_del_node_init_rcu(sk);
mutex_unlock(&net->xdp.lock);
- local_bh_disable();
sock_prot_inuse_add(net, sk->sk_prot, -1);
- local_bh_enable();
xsk_delete_from_maps(xs);
mutex_lock(&xs->mutex);
@@ -601,6 +839,8 @@ static int xsk_release(struct socket *sock)
xskq_destroy(xs->rx);
xskq_destroy(xs->tx);
+ xskq_destroy(xs->fq_tmp);
+ xskq_destroy(xs->cq_tmp);
sock_orphan(sk);
sock->sk = NULL;
@@ -628,22 +868,9 @@ static struct socket *xsk_lookup_xsk_from_fd(int fd)
return sock;
}
-/* Check if umem pages are contiguous.
- * If zero-copy mode, use the DMA address to do the page contiguity check
- * For all other modes we use addr (kernel virtual address)
- * Store the result in the low bits of addr.
- */
-static void xsk_check_page_contiguity(struct xdp_umem *umem, u32 flags)
+static bool xsk_validate_queues(struct xdp_sock *xs)
{
- struct xdp_umem_page *pgs = umem->pages;
- int i, is_contig;
-
- for (i = 0; i < umem->npgs - 1; i++) {
- is_contig = (flags & XDP_ZEROCOPY) ?
- (pgs[i].dma + PAGE_SIZE == pgs[i + 1].dma) :
- (pgs[i].addr + PAGE_SIZE == pgs[i + 1].addr);
- pgs[i].addr += is_contig << XSK_NEXT_PG_CONTIG_SHIFT;
- }
+ return xs->fq_tmp && xs->cq_tmp;
}
static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
@@ -714,38 +941,83 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
sockfd_put(sock);
goto out_unlock;
}
- if (umem_xs->dev != dev || umem_xs->queue_id != qid) {
- err = -EINVAL;
- sockfd_put(sock);
- goto out_unlock;
+
+ if (umem_xs->queue_id != qid || umem_xs->dev != dev) {
+ /* Share the umem with another socket on another qid
+ * and/or device.
+ */
+ xs->pool = xp_create_and_assign_umem(xs,
+ umem_xs->umem);
+ if (!xs->pool) {
+ err = -ENOMEM;
+ sockfd_put(sock);
+ goto out_unlock;
+ }
+
+ err = xp_assign_dev_shared(xs->pool, umem_xs, dev,
+ qid);
+ if (err) {
+ xp_destroy(xs->pool);
+ xs->pool = NULL;
+ sockfd_put(sock);
+ goto out_unlock;
+ }
+ } else {
+ /* Share the buffer pool with the other socket. */
+ if (xs->fq_tmp || xs->cq_tmp) {
+ /* Do not allow setting your own fq or cq. */
+ err = -EINVAL;
+ sockfd_put(sock);
+ goto out_unlock;
+ }
+
+ xp_get_pool(umem_xs->pool);
+ xs->pool = umem_xs->pool;
+
+ /* If underlying shared umem was created without Tx
+ * ring, allocate Tx descs array that Tx batching API
+ * utilizes
+ */
+ if (xs->tx && !xs->pool->tx_descs) {
+ err = xp_alloc_tx_descs(xs->pool, xs);
+ if (err) {
+ xp_put_pool(xs->pool);
+ sockfd_put(sock);
+ goto out_unlock;
+ }
+ }
}
xdp_get_umem(umem_xs->umem);
WRITE_ONCE(xs->umem, umem_xs->umem);
sockfd_put(sock);
- } else if (!xs->umem || !xdp_umem_validate_queues(xs->umem)) {
+ } else if (!xs->umem || !xsk_validate_queues(xs)) {
err = -EINVAL;
goto out_unlock;
} else {
/* This xsk has its own umem. */
- xskq_set_umem(xs->umem->fq, xs->umem->size,
- xs->umem->chunk_mask);
- xskq_set_umem(xs->umem->cq, xs->umem->size,
- xs->umem->chunk_mask);
-
- err = xdp_umem_assign_dev(xs->umem, dev, qid, flags);
- if (err)
+ xs->pool = xp_create_and_assign_umem(xs, xs->umem);
+ if (!xs->pool) {
+ err = -ENOMEM;
goto out_unlock;
+ }
- xsk_check_page_contiguity(xs->umem, flags);
+ err = xp_assign_dev(xs->pool, dev, qid, flags);
+ if (err) {
+ xp_destroy(xs->pool);
+ xs->pool = NULL;
+ goto out_unlock;
+ }
}
+ /* FQ and CQ are now owned by the buffer pool and cleaned up with it. */
+ xs->fq_tmp = NULL;
+ xs->cq_tmp = NULL;
+
xs->dev = dev;
xs->zc = xs->umem->zc;
xs->queue_id = qid;
- xskq_set_umem(xs->rx, xs->umem->size, xs->umem->chunk_mask);
- xskq_set_umem(xs->tx, xs->umem->size, xs->umem->chunk_mask);
- xdp_add_sk_umem(xs->umem, xs);
+ xp_add_xsk(xs->pool, xs);
out_unlock:
if (err) {
@@ -771,7 +1043,7 @@ struct xdp_umem_reg_v1 {
};
static int xsk_setsockopt(struct socket *sock, int level, int optname,
- char __user *optval, unsigned int optlen)
+ sockptr_t optval, unsigned int optlen)
{
struct sock *sk = sock->sk;
struct xdp_sock *xs = xdp_sk(sk);
@@ -789,7 +1061,7 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname,
if (optlen < sizeof(entries))
return -EINVAL;
- if (copy_from_user(&entries, optval, sizeof(entries)))
+ if (copy_from_sockptr(&entries, optval, sizeof(entries)))
return -EFAULT;
mutex_lock(&xs->mutex);
@@ -816,7 +1088,7 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname,
else if (optlen < sizeof(mr))
mr_size = sizeof(struct xdp_umem_reg_v1);
- if (copy_from_user(&mr, optval, mr_size))
+ if (copy_from_sockptr(&mr, optval, mr_size))
return -EFAULT;
mutex_lock(&xs->mutex);
@@ -843,7 +1115,7 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname,
struct xsk_queue **q;
int entries;
- if (copy_from_user(&entries, optval, sizeof(entries)))
+ if (copy_from_sockptr(&entries, optval, sizeof(entries)))
return -EFAULT;
mutex_lock(&xs->mutex);
@@ -851,13 +1123,9 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname,
mutex_unlock(&xs->mutex);
return -EBUSY;
}
- if (!xs->umem) {
- mutex_unlock(&xs->mutex);
- return -EINVAL;
- }
- q = (optname == XDP_UMEM_FILL_RING) ? &xs->umem->fq :
- &xs->umem->cq;
+ q = (optname == XDP_UMEM_FILL_RING) ? &xs->fq_tmp :
+ &xs->cq_tmp;
err = xsk_init_queue(entries, q, true);
mutex_unlock(&xs->mutex);
return err;
@@ -883,6 +1151,12 @@ static void xsk_enter_umem_offsets(struct xdp_ring_offset_v1 *ring)
ring->desc = offsetof(struct xdp_umem_ring, desc);
}
+struct xdp_statistics_v1 {
+ __u64 rx_dropped;
+ __u64 rx_invalid_descs;
+ __u64 tx_invalid_descs;
+};
+
static int xsk_getsockopt(struct socket *sock, int level, int optname,
char __user *optval, int __user *optlen)
{
@@ -901,20 +1175,36 @@ static int xsk_getsockopt(struct socket *sock, int level, int optname,
switch (optname) {
case XDP_STATISTICS:
{
- struct xdp_statistics stats;
+ struct xdp_statistics stats = {};
+ bool extra_stats = true;
+ size_t stats_size;
- if (len < sizeof(stats))
+ if (len < sizeof(struct xdp_statistics_v1)) {
return -EINVAL;
+ } else if (len < sizeof(stats)) {
+ extra_stats = false;
+ stats_size = sizeof(struct xdp_statistics_v1);
+ } else {
+ stats_size = sizeof(stats);
+ }
mutex_lock(&xs->mutex);
stats.rx_dropped = xs->rx_dropped;
+ if (extra_stats) {
+ stats.rx_ring_full = xs->rx_queue_full;
+ stats.rx_fill_ring_empty_descs =
+ xs->pool ? xskq_nb_queue_empty_descs(xs->pool->fq) : 0;
+ stats.tx_ring_empty_descs = xskq_nb_queue_empty_descs(xs->tx);
+ } else {
+ stats.rx_dropped += xs->rx_queue_full;
+ }
stats.rx_invalid_descs = xskq_nb_invalid_descs(xs->rx);
stats.tx_invalid_descs = xskq_nb_invalid_descs(xs->tx);
mutex_unlock(&xs->mutex);
- if (copy_to_user(optval, &stats, sizeof(stats)))
+ if (copy_to_user(optval, &stats, stats_size))
return -EFAULT;
- if (put_user(sizeof(stats), optlen))
+ if (put_user(stats_size, optlen))
return -EFAULT;
return 0;
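
(Editorial aside, not part of the patch: the length-based handling above means user space can pass the full struct xdp_statistics and look at the returned optlen to see whether the extended counters were filled in. A minimal user-space sketch, with error handling omitted:)

	#include <linux/if_xdp.h>
	#include <sys/socket.h>
	#include <stdio.h>

	/* Dump AF_XDP socket statistics; older kernels copy only the v1 fields. */
	static void dump_xsk_stats(int xsk_fd)
	{
		struct xdp_statistics stats = {};
		socklen_t optlen = sizeof(stats);

		if (getsockopt(xsk_fd, SOL_XDP, XDP_STATISTICS, &stats, &optlen))
			return;

		printf("rx_dropped: %llu\n", (unsigned long long)stats.rx_dropped);
		if (optlen >= sizeof(stats))	/* extended stats present */
			printf("rx_ring_full: %llu\n",
			       (unsigned long long)stats.rx_ring_full);
	}
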
@@ -1005,7 +1295,6 @@ static int xsk_mmap(struct file *file, struct socket *sock,
unsigned long size = vma->vm_end - vma->vm_start;
struct xdp_sock *xs = xdp_sk(sock->sk);
struct xsk_queue *q = NULL;
- struct xdp_umem *umem;
unsigned long pfn;
struct page *qpg;
@@ -1017,16 +1306,12 @@ static int xsk_mmap(struct file *file, struct socket *sock,
} else if (offset == XDP_PGOFF_TX_RING) {
q = READ_ONCE(xs->tx);
} else {
- umem = READ_ONCE(xs->umem);
- if (!umem)
- return -EINVAL;
-
/* Matches the smp_wmb() in XDP_UMEM_REG */
smp_rmb();
if (offset == XDP_UMEM_PGOFF_FILL_RING)
- q = READ_ONCE(umem->fq);
+ q = READ_ONCE(xs->fq_tmp);
else if (offset == XDP_UMEM_PGOFF_COMPLETION_RING)
- q = READ_ONCE(umem->cq);
+ q = READ_ONCE(xs->cq_tmp);
}
if (!q)
@@ -1060,12 +1345,12 @@ static int xsk_notifier(struct notifier_block *this,
if (xs->dev == dev) {
sk->sk_err = ENETDOWN;
if (!sock_flag(sk, SOCK_DEAD))
- sk->sk_error_report(sk);
+ sk_error_report(sk);
xsk_unbind_dev(xs);
- /* Clear device references in umem. */
- xdp_umem_clear_dev(xs->umem);
+ /* Clear device references. */
+ xp_clear_dev(xs->pool);
}
mutex_unlock(&xs->mutex);
}
@@ -1097,7 +1382,7 @@ static const struct proto_ops xsk_proto_ops = {
.setsockopt = xsk_setsockopt,
.getsockopt = xsk_getsockopt,
.sendmsg = xsk_sendmsg,
- .recvmsg = sock_no_recvmsg,
+ .recvmsg = xsk_recvmsg,
.mmap = xsk_mmap,
.sendpage = sock_no_sendpage,
};
@@ -1109,7 +1394,8 @@ static void xsk_destruct(struct sock *sk)
if (!sock_flag(sk, SOCK_DEAD))
return;
- xdp_put_umem(xs->umem);
+ if (!xp_put_pool(xs->pool))
+ xdp_put_umem(xs->umem, !xs->pool);
sk_refcnt_debug_dec(sk);
}
@@ -1117,8 +1403,8 @@ static void xsk_destruct(struct sock *sk)
static int xsk_create(struct net *net, struct socket *sock, int protocol,
int kern)
{
- struct sock *sk;
struct xdp_sock *xs;
+ struct sock *sk;
if (!ns_capable(net->user_ns, CAP_NET_RAW))
return -EPERM;
@@ -1149,7 +1435,6 @@ static int xsk_create(struct net *net, struct socket *sock, int protocol,
xs->state = XSK_READY;
mutex_init(&xs->mutex);
spin_lock_init(&xs->rx_lock);
- spin_lock_init(&xs->tx_completion_lock);
INIT_LIST_HEAD(&xs->map_list);
spin_lock_init(&xs->map_list_lock);
@@ -1158,9 +1443,7 @@ static int xsk_create(struct net *net, struct socket *sock, int protocol,
sk_add_node_rcu(sk, &net->xdp.list);
mutex_unlock(&net->xdp.lock);
- local_bh_disable();
sock_prot_inuse_add(net, &xsk_proto, 1);
- local_bh_enable();
return 0;
}