From 2276f58ac5890e58d2b6a48b95493faff7347e3a Mon Sep 17 00:00:00 2001
From: Paolo Abeni
Date: Tue, 16 May 2017 11:20:14 +0200
Subject: udp: use a separate rx queue for packet reception

Under UDP flood the sk_receive_queue spinlock is heavily contended.
This patch tries to reduce the contention on that lock by adding a
second receive queue to the UDP socket; recvmsg() looks in this queue
first and, only if it is empty, falls back to fetching the data from
sk_receive_queue. The latter is spliced into the newly added queue
every time the receive path has to acquire the sk_receive_queue lock.

The accounting of forward allocated memory is still protected by the
sk_receive_queue lock, so udp_rmem_release() needs to acquire both
locks when the forward deficit is flushed.

In some scenarios we can end up acquiring and releasing the
sk_receive_queue lock multiple times; that is addressed by the next
patch.

Suggested-by: Eric Dumazet
Signed-off-by: Paolo Abeni
Acked-by: Eric Dumazet
Signed-off-by: David S. Miller
---
 net/ipv4/udp.c | 138 ++++++++++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 123 insertions(+), 15 deletions(-)

diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c
index ea6e4cff9faf..492c76be9230 100644
--- a/net/ipv4/udp.c
+++ b/net/ipv4/udp.c
@@ -1167,19 +1167,24 @@ out:
 static void udp_rmem_release(struct sock *sk, int size, int partial)
 {
 	struct udp_sock *up = udp_sk(sk);
+	struct sk_buff_head *sk_queue;
 	int amt;

 	if (likely(partial)) {
 		up->forward_deficit += size;
 		size = up->forward_deficit;
 		if (size < (sk->sk_rcvbuf >> 2) &&
-		    !skb_queue_empty(&sk->sk_receive_queue))
+		    !skb_queue_empty(&up->reader_queue))
 			return;
 	} else {
 		size += up->forward_deficit;
 	}
 	up->forward_deficit = 0;

+	/* acquire the sk_receive_queue for fwd allocated memory scheduling */
+	sk_queue = &sk->sk_receive_queue;
+	spin_lock(&sk_queue->lock);
+
 	sk->sk_forward_alloc += size;
 	amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
 	sk->sk_forward_alloc -= amt;
@@ -1188,9 +1193,14 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
 		__sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT);

 	atomic_sub(size, &sk->sk_rmem_alloc);
+
+	/* this can save us from acquiring the rx queue lock on next receive */
+	skb_queue_splice_tail_init(sk_queue, &up->reader_queue);
+
+	spin_unlock(&sk_queue->lock);
 }

-/* Note: called with sk_receive_queue.lock held.
+/* Note: called with reader_queue.lock held.
  * Instead of using skb->truesize here, find a copy of it in skb->dev_scratch
  * This avoids a cache line miss while receive_queue lock is held.
  * Look at __udp_enqueue_schedule_skb() to find where this copy is done.
@@ -1306,10 +1316,12 @@ EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb);
 void udp_destruct_sock(struct sock *sk)
 {
 	/* reclaim completely the forward allocated memory */
+	struct udp_sock *up = udp_sk(sk);
 	unsigned int total = 0;
 	struct sk_buff *skb;

-	while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+	skb_queue_splice_tail_init(&sk->sk_receive_queue, &up->reader_queue);
+	while ((skb = __skb_dequeue(&up->reader_queue)) != NULL) {
 		total += skb->truesize;
 		kfree_skb(skb);
 	}
@@ -1321,6 +1333,7 @@ EXPORT_SYMBOL_GPL(udp_destruct_sock);

 int udp_init_sock(struct sock *sk)
 {
+	skb_queue_head_init(&udp_sk(sk)->reader_queue);
 	sk->sk_destruct = udp_destruct_sock;
 	return 0;
 }
@@ -1338,6 +1351,26 @@ void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len)
 }
 EXPORT_SYMBOL_GPL(skb_consume_udp);

+static struct sk_buff *__first_packet_length(struct sock *sk,
+					     struct sk_buff_head *rcvq,
+					     int *total)
+{
+	struct sk_buff *skb;
+
+	while ((skb = skb_peek(rcvq)) != NULL &&
+	       udp_lib_checksum_complete(skb)) {
+		__UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
+				IS_UDPLITE(sk));
+		__UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
+				IS_UDPLITE(sk));
+		atomic_inc(&sk->sk_drops);
+		__skb_unlink(skb, rcvq);
+		*total += skb->truesize;
+		kfree_skb(skb);
+	}
+	return skb;
+}
+
 /**
  * first_packet_length - return length of first packet in receive queue
  * @sk: socket
@@ -1347,22 +1380,20 @@ EXPORT_SYMBOL_GPL(skb_consume_udp);
  */
 static int first_packet_length(struct sock *sk)
 {
-	struct sk_buff_head *rcvq = &sk->sk_receive_queue;
+	struct sk_buff_head *rcvq = &udp_sk(sk)->reader_queue;
+	struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
 	struct sk_buff *skb;
 	int total = 0;
 	int res;

 	spin_lock_bh(&rcvq->lock);
-	while ((skb = skb_peek(rcvq)) != NULL &&
-	       udp_lib_checksum_complete(skb)) {
-		__UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
-				IS_UDPLITE(sk));
-		__UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
-				IS_UDPLITE(sk));
-		atomic_inc(&sk->sk_drops);
-		__skb_unlink(skb, rcvq);
-		total += skb->truesize;
-		kfree_skb(skb);
+	skb = __first_packet_length(sk, rcvq, &total);
+	if (!skb && !skb_queue_empty(sk_queue)) {
+		spin_lock(&sk_queue->lock);
+		skb_queue_splice_tail_init(sk_queue, rcvq);
+		spin_unlock(&sk_queue->lock);
+
+		skb = __first_packet_length(sk, rcvq, &total);
 	}
 	res = skb ? skb->len : -1;
 	if (total)
@@ -1400,6 +1431,79 @@ int udp_ioctl(struct sock *sk, int cmd, unsigned long arg)
 }
 EXPORT_SYMBOL(udp_ioctl);

+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+			       int noblock, int *peeked, int *off, int *err)
+{
+	struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
+	struct sk_buff_head *queue;
+	struct sk_buff *last;
+	long timeo;
+	int error;
+
+	queue = &udp_sk(sk)->reader_queue;
+	flags |= noblock ? MSG_DONTWAIT : 0;
+	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+	do {
+		struct sk_buff *skb;
+
+		error = sock_error(sk);
+		if (error)
+			break;
+
+		error = -EAGAIN;
+		*peeked = 0;
+		do {
+			int _off = *off;
+
+			spin_lock_bh(&queue->lock);
+			skb = __skb_try_recv_from_queue(sk, queue, flags,
+							udp_skb_destructor,
+							peeked, &_off, err,
+							&last);
+			if (skb) {
+				spin_unlock_bh(&queue->lock);
+				*off = _off;
+				return skb;
+			}
+
+			if (skb_queue_empty(sk_queue)) {
+				spin_unlock_bh(&queue->lock);
+				goto busy_check;
+			}
+
+			/* refill the reader queue and walk it again */
+			_off = *off;
+			spin_lock(&sk_queue->lock);
+			skb_queue_splice_tail_init(sk_queue, queue);
+			spin_unlock(&sk_queue->lock);
+
+			skb = __skb_try_recv_from_queue(sk, queue, flags,
+							udp_skb_destructor,
+							peeked, &_off, err,
+							&last);
+			spin_unlock_bh(&queue->lock);
+			if (skb) {
+				*off = _off;
+				return skb;
+			}
+
+busy_check:
+			if (!sk_can_busy_loop(sk))
+				break;
+
+			sk_busy_loop(sk, flags & MSG_DONTWAIT);
+		} while (!skb_queue_empty(sk_queue));
+
+		/* sk_queue is empty, reader_queue may contain peeked packets */
+	} while (timeo &&
+		 !__skb_wait_for_more_packets(sk, &error, &timeo,
+					      (struct sk_buff *)sk_queue));
+
+	*err = error;
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(__skb_recv_udp);
+
 /*
  * This should be easy, if there is something there we
  * return it, otherwise we block.
@@ -1490,7 +1594,8 @@ try_again:
 	return err;

 csum_copy_err:
-	if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+	if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+				 udp_skb_destructor)) {
 		UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS, is_udplite);
 		UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
 	}
@@ -2325,6 +2430,9 @@ unsigned int udp_poll(struct file *file, struct socket *sock, poll_table *wait)
 	unsigned int mask = datagram_poll(file, sock, wait);
 	struct sock *sk = sock->sk;

+	if (!skb_queue_empty(&udp_sk(sk)->reader_queue))
+		mask |= POLLIN | POLLRDNORM;
+
 	sock_rps_record_flow(sk);

 	/* Check for false positives due to checksum errors */
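The core idea of the patch above, stripped of the kernel specifics, is a classic two-queue consumer trick: producers always append to a shared queue under its lock, while the consumer drains a private queue it owns and only touches the shared lock when the private queue runs dry, splicing the whole backlog across in one acquisition. What follows is a minimal, hedged user-space sketch of that pattern, not kernel code: pthread mutexes stand in for sk_receive_queue.lock, a hand-rolled singly linked list stands in for sk_buff_head, and the reader queue is treated as consumer-owned (the real reader_queue keeps its own lock because peek and poll paths also walk it). All names here (struct queue, producer_enqueue, splice_tail_init, consumer_dequeue) are illustrative only.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct node {
	struct node *next;
	int payload;
};

struct queue {
	struct node *head, *tail;
	pthread_mutex_t lock;	/* plays the role of sk_receive_queue.lock */
};

static void enqueue_locked(struct queue *q, struct node *n)
{
	n->next = NULL;
	if (q->tail)
		q->tail->next = n;
	else
		q->head = n;
	q->tail = n;
}

/* producer path: always takes the shared lock, one packet at a time */
static void producer_enqueue(struct queue *shared, int payload)
{
	struct node *n = malloc(sizeof(*n));

	if (!n)
		return;
	n->payload = payload;
	pthread_mutex_lock(&shared->lock);
	enqueue_locked(shared, n);
	pthread_mutex_unlock(&shared->lock);
}

/* move everything from the shared queue to the reader-private queue,
 * mirroring skb_queue_splice_tail_init(sk_queue, &up->reader_queue)
 */
static void splice_tail_init(struct queue *shared, struct queue *reader)
{
	pthread_mutex_lock(&shared->lock);
	if (shared->head) {
		if (reader->tail)
			reader->tail->next = shared->head;
		else
			reader->head = shared->head;
		reader->tail = shared->tail;
		shared->head = shared->tail = NULL;
	}
	pthread_mutex_unlock(&shared->lock);
}

/* consumer path: look in the private queue first, refill only when empty */
static struct node *consumer_dequeue(struct queue *shared, struct queue *reader)
{
	struct node *n;

	if (!reader->head)			/* private queue dry: */
		splice_tail_init(shared, reader); /* one splice per batch */

	n = reader->head;			/* no lock: consumer-owned */
	if (n) {
		reader->head = n->next;
		if (!reader->head)
			reader->tail = NULL;
	}
	return n;
}

int main(void)
{
	struct queue shared = { .lock = PTHREAD_MUTEX_INITIALIZER };
	struct queue reader = { 0 };
	struct node *n;
	int i;

	for (i = 0; i < 8; i++)
		producer_enqueue(&shared, i);

	while ((n = consumer_dequeue(&shared, &reader)) != NULL) {
		printf("got %d\n", n->payload);
		free(n);
	}
	return 0;
}

Under flood, the consumer pays for one shared-lock acquisition per batch of packets instead of one per packet, which is exactly the contention reduction the commit message above describes.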
From 6dfb4367cd911d2b03878fffa045d545ba4507f6 Mon Sep 17 00:00:00 2001
From: Paolo Abeni
Date: Tue, 16 May 2017 11:20:15 +0200
Subject: udp: keep the sk_receive_queue held when splicing

On packet reception, when we are forced to splice the sk_receive_queue,
we can keep the related lock held, so that we avoid re-acquiring it if
forward memory scheduling is required.

v1 -> v2: the rx_queue_lock_held param of udp_rmem_release() is now a bool

Signed-off-by: Paolo Abeni
Acked-by: Eric Dumazet
Signed-off-by: David S. Miller
---
 net/ipv4/udp.c | 36 ++++++++++++++++++++++++++----------
 1 file changed, 26 insertions(+), 10 deletions(-)

diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c
index 492c76be9230..7bd56c9889b3 100644
--- a/net/ipv4/udp.c
+++ b/net/ipv4/udp.c
@@ -1164,7 +1164,8 @@ out:
 }

 /* fully reclaim rmem/fwd memory allocated for skb */
-static void udp_rmem_release(struct sock *sk, int size, int partial)
+static void udp_rmem_release(struct sock *sk, int size, int partial,
+			     bool rx_queue_lock_held)
 {
 	struct udp_sock *up = udp_sk(sk);
 	struct sk_buff_head *sk_queue;
@@ -1181,9 +1182,13 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
 	}
 	up->forward_deficit = 0;

-	/* acquire the sk_receive_queue for fwd allocated memory scheduling */
+	/* acquire the sk_receive_queue for fwd allocated memory scheduling,
+	 * if the called don't held it already
+	 */
 	sk_queue = &sk->sk_receive_queue;
-	spin_lock(&sk_queue->lock);
+	if (!rx_queue_lock_held)
+		spin_lock(&sk_queue->lock);
+

 	sk->sk_forward_alloc += size;
 	amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
@@ -1197,7 +1202,8 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
 	/* this can save us from acquiring the rx queue lock on next receive */
 	skb_queue_splice_tail_init(sk_queue, &up->reader_queue);

-	spin_unlock(&sk_queue->lock);
+	if (!rx_queue_lock_held)
+		spin_unlock(&sk_queue->lock);
 }

 /* Note: called with reader_queue.lock held.
@@ -1207,10 +1213,16 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
  */
 void udp_skb_destructor(struct sock *sk, struct sk_buff *skb)
 {
-	udp_rmem_release(sk, skb->dev_scratch, 1);
+	udp_rmem_release(sk, skb->dev_scratch, 1, false);
 }
 EXPORT_SYMBOL(udp_skb_destructor);

+/* as above, but the caller held the rx queue lock, too */
+void udp_skb_dtor_locked(struct sock *sk, struct sk_buff *skb)
+{
+	udp_rmem_release(sk, skb->dev_scratch, 1, true);
+}
+
 /* Idea of busylocks is to let producers grab an extra spinlock
  * to relieve pressure on the receive_queue spinlock shared by consumer.
  * Under flood, this means that only one producer can be in line
@@ -1325,7 +1337,7 @@ void udp_destruct_sock(struct sock *sk)
 		total += skb->truesize;
 		kfree_skb(skb);
 	}
-	udp_rmem_release(sk, total, 0);
+	udp_rmem_release(sk, total, 0, true);

 	inet_sock_destruct(sk);
 }
@@ -1397,7 +1409,7 @@ static int first_packet_length(struct sock *sk)
 	}
 	res = skb ? skb->len : -1;
 	if (total)
-		udp_rmem_release(sk, total, 1);
+		udp_rmem_release(sk, total, 1, false);
 	spin_unlock_bh(&rcvq->lock);
 	return res;
 }
@@ -1471,16 +1483,20 @@ struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
 				goto busy_check;
 			}

-			/* refill the reader queue and walk it again */
+			/* refill the reader queue and walk it again
+			 * keep both queues locked to avoid re-acquiring
+			 * the sk_receive_queue lock if fwd memory scheduling
+			 * is needed.
+			 */
 			_off = *off;
 			spin_lock(&sk_queue->lock);
 			skb_queue_splice_tail_init(sk_queue, queue);
-			spin_unlock(&sk_queue->lock);

 			skb = __skb_try_recv_from_queue(sk, queue, flags,
-							udp_skb_destructor,
+							udp_skb_dtor_locked,
							peeked, &_off, err,
							&last);
+			spin_unlock(&sk_queue->lock);
 			spin_unlock_bh(&queue->lock);
 			if (skb) {
 				*off = _off;
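The change above to udp_rmem_release() uses another common idiom: a function that normally takes a lock itself grows a "lock already held" variant, so a caller that has just acquired the same lock for other reasons (here, the splice in __skb_recv_udp()) can reuse it instead of dropping and re-acquiring it. Below is a compact, hedged user-space sketch of that shape, again with a pthread mutex standing in for the socket spinlock; mem_release(), skb_destructor() and skb_dtor_locked() are illustrative stand-ins for udp_rmem_release(), udp_skb_destructor() and udp_skb_dtor_locked(), not real kernel APIs.

#include <pthread.h>

static pthread_mutex_t rx_queue_lock = PTHREAD_MUTEX_INITIALIZER;
static long fwd_alloc;		/* stands in for sk->sk_forward_alloc */

/* release "size" bytes of forward-allocated memory; the queue lock protects
 * the accounting, and rx_queue_lock_held tells us whether the caller already
 * owns it (mirrors the bool added to udp_rmem_release() above)
 */
static void mem_release(long size, int rx_queue_lock_held)
{
	if (!rx_queue_lock_held)
		pthread_mutex_lock(&rx_queue_lock);

	fwd_alloc += size;

	if (!rx_queue_lock_held)
		pthread_mutex_unlock(&rx_queue_lock);
}

/* normal destructor: caller does not hold the queue lock */
static void skb_destructor(long size)
{
	mem_release(size, 0);
}

/* splice-path destructor: caller already holds the queue lock */
static void skb_dtor_locked(long size)
{
	mem_release(size, 1);
}

int main(void)
{
	skb_destructor(256);			/* locks and unlocks internally */

	pthread_mutex_lock(&rx_queue_lock);	/* e.g. around a queue splice */
	skb_dtor_locked(512);			/* reuses the held lock */
	pthread_mutex_unlock(&rx_queue_lock);
	return 0;
}

The bool parameter keeps a single copy of the accounting logic while letting the hot splice path skip one lock round trip per batch of received packets.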