aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
authorJakub Kicinski <kuba@kernel.org>2020-11-30 17:55:26 -0800
committerJakub Kicinski <kuba@kernel.org>2020-11-30 17:55:26 -0800
commit5f3e915c36d59c94a917e207df4361f23d9c821d (patch)
tree22796b798d2a4ed571c1bf4f48e1a7c8576ec0f3
parentMerge branch 'dpaa_eth-add-xdp-support' (diff)
parentmptcp: use mptcp release_cb for delayed tasks (diff)
downloadwireguard-linux-5f3e915c36d59c94a917e207df4361f23d9c821d.tar.xz
wireguard-linux-5f3e915c36d59c94a917e207df4361f23d9c821d.zip
Merge branch 'mptcp-avoid-workqueue-usage-for-data'
Paolo Abeni says: ==================== mptcp: avoid workqueue usage for data The current locking schema used to protect the MPTCP data-path requires the usage of the MPTCP workqueue to process the incoming data, depending on trylock result. The above poses scalability limits and introduces random delays in MPTCP-level acks. With this series we use a single spinlock to protect the MPTCP data-path, removing the need for workqueue and delayed ack usage. This additionally reduces the number of atomic operations required per packet and cleans-up considerably the poll/wake-up code. ==================== Link: https://lore.kernel.org/r/cover.1606413118.git.pabeni@redhat.com Signed-off-by: Jakub Kicinski <kuba@kernel.org>
-rw-r--r--include/net/sock.h1
-rw-r--r--net/core/sock.c2
-rw-r--r--net/mptcp/mptcp_diag.c2
-rw-r--r--net/mptcp/options.c47
-rw-r--r--net/mptcp/protocol.c733
-rw-r--r--net/mptcp/protocol.h34
-rw-r--r--net/mptcp/subflow.c14
7 files changed, 598 insertions, 235 deletions
diff --git a/include/net/sock.h b/include/net/sock.h
index 80469c2c448d..f59764614e30 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -1590,6 +1590,7 @@ static inline void lock_sock(struct sock *sk)
lock_sock_nested(sk, 0);
}
+void __lock_sock(struct sock *sk);
void __release_sock(struct sock *sk);
void release_sock(struct sock *sk);
diff --git a/net/core/sock.c b/net/core/sock.c
index 9badbe7bb4e4..f0f096852876 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -2486,7 +2486,7 @@ bool sk_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
}
EXPORT_SYMBOL(sk_page_frag_refill);
-static void __lock_sock(struct sock *sk)
+void __lock_sock(struct sock *sk)
__releases(&sk->sk_lock.slock)
__acquires(&sk->sk_lock.slock)
{
diff --git a/net/mptcp/mptcp_diag.c b/net/mptcp/mptcp_diag.c
index 5f390a97f556..b70ae4ba3000 100644
--- a/net/mptcp/mptcp_diag.c
+++ b/net/mptcp/mptcp_diag.c
@@ -140,7 +140,7 @@ static void mptcp_diag_get_info(struct sock *sk, struct inet_diag_msg *r,
info->mptcpi_flags = flags;
info->mptcpi_token = READ_ONCE(msk->token);
info->mptcpi_write_seq = READ_ONCE(msk->write_seq);
- info->mptcpi_snd_una = atomic64_read(&msk->snd_una);
+ info->mptcpi_snd_una = READ_ONCE(msk->snd_una);
info->mptcpi_rcv_nxt = READ_ONCE(msk->ack_seq);
unlock_sock_fast(sk, slow);
}
diff --git a/net/mptcp/options.c b/net/mptcp/options.c
index 8a59b3e44599..6b7b4b67f18c 100644
--- a/net/mptcp/options.c
+++ b/net/mptcp/options.c
@@ -830,18 +830,20 @@ static u64 expand_ack(u64 old_ack, u64 cur_ack, bool use_64bit)
}
static void ack_update_msk(struct mptcp_sock *msk,
- const struct sock *ssk,
+ struct sock *ssk,
struct mptcp_options_received *mp_opt)
{
- u64 new_snd_una, snd_una, old_snd_una = atomic64_read(&msk->snd_una);
- u64 new_wnd_end, wnd_end, old_wnd_end = atomic64_read(&msk->wnd_end);
- u64 snd_nxt = READ_ONCE(msk->snd_nxt);
+ u64 new_wnd_end, new_snd_una, snd_nxt = READ_ONCE(msk->snd_nxt);
struct sock *sk = (struct sock *)msk;
+ u64 old_snd_una;
+
+ mptcp_data_lock(sk);
/* avoid ack expansion on update conflict, to reduce the risk of
* wrongly expanding to a future ack sequence number, which is way
* more dangerous than missing an ack
*/
+ old_snd_una = msk->snd_una;
new_snd_una = expand_ack(old_snd_una, mp_opt->data_ack, mp_opt->ack64);
/* ACK for data not even sent yet? Ignore. */
@@ -850,26 +852,16 @@ static void ack_update_msk(struct mptcp_sock *msk,
new_wnd_end = new_snd_una + tcp_sk(ssk)->snd_wnd;
- while (after64(new_wnd_end, old_wnd_end)) {
- wnd_end = old_wnd_end;
- old_wnd_end = atomic64_cmpxchg(&msk->wnd_end, wnd_end,
- new_wnd_end);
- if (old_wnd_end == wnd_end) {
- if (mptcp_send_head(sk))
- mptcp_schedule_work(sk);
- break;
- }
+ if (after64(new_wnd_end, msk->wnd_end)) {
+ msk->wnd_end = new_wnd_end;
+ __mptcp_wnd_updated(sk, ssk);
}
- while (after64(new_snd_una, old_snd_una)) {
- snd_una = old_snd_una;
- old_snd_una = atomic64_cmpxchg(&msk->snd_una, snd_una,
- new_snd_una);
- if (old_snd_una == snd_una) {
- mptcp_data_acked(sk);
- break;
- }
+ if (after64(new_snd_una, old_snd_una)) {
+ msk->snd_una = new_snd_una;
+ __mptcp_data_acked(sk);
}
+ mptcp_data_unlock(sk);
}
bool mptcp_update_rcv_data_fin(struct mptcp_sock *msk, u64 data_fin_seq, bool use_64bit)
@@ -922,8 +914,19 @@ void mptcp_incoming_options(struct sock *sk, struct sk_buff *skb)
struct mptcp_options_received mp_opt;
struct mptcp_ext *mpext;
- if (__mptcp_check_fallback(msk))
+ if (__mptcp_check_fallback(msk)) {
+ /* Keep it simple and unconditionally trigger send data cleanup and
+ * pending queue spooling. We will need to acquire the data lock
+ * for more accurate checks, and once the lock is acquired, such
+ * helpers are cheap.
+ */
+ mptcp_data_lock(subflow->conn);
+ if (mptcp_send_head(subflow->conn))
+ __mptcp_wnd_updated(subflow->conn, sk);
+ __mptcp_data_acked(subflow->conn);
+ mptcp_data_unlock(subflow->conn);
return;
+ }
mptcp_get_options(skb, &mp_opt);
if (!check_fully_established(msk, sk, subflow, skb, &mp_opt))
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 16e9cb1c79cc..221f7cdd416b 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -60,7 +60,7 @@ static struct socket *__mptcp_nmpc_socket(const struct mptcp_sock *msk)
/* Returns end sequence number of the receiver's advertised window */
static u64 mptcp_wnd_end(const struct mptcp_sock *msk)
{
- return atomic64_read(&msk->wnd_end);
+ return READ_ONCE(msk->wnd_end);
}
static bool mptcp_is_tcpsk(struct sock *sk)
@@ -348,17 +348,22 @@ static void mptcp_close_wake_up(struct sock *sk)
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
}
-static void mptcp_check_data_fin_ack(struct sock *sk)
+static bool mptcp_pending_data_fin_ack(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
- if (__mptcp_check_fallback(msk))
- return;
+ return !__mptcp_check_fallback(msk) &&
+ ((1 << sk->sk_state) &
+ (TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) &&
+ msk->write_seq == READ_ONCE(msk->snd_una);
+}
+
+static void mptcp_check_data_fin_ack(struct sock *sk)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
/* Look for an acknowledged DATA_FIN */
- if (((1 << sk->sk_state) &
- (TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) &&
- msk->write_seq == atomic64_read(&msk->snd_una)) {
+ if (mptcp_pending_data_fin_ack(sk)) {
mptcp_stop_timer(sk);
WRITE_ONCE(msk->snd_data_fin_enable, 0);
@@ -453,15 +458,15 @@ static bool mptcp_subflow_cleanup_rbuf(struct sock *ssk)
static void mptcp_cleanup_rbuf(struct mptcp_sock *msk)
{
+ struct sock *ack_hint = READ_ONCE(msk->ack_hint);
struct mptcp_subflow_context *subflow;
/* if the hinted ssk is still active, try to use it */
- if (likely(msk->ack_hint)) {
+ if (likely(ack_hint)) {
mptcp_for_each_subflow(msk, subflow) {
struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
- if (msk->ack_hint == ssk &&
- mptcp_subflow_cleanup_rbuf(ssk))
+ if (ack_hint == ssk && mptcp_subflow_cleanup_rbuf(ssk))
return;
}
}
@@ -614,13 +619,13 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
break;
}
} while (more_data_avail);
- msk->ack_hint = ssk;
+ WRITE_ONCE(msk->ack_hint, ssk);
*bytes += moved;
return done;
}
-static bool mptcp_ofo_queue(struct mptcp_sock *msk)
+static bool __mptcp_ofo_queue(struct mptcp_sock *msk)
{
struct sock *sk = (struct sock *)msk;
struct sk_buff *skb, *tail;
@@ -666,34 +671,27 @@ static bool mptcp_ofo_queue(struct mptcp_sock *msk)
/* In most cases we will be able to lock the mptcp socket. If its already
* owned, we need to defer to the work queue to avoid ABBA deadlock.
*/
-static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
+static void move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
{
struct sock *sk = (struct sock *)msk;
unsigned int moved = 0;
- if (READ_ONCE(sk->sk_lock.owned))
- return false;
-
- if (unlikely(!spin_trylock_bh(&sk->sk_lock.slock)))
- return false;
-
- /* must re-check after taking the lock */
- if (!READ_ONCE(sk->sk_lock.owned)) {
- __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
- mptcp_ofo_queue(msk);
+ if (inet_sk_state_load(sk) == TCP_CLOSE)
+ return;
- /* If the moves have caught up with the DATA_FIN sequence number
- * it's time to ack the DATA_FIN and change socket state, but
- * this is not a good place to change state. Let the workqueue
- * do it.
- */
- if (mptcp_pending_data_fin(sk, NULL))
- mptcp_schedule_work(sk);
- }
+ mptcp_data_lock(sk);
- spin_unlock_bh(&sk->sk_lock.slock);
+ __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
+ __mptcp_ofo_queue(msk);
- return moved > 0;
+ /* If the moves have caught up with the DATA_FIN sequence number
+ * it's time to ack the DATA_FIN and change socket state, but
+ * this is not a good place to change state. Let the workqueue
+ * do it.
+ */
+ if (mptcp_pending_data_fin(sk, NULL))
+ mptcp_schedule_work(sk);
+ mptcp_data_unlock(sk);
}
void mptcp_data_ready(struct sock *sk, struct sock *ssk)
@@ -771,16 +769,6 @@ bool mptcp_schedule_work(struct sock *sk)
return false;
}
-void mptcp_data_acked(struct sock *sk)
-{
- mptcp_reset_timer(sk);
-
- if ((test_bit(MPTCP_NOSPACE, &mptcp_sk(sk)->flags) ||
- mptcp_send_head(sk) ||
- (inet_sk_state_load(sk) != TCP_ESTABLISHED)))
- mptcp_schedule_work(sk);
-}
-
void mptcp_subflow_eof(struct sock *sk)
{
if (!test_and_set_bit(MPTCP_WORK_EOF, &mptcp_sk(sk)->flags))
@@ -825,16 +813,6 @@ static void mptcp_check_for_eof(struct mptcp_sock *msk)
mptcp_close_wake_up(sk);
}
-static bool mptcp_ext_cache_refill(struct mptcp_sock *msk)
-{
- const struct sock *sk = (const struct sock *)msk;
-
- if (!msk->cached_ext)
- msk->cached_ext = __skb_ext_alloc(sk->sk_allocation);
-
- return !!msk->cached_ext;
-}
-
static struct sock *mptcp_subflow_recv_lookup(const struct mptcp_sock *msk)
{
struct mptcp_subflow_context *subflow;
@@ -873,6 +851,121 @@ static bool mptcp_frag_can_collapse_to(const struct mptcp_sock *msk,
df->data_seq + df->data_len == msk->write_seq;
}
+static int mptcp_wmem_with_overhead(struct sock *sk, int size)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+ int ret, skbs;
+
+ ret = size + ((sizeof(struct mptcp_data_frag) * size) >> PAGE_SHIFT);
+ skbs = (msk->tx_pending_data + size) / msk->size_goal_cache;
+ if (skbs < msk->skb_tx_cache.qlen)
+ return ret;
+
+ return ret + (skbs - msk->skb_tx_cache.qlen) * SKB_TRUESIZE(MAX_TCP_HEADER);
+}
+
+static void __mptcp_wmem_reserve(struct sock *sk, int size)
+{
+ int amount = mptcp_wmem_with_overhead(sk, size);
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ WARN_ON_ONCE(msk->wmem_reserved);
+ if (amount <= sk->sk_forward_alloc)
+ goto reserve;
+
+ /* under memory pressure try to reserve at most a single page
+ * otherwise try to reserve the full estimate and fallback
+ * to a single page before entering the error path
+ */
+ if ((tcp_under_memory_pressure(sk) && amount > PAGE_SIZE) ||
+ !sk_wmem_schedule(sk, amount)) {
+ if (amount <= PAGE_SIZE)
+ goto nomem;
+
+ amount = PAGE_SIZE;
+ if (!sk_wmem_schedule(sk, amount))
+ goto nomem;
+ }
+
+reserve:
+ msk->wmem_reserved = amount;
+ sk->sk_forward_alloc -= amount;
+ return;
+
+nomem:
+ /* we will wait for memory on next allocation */
+ msk->wmem_reserved = -1;
+}
+
+static void __mptcp_update_wmem(struct sock *sk)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ if (!msk->wmem_reserved)
+ return;
+
+ if (msk->wmem_reserved < 0)
+ msk->wmem_reserved = 0;
+ if (msk->wmem_reserved > 0) {
+ sk->sk_forward_alloc += msk->wmem_reserved;
+ msk->wmem_reserved = 0;
+ }
+}
+
+static bool mptcp_wmem_alloc(struct sock *sk, int size)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ /* check for pre-existing error condition */
+ if (msk->wmem_reserved < 0)
+ return false;
+
+ if (msk->wmem_reserved >= size)
+ goto account;
+
+ mptcp_data_lock(sk);
+ if (!sk_wmem_schedule(sk, size)) {
+ mptcp_data_unlock(sk);
+ return false;
+ }
+
+ sk->sk_forward_alloc -= size;
+ msk->wmem_reserved += size;
+ mptcp_data_unlock(sk);
+
+account:
+ msk->wmem_reserved -= size;
+ return true;
+}
+
+static void mptcp_wmem_uncharge(struct sock *sk, int size)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ if (msk->wmem_reserved < 0)
+ msk->wmem_reserved = 0;
+ msk->wmem_reserved += size;
+}
+
+static void mptcp_mem_reclaim_partial(struct sock *sk)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ /* if we are experiencing a transint allocation error,
+ * the forward allocation memory has been already
+ * released
+ */
+ if (msk->wmem_reserved < 0)
+ return;
+
+ mptcp_data_lock(sk);
+ sk->sk_forward_alloc += msk->wmem_reserved;
+ sk_mem_reclaim_partial(sk);
+ msk->wmem_reserved = sk->sk_forward_alloc;
+ sk->sk_forward_alloc = 0;
+ mptcp_data_unlock(sk);
+}
+
static void dfrag_uncharge(struct sock *sk, int len)
{
sk_mem_uncharge(sk, len);
@@ -888,7 +981,7 @@ static void dfrag_clear(struct sock *sk, struct mptcp_data_frag *dfrag)
put_page(dfrag->page);
}
-static void mptcp_clean_una(struct sock *sk)
+static void __mptcp_clean_una(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
struct mptcp_data_frag *dtmp, *dfrag;
@@ -899,10 +992,9 @@ static void mptcp_clean_una(struct sock *sk)
* plain TCP
*/
if (__mptcp_check_fallback(msk))
- atomic64_set(&msk->snd_una, msk->snd_nxt);
-
- snd_una = atomic64_read(&msk->snd_una);
+ msk->snd_una = READ_ONCE(msk->snd_nxt);
+ snd_una = msk->snd_una;
list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) {
if (after64(dfrag->data_seq + dfrag->data_len, snd_una))
break;
@@ -930,36 +1022,34 @@ static void mptcp_clean_una(struct sock *sk)
}
out:
- if (cleaned)
- sk_mem_reclaim_partial(sk);
-}
-
-static void mptcp_clean_una_wakeup(struct sock *sk)
-{
- struct mptcp_sock *msk = mptcp_sk(sk);
+ if (cleaned) {
+ if (tcp_under_memory_pressure(sk)) {
+ __mptcp_update_wmem(sk);
+ sk_mem_reclaim_partial(sk);
+ }
- mptcp_clean_una(sk);
+ if (sk_stream_is_writeable(sk)) {
+ /* pairs with memory barrier in mptcp_poll */
+ smp_mb();
+ if (test_and_clear_bit(MPTCP_NOSPACE, &msk->flags))
+ sk_stream_write_space(sk);
+ }
+ }
- /* Only wake up writers if a subflow is ready */
- if (sk_stream_is_writeable(sk)) {
- clear_bit(MPTCP_NOSPACE, &msk->flags);
- sk_stream_write_space(sk);
+ if (snd_una == READ_ONCE(msk->snd_nxt)) {
+ if (msk->timer_ival)
+ mptcp_stop_timer(sk);
+ } else {
+ mptcp_reset_timer(sk);
}
}
-/* ensure we get enough memory for the frag hdr, beyond some minimal amount of
- * data
- */
-static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
+static void mptcp_enter_memory_pressure(struct sock *sk)
{
struct mptcp_subflow_context *subflow;
struct mptcp_sock *msk = mptcp_sk(sk);
bool first = true;
- if (likely(skb_page_frag_refill(32U + sizeof(struct mptcp_data_frag),
- pfrag, sk->sk_allocation)))
- return true;
-
sk_stream_moderate_sndbuf(sk);
mptcp_for_each_subflow(msk, subflow) {
struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
@@ -969,6 +1059,18 @@ static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
sk_stream_moderate_sndbuf(ssk);
first = false;
}
+}
+
+/* ensure we get enough memory for the frag hdr, beyond some minimal amount of
+ * data
+ */
+static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
+{
+ if (likely(skb_page_frag_refill(32U + sizeof(struct mptcp_data_frag),
+ pfrag, sk->sk_allocation)))
+ return true;
+
+ mptcp_enter_memory_pressure(sk);
return false;
}
@@ -1015,6 +1117,128 @@ static int mptcp_check_allowed_size(struct mptcp_sock *msk, u64 data_seq,
return avail_size;
}
+static bool __mptcp_add_ext(struct sk_buff *skb, gfp_t gfp)
+{
+ struct skb_ext *mpext = __skb_ext_alloc(gfp);
+
+ if (!mpext)
+ return false;
+ __skb_ext_set(skb, SKB_EXT_MPTCP, mpext);
+ return true;
+}
+
+static struct sk_buff *__mptcp_do_alloc_tx_skb(struct sock *sk, gfp_t gfp)
+{
+ struct sk_buff *skb;
+
+ skb = alloc_skb_fclone(MAX_TCP_HEADER, gfp);
+ if (likely(skb)) {
+ if (likely(__mptcp_add_ext(skb, gfp))) {
+ skb_reserve(skb, MAX_TCP_HEADER);
+ skb->reserved_tailroom = skb->end - skb->tail;
+ return skb;
+ }
+ __kfree_skb(skb);
+ } else {
+ mptcp_enter_memory_pressure(sk);
+ }
+ return NULL;
+}
+
+static bool mptcp_tx_cache_refill(struct sock *sk, int size,
+ struct sk_buff_head *skbs, int *total_ts)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+ struct sk_buff *skb;
+ int space_needed;
+
+ if (unlikely(tcp_under_memory_pressure(sk))) {
+ mptcp_mem_reclaim_partial(sk);
+
+ /* under pressure pre-allocate at most a single skb */
+ if (msk->skb_tx_cache.qlen)
+ return true;
+ space_needed = msk->size_goal_cache;
+ } else {
+ space_needed = msk->tx_pending_data + size -
+ msk->skb_tx_cache.qlen * msk->size_goal_cache;
+ }
+
+ while (space_needed > 0) {
+ skb = __mptcp_do_alloc_tx_skb(sk, sk->sk_allocation);
+ if (unlikely(!skb)) {
+ /* under memory pressure, try to pass the caller a
+ * single skb to allow forward progress
+ */
+ while (skbs->qlen > 1) {
+ skb = __skb_dequeue_tail(skbs);
+ __kfree_skb(skb);
+ }
+ return skbs->qlen > 0;
+ }
+
+ *total_ts += skb->truesize;
+ __skb_queue_tail(skbs, skb);
+ space_needed -= msk->size_goal_cache;
+ }
+ return true;
+}
+
+static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk, gfp_t gfp)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+ struct sk_buff *skb;
+
+ if (ssk->sk_tx_skb_cache) {
+ skb = ssk->sk_tx_skb_cache;
+ if (unlikely(!skb_ext_find(skb, SKB_EXT_MPTCP) &&
+ !__mptcp_add_ext(skb, gfp)))
+ return false;
+ return true;
+ }
+
+ skb = skb_peek(&msk->skb_tx_cache);
+ if (skb) {
+ if (likely(sk_wmem_schedule(ssk, skb->truesize))) {
+ skb = __skb_dequeue(&msk->skb_tx_cache);
+ if (WARN_ON_ONCE(!skb))
+ return false;
+
+ mptcp_wmem_uncharge(sk, skb->truesize);
+ ssk->sk_tx_skb_cache = skb;
+ return true;
+ }
+
+ /* over memory limit, no point to try to allocate a new skb */
+ return false;
+ }
+
+ skb = __mptcp_do_alloc_tx_skb(sk, gfp);
+ if (!skb)
+ return false;
+
+ if (likely(sk_wmem_schedule(ssk, skb->truesize))) {
+ ssk->sk_tx_skb_cache = skb;
+ return true;
+ }
+ kfree_skb(skb);
+ return false;
+}
+
+static bool mptcp_must_reclaim_memory(struct sock *sk, struct sock *ssk)
+{
+ return !ssk->sk_tx_skb_cache &&
+ !skb_peek(&mptcp_sk(sk)->skb_tx_cache) &&
+ tcp_under_memory_pressure(sk);
+}
+
+static bool mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
+{
+ if (unlikely(mptcp_must_reclaim_memory(sk, ssk)))
+ mptcp_mem_reclaim_partial(sk);
+ return __mptcp_alloc_tx_skb(sk, ssk, sk->sk_allocation);
+}
+
static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
struct mptcp_data_frag *dfrag,
struct mptcp_sendmsg_info *info)
@@ -1026,7 +1250,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
struct sk_buff *skb, *tail;
bool can_collapse = false;
int avail_size;
- size_t ret;
+ size_t ret = 0;
pr_debug("msk=%p ssk=%p sending dfrag at seq=%lld len=%d already sent=%d",
msk, ssk, dfrag->data_seq, dfrag->data_len, info->sent);
@@ -1034,6 +1258,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
/* compute send limit */
info->mss_now = tcp_send_mss(ssk, &info->size_goal, info->flags);
avail_size = info->size_goal;
+ msk->size_goal_cache = info->size_goal;
skb = tcp_write_queue_tail(ssk);
if (skb) {
/* Limit the write to the size available in the
@@ -1054,10 +1279,12 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
/* Zero window and all data acked? Probe. */
avail_size = mptcp_check_allowed_size(msk, data_seq, avail_size);
if (avail_size == 0) {
- if (skb || atomic64_read(&msk->snd_una) != msk->snd_nxt)
+ u64 snd_una = READ_ONCE(msk->snd_una);
+
+ if (skb || snd_una != msk->snd_nxt)
return 0;
zero_window_probe = true;
- data_seq = atomic64_read(&msk->snd_una) - 1;
+ data_seq = snd_una - 1;
avail_size = 1;
}
@@ -1082,8 +1309,11 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
goto out;
}
- mpext = __skb_ext_set(tail, SKB_EXT_MPTCP, msk->cached_ext);
- msk->cached_ext = NULL;
+ mpext = skb_ext_find(tail, SKB_EXT_MPTCP);
+ if (WARN_ON_ONCE(!mpext)) {
+ /* should never reach here, stream corrupted */
+ return -EINVAL;
+ }
memset(mpext, 0, sizeof(*mpext));
mpext->data_seq = data_seq;
@@ -1107,31 +1337,6 @@ out:
return ret;
}
-static void mptcp_nospace(struct mptcp_sock *msk)
-{
- struct mptcp_subflow_context *subflow;
-
- set_bit(MPTCP_NOSPACE, &msk->flags);
- smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */
-
- mptcp_for_each_subflow(msk, subflow) {
- struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
- bool ssk_writeable = sk_stream_is_writeable(ssk);
- struct socket *sock = READ_ONCE(ssk->sk_socket);
-
- if (ssk_writeable || !sock)
- continue;
-
- /* enables ssk->write_space() callbacks */
- set_bit(SOCK_NOSPACE, &sock->flags);
- }
-
- /* mptcp_data_acked() could run just before we set the NOSPACE bit,
- * so explicitly check for snd_una value
- */
- mptcp_clean_una((struct sock *)msk);
-}
-
#define MPTCP_SEND_BURST_SIZE ((1 << 16) - \
sizeof(struct tcphdr) - \
MAX_TCP_OPTION_SPACE - \
@@ -1156,9 +1361,6 @@ static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk,
sock_owned_by_me((struct sock *)msk);
*sndbuf = 0;
- if (!mptcp_ext_cache_refill(msk))
- return NULL;
-
if (__mptcp_check_fallback(msk)) {
if (!msk->first)
return NULL;
@@ -1267,6 +1469,15 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags)
if (ssk != prev_ssk || !prev_ssk)
lock_sock(ssk);
+ /* keep it simple and always provide a new skb for the
+ * subflow, even if we will not use it when collapsing
+ * on the pending one
+ */
+ if (!mptcp_alloc_tx_skb(sk, ssk)) {
+ mptcp_push_release(sk, ssk, &info);
+ goto out;
+ }
+
ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
if (ret <= 0) {
mptcp_push_release(sk, ssk, &info);
@@ -1277,6 +1488,7 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags)
dfrag->already_sent += ret;
msk->snd_nxt += ret;
msk->snd_burst -= ret;
+ msk->tx_pending_data -= ret;
copied += ret;
len -= ret;
}
@@ -1296,6 +1508,63 @@ out:
}
}
+static void __mptcp_subflow_push_pending(struct sock *sk, struct sock *ssk)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+ struct mptcp_sendmsg_info info;
+ struct mptcp_data_frag *dfrag;
+ int len, copied = 0;
+
+ info.flags = 0;
+ while ((dfrag = mptcp_send_head(sk))) {
+ info.sent = dfrag->already_sent;
+ info.limit = dfrag->data_len;
+ len = dfrag->data_len - dfrag->already_sent;
+ while (len > 0) {
+ int ret = 0;
+
+ /* do auto tuning */
+ if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) &&
+ ssk->sk_sndbuf > READ_ONCE(sk->sk_sndbuf))
+ WRITE_ONCE(sk->sk_sndbuf, ssk->sk_sndbuf);
+
+ if (unlikely(mptcp_must_reclaim_memory(sk, ssk))) {
+ __mptcp_update_wmem(sk);
+ sk_mem_reclaim_partial(sk);
+ }
+ if (!__mptcp_alloc_tx_skb(sk, ssk, GFP_ATOMIC))
+ goto out;
+
+ ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
+ if (ret <= 0)
+ goto out;
+
+ info.sent += ret;
+ dfrag->already_sent += ret;
+ msk->snd_nxt += ret;
+ msk->snd_burst -= ret;
+ msk->tx_pending_data -= ret;
+ copied += ret;
+ len -= ret;
+ }
+ WRITE_ONCE(msk->first_pending, mptcp_send_next(sk));
+ }
+
+out:
+ /* __mptcp_alloc_tx_skb could have released some wmem and we are
+ * not going to flush it via release_sock()
+ */
+ __mptcp_update_wmem(sk);
+ if (copied) {
+ mptcp_set_timeout(sk, ssk);
+ tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle,
+ info.size_goal);
+ if (msk->snd_data_fin_enable &&
+ msk->snd_nxt + 1 == msk->write_seq)
+ mptcp_schedule_work(sk);
+ }
+}
+
static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
{
struct mptcp_sock *msk = mptcp_sk(sk);
@@ -1307,7 +1576,7 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
if (msg->msg_flags & ~(MSG_MORE | MSG_DONTWAIT | MSG_NOSIGNAL))
return -EOPNOTSUPP;
- lock_sock(sk);
+ mptcp_lock_sock(sk, __mptcp_wmem_reserve(sk, len));
timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
@@ -1318,11 +1587,11 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
}
pfrag = sk_page_frag(sk);
- mptcp_clean_una(sk);
while (msg_data_left(msg)) {
+ int total_ts, frag_truesize = 0;
struct mptcp_data_frag *dfrag;
- int frag_truesize = 0;
+ struct sk_buff_head skbs;
bool dfrag_collapsed;
size_t psize, offset;
@@ -1337,11 +1606,9 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
dfrag = mptcp_pending_tail(sk);
dfrag_collapsed = mptcp_frag_can_collapse_to(msk, pfrag, dfrag);
if (!dfrag_collapsed) {
- if (!sk_stream_memory_free(sk)) {
- mptcp_push_pending(sk, msg->msg_flags);
- if (!sk_stream_memory_free(sk))
- goto wait_for_memory;
- }
+ if (!sk_stream_memory_free(sk))
+ goto wait_for_memory;
+
if (!mptcp_page_frag_refill(sk, pfrag))
goto wait_for_memory;
@@ -1356,11 +1623,20 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
offset = dfrag->offset + dfrag->data_len;
psize = pfrag->size - offset;
psize = min_t(size_t, psize, msg_data_left(msg));
- if (!sk_wmem_schedule(sk, psize + frag_truesize))
+ total_ts = psize + frag_truesize;
+ __skb_queue_head_init(&skbs);
+ if (!mptcp_tx_cache_refill(sk, psize, &skbs, &total_ts))
goto wait_for_memory;
+ if (!mptcp_wmem_alloc(sk, total_ts)) {
+ __skb_queue_purge(&skbs);
+ goto wait_for_memory;
+ }
+
+ skb_queue_splice_tail(&skbs, &msk->skb_tx_cache);
if (copy_page_from_iter(dfrag->page, offset, psize,
&msg->msg_iter) != psize) {
+ mptcp_wmem_uncharge(sk, psize + frag_truesize);
ret = -EFAULT;
goto out;
}
@@ -1376,7 +1652,6 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
* Note: we charge such data both to sk and ssk
*/
sk_wmem_queued_add(sk, frag_truesize);
- sk->sk_forward_alloc -= frag_truesize;
if (!dfrag_collapsed) {
get_page(dfrag->page);
list_add_tail(&dfrag->list, &msk->rtx_queue);
@@ -1387,21 +1662,20 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
dfrag->data_seq, dfrag->data_len, dfrag->already_sent,
!dfrag_collapsed);
- if (!mptcp_ext_cache_refill(msk))
- goto wait_for_memory;
continue;
wait_for_memory:
- mptcp_nospace(msk);
- if (mptcp_timer_pending(sk))
- mptcp_reset_timer(sk);
+ set_bit(MPTCP_NOSPACE, &msk->flags);
+ mptcp_push_pending(sk, msg->msg_flags);
ret = sk_stream_wait_memory(sk, &timeo);
if (ret)
goto out;
}
- if (copied)
+ if (copied) {
+ msk->tx_pending_data += copied;
mptcp_push_pending(sk, msg->msg_flags);
+ }
out:
release_sock(sk);
@@ -1427,11 +1701,10 @@ static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk,
struct msghdr *msg,
size_t len)
{
- struct sock *sk = (struct sock *)msk;
struct sk_buff *skb;
int copied = 0;
- while ((skb = skb_peek(&sk->sk_receive_queue)) != NULL) {
+ while ((skb = skb_peek(&msk->receive_queue)) != NULL) {
u32 offset = MPTCP_SKB_CB(skb)->offset;
u32 data_len = skb->len - offset;
u32 count = min_t(size_t, len - copied, data_len);
@@ -1451,7 +1724,10 @@ static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk,
break;
}
- __skb_unlink(skb, &sk->sk_receive_queue);
+ /* we will bulk release the skb memory later */
+ skb->destructor = NULL;
+ msk->rmem_released += skb->truesize;
+ __skb_unlink(skb, &msk->receive_queue);
__kfree_skb(skb);
if (copied >= len)
@@ -1559,25 +1835,47 @@ new_measure:
msk->rcvq_space.time = mstamp;
}
+static void __mptcp_update_rmem(struct sock *sk)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ if (!msk->rmem_released)
+ return;
+
+ atomic_sub(msk->rmem_released, &sk->sk_rmem_alloc);
+ sk_mem_uncharge(sk, msk->rmem_released);
+ msk->rmem_released = 0;
+}
+
+static void __mptcp_splice_receive_queue(struct sock *sk)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ skb_queue_splice_tail_init(&sk->sk_receive_queue, &msk->receive_queue);
+}
+
static bool __mptcp_move_skbs(struct mptcp_sock *msk, unsigned int rcv)
{
+ struct sock *sk = (struct sock *)msk;
unsigned int moved = 0;
- bool done;
-
- /* avoid looping forever below on racing close */
- if (((struct sock *)msk)->sk_state == TCP_CLOSE)
- return false;
+ bool ret, done;
__mptcp_flush_join_list(msk);
do {
struct sock *ssk = mptcp_subflow_recv_lookup(msk);
bool slowpath;
- if (!ssk)
+ /* we can have data pending in the subflows only if the msk
+ * receive buffer was full at subflow_data_ready() time,
+ * that is an unlikely slow path.
+ */
+ if (likely(!ssk))
break;
slowpath = lock_sock_fast(ssk);
+ mptcp_data_lock(sk);
done = __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
+ mptcp_data_unlock(sk);
if (moved && rcv) {
WRITE_ONCE(msk->rmem_pending, min(rcv, moved));
tcp_cleanup_rbuf(ssk, 1);
@@ -1586,11 +1884,19 @@ static bool __mptcp_move_skbs(struct mptcp_sock *msk, unsigned int rcv)
unlock_sock_fast(ssk, slowpath);
} while (!done);
- if (mptcp_ofo_queue(msk) || moved > 0) {
- mptcp_check_data_fin((struct sock *)msk);
- return true;
+ /* acquire the data lock only if some input data is pending */
+ ret = moved > 0;
+ if (!RB_EMPTY_ROOT(&msk->out_of_order_queue) ||
+ !skb_queue_empty_lockless(&sk->sk_receive_queue)) {
+ mptcp_data_lock(sk);
+ __mptcp_update_rmem(sk);
+ ret |= __mptcp_ofo_queue(msk);
+ __mptcp_splice_receive_queue(sk);
+ mptcp_data_unlock(sk);
}
- return false;
+ if (ret)
+ mptcp_check_data_fin((struct sock *)msk);
+ return !skb_queue_empty(&msk->receive_queue);
}
static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
@@ -1604,7 +1910,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
if (msg->msg_flags & ~(MSG_WAITALL | MSG_DONTWAIT))
return -EOPNOTSUPP;
- lock_sock(sk);
+ mptcp_lock_sock(sk, __mptcp_splice_receive_queue(sk));
if (unlikely(sk->sk_state == TCP_LISTEN)) {
copied = -ENOTCONN;
goto out_err;
@@ -1614,7 +1920,6 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
len = min_t(size_t, len, INT_MAX);
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
- __mptcp_flush_join_list(msk);
for (;;) {
int bytes_read, old_space;
@@ -1628,7 +1933,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
copied += bytes_read;
- if (skb_queue_empty(&sk->sk_receive_queue) &&
+ if (skb_queue_empty(&msk->receive_queue) &&
__mptcp_move_skbs(msk, len - copied))
continue;
@@ -1659,8 +1964,14 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
if (test_and_clear_bit(MPTCP_WORK_EOF, &msk->flags))
mptcp_check_for_eof(msk);
- if (sk->sk_shutdown & RCV_SHUTDOWN)
+ if (sk->sk_shutdown & RCV_SHUTDOWN) {
+ /* race breaker: the shutdown could be after the
+ * previous receive queue check
+ */
+ if (__mptcp_move_skbs(msk, len - copied))
+ continue;
break;
+ }
if (sk->sk_state == TCP_CLOSE) {
copied = -ENOTCONN;
@@ -1682,7 +1993,8 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
mptcp_wait_data(sk, &timeo);
}
- if (skb_queue_empty(&sk->sk_receive_queue)) {
+ if (skb_queue_empty_lockless(&sk->sk_receive_queue) &&
+ skb_queue_empty(&msk->receive_queue)) {
/* entire backlog drained, clear DATA_READY. */
clear_bit(MPTCP_DATA_READY, &msk->flags);
@@ -1698,7 +2010,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
out_err:
pr_debug("msk=%p data_ready=%d rx queue empty=%d copied=%d",
msk, test_bit(MPTCP_DATA_READY, &msk->flags),
- skb_queue_empty(&sk->sk_receive_queue), copied);
+ skb_queue_empty_lockless(&sk->sk_receive_queue), copied);
mptcp_rcv_space_adjust(msk, copied);
release_sock(sk);
@@ -1709,12 +2021,8 @@ static void mptcp_retransmit_handler(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
- if (atomic64_read(&msk->snd_una) == READ_ONCE(msk->snd_nxt)) {
- mptcp_stop_timer(sk);
- } else {
- set_bit(MPTCP_WORK_RTX, &msk->flags);
- mptcp_schedule_work(sk);
- }
+ set_bit(MPTCP_WORK_RTX, &msk->flags);
+ mptcp_schedule_work(sk);
}
static void mptcp_retransmit_timer(struct timer_list *t)
@@ -1915,21 +2223,18 @@ static void mptcp_worker(struct work_struct *work)
if (unlikely(state == TCP_CLOSE))
goto unlock;
- mptcp_clean_una_wakeup(sk);
mptcp_check_data_fin_ack(sk);
__mptcp_flush_join_list(msk);
if (test_and_clear_bit(MPTCP_WORK_CLOSE_SUBFLOW, &msk->flags))
__mptcp_close_subflow(msk);
- if (mptcp_send_head(sk))
- mptcp_push_pending(sk, 0);
-
if (msk->pm.status)
pm_work(msk);
if (test_and_clear_bit(MPTCP_WORK_EOF, &msk->flags))
mptcp_check_for_eof(msk);
+ __mptcp_check_send_data_fin(sk);
mptcp_check_data_fin(sk);
/* if the msk data is completely acked, or the socket timedout,
@@ -1951,9 +2256,6 @@ static void mptcp_worker(struct work_struct *work)
if (!dfrag)
goto unlock;
- if (!mptcp_ext_cache_refill(msk))
- goto reset_unlock;
-
ssk = mptcp_subflow_get_retrans(msk);
if (!ssk)
goto reset_unlock;
@@ -1964,6 +2266,9 @@ static void mptcp_worker(struct work_struct *work)
info.sent = 0;
info.limit = dfrag->already_sent;
while (info.sent < dfrag->already_sent) {
+ if (!mptcp_alloc_tx_skb(sk, ssk))
+ break;
+
ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
if (ret <= 0)
break;
@@ -1971,9 +2276,6 @@ static void mptcp_worker(struct work_struct *work)
MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_RETRANSSEGS);
copied += ret;
info.sent += ret;
-
- if (!mptcp_ext_cache_refill(msk))
- break;
}
if (copied)
tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle,
@@ -2001,8 +2303,14 @@ static int __mptcp_init_sock(struct sock *sk)
INIT_LIST_HEAD(&msk->join_list);
INIT_LIST_HEAD(&msk->rtx_queue);
INIT_WORK(&msk->work, mptcp_worker);
+ __skb_queue_head_init(&msk->receive_queue);
+ __skb_queue_head_init(&msk->skb_tx_cache);
msk->out_of_order_queue = RB_ROOT;
msk->first_pending = NULL;
+ msk->wmem_reserved = 0;
+ msk->rmem_released = 0;
+ msk->tx_pending_data = 0;
+ msk->size_goal_cache = TCP_BASE_MSS;
msk->ack_hint = NULL;
msk->first = NULL;
@@ -2046,12 +2354,15 @@ static void __mptcp_clear_xmit(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
struct mptcp_data_frag *dtmp, *dfrag;
-
- sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer);
+ struct sk_buff *skb;
WRITE_ONCE(msk->first_pending, NULL);
list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list)
dfrag_clear(sk, dfrag);
+ while ((skb = __skb_dequeue(&msk->skb_tx_cache)) != NULL) {
+ sk->sk_forward_alloc += skb->truesize;
+ kfree_skb(skb);
+ }
}
static void mptcp_cancel_work(struct sock *sk)
@@ -2186,7 +2497,7 @@ static void __mptcp_destroy_sock(struct sock *sk)
spin_unlock_bh(&msk->join_list_lock);
list_splice_init(&msk->conn_list, &conn_list);
- __mptcp_clear_xmit(sk);
+ sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer);
sk_stop_timer(sk, &sk->sk_timer);
msk->pm.status = 0;
@@ -2197,6 +2508,8 @@ static void __mptcp_destroy_sock(struct sock *sk)
sk->sk_prot->destroy(sk);
+ WARN_ON_ONCE(msk->wmem_reserved);
+ WARN_ON_ONCE(msk->rmem_released);
sk_stream_kill_queues(sk);
xfrm_sk_free_policy(sk);
sk_refcnt_debug_release(sk);
@@ -2326,8 +2639,8 @@ struct sock *mptcp_sk_clone(const struct sock *sk,
msk->write_seq = subflow_req->idsn + 1;
msk->snd_nxt = msk->write_seq;
- atomic64_set(&msk->snd_una, msk->write_seq);
- atomic64_set(&msk->wnd_end, msk->snd_nxt + req->rsk_rcv_wnd);
+ msk->snd_una = msk->write_seq;
+ msk->wnd_end = msk->snd_nxt + req->rsk_rcv_wnd;
if (mp_opt->mp_capable) {
msk->can_ack = true;
@@ -2363,7 +2676,7 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk)
if (msk->rcvq_space.space == 0)
msk->rcvq_space.space = TCP_INIT_CWND * TCP_MSS_DEFAULT;
- atomic64_set(&msk->wnd_end, msk->snd_nxt + tcp_sk(ssk)->snd_wnd);
+ WRITE_ONCE(msk->wnd_end, msk->snd_nxt + tcp_sk(ssk)->snd_wnd);
}
static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
@@ -2414,6 +2727,13 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
void mptcp_destroy_common(struct mptcp_sock *msk)
{
+ struct sock *sk = (struct sock *)msk;
+
+ __mptcp_clear_xmit(sk);
+
+ /* move to sk_receive_queue, sk_stream_kill_queues will purge it */
+ skb_queue_splice_tail_init(&msk->receive_queue, &sk->sk_receive_queue);
+
skb_rbtree_purge(&msk->out_of_order_queue);
mptcp_token_destroy(msk);
mptcp_pm_free_anno_list(msk);
@@ -2423,9 +2743,6 @@ static void mptcp_destroy(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
- if (msk->cached_ext)
- __skb_ext_put(msk->cached_ext);
-
mptcp_destroy_common(msk);
sk_sockets_allocated_dec(sk);
}
@@ -2540,15 +2857,58 @@ static int mptcp_getsockopt(struct sock *sk, int level, int optname,
return -EOPNOTSUPP;
}
+void __mptcp_data_acked(struct sock *sk)
+{
+ if (!sock_owned_by_user(sk))
+ __mptcp_clean_una(sk);
+ else
+ set_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags);
+
+ if (mptcp_pending_data_fin_ack(sk))
+ mptcp_schedule_work(sk);
+}
+
+void __mptcp_wnd_updated(struct sock *sk, struct sock *ssk)
+{
+ if (!mptcp_send_head(sk))
+ return;
+
+ if (!sock_owned_by_user(sk))
+ __mptcp_subflow_push_pending(sk, ssk);
+ else
+ set_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->flags);
+}
+
#define MPTCP_DEFERRED_ALL (TCPF_WRITE_TIMER_DEFERRED)
-/* this is very alike tcp_release_cb() but we must handle differently a
- * different set of events
- */
+/* processes deferred events and flush wmem */
static void mptcp_release_cb(struct sock *sk)
{
unsigned long flags, nflags;
+ /* push_pending may touch wmem_reserved, do it before the later
+ * cleanup
+ */
+ if (test_and_clear_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags))
+ __mptcp_clean_una(sk);
+ if (test_and_clear_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->flags)) {
+ /* mptcp_push_pending() acquires the subflow socket lock
+ *
+ * 1) can't be invoked in atomic scope
+ * 2) must avoid ABBA deadlock with msk socket spinlock: the RX
+ * datapath acquires the msk socket spinlock while helding
+ * the subflow socket lock
+ */
+
+ spin_unlock_bh(&sk->sk_lock.slock);
+ mptcp_push_pending(sk, 0);
+ spin_lock_bh(&sk->sk_lock.slock);
+ }
+
+ /* clear any wmem reservation and errors */
+ __mptcp_update_wmem(sk);
+ __mptcp_update_rmem(sk);
+
do {
flags = sk->sk_tsq_flags;
if (!(flags & MPTCP_DEFERRED_ALL))
@@ -2619,7 +2979,7 @@ void mptcp_finish_connect(struct sock *ssk)
WRITE_ONCE(msk->ack_seq, ack_seq);
WRITE_ONCE(msk->rcv_wnd_sent, ack_seq);
WRITE_ONCE(msk->can_ack, 1);
- atomic64_set(&msk->snd_una, msk->write_seq);
+ WRITE_ONCE(msk->snd_una, msk->write_seq);
mptcp_pm_new_connection(msk, 0);
@@ -2880,24 +3240,9 @@ static __poll_t mptcp_check_readable(struct mptcp_sock *msk)
0;
}
-static bool __mptcp_check_writeable(struct mptcp_sock *msk)
-{
- struct sock *sk = (struct sock *)msk;
- bool mptcp_writable;
-
- mptcp_clean_una(sk);
- mptcp_writable = sk_stream_is_writeable(sk);
- if (!mptcp_writable)
- mptcp_nospace(msk);
-
- return mptcp_writable;
-}
-
static __poll_t mptcp_check_writeable(struct mptcp_sock *msk)
{
struct sock *sk = (struct sock *)msk;
- __poll_t ret = 0;
- bool slow;
if (unlikely(sk->sk_shutdown & SEND_SHUTDOWN))
return 0;
@@ -2905,12 +3250,12 @@ static __poll_t mptcp_check_writeable(struct mptcp_sock *msk)
if (sk_stream_is_writeable(sk))
return EPOLLOUT | EPOLLWRNORM;
- slow = lock_sock_fast(sk);
- if (__mptcp_check_writeable(msk))
- ret = EPOLLOUT | EPOLLWRNORM;
+ set_bit(MPTCP_NOSPACE, &msk->flags);
+ smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */
+ if (sk_stream_is_writeable(sk))
+ return EPOLLOUT | EPOLLWRNORM;
- unlock_sock_fast(sk, slow);
- return ret;
+ return 0;
}
static __poll_t mptcp_poll(struct file *file, struct socket *sock,
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index 82d5626323b1..fc56e730fb35 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -91,6 +91,8 @@
#define MPTCP_WORK_EOF 3
#define MPTCP_FALLBACK_DONE 4
#define MPTCP_WORK_CLOSE_SUBFLOW 5
+#define MPTCP_PUSH_PENDING 6
+#define MPTCP_CLEAN_UNA 7
static inline bool before64(__u64 seq1, __u64 seq2)
{
@@ -218,14 +220,16 @@ struct mptcp_sock {
u64 ack_seq;
u64 rcv_wnd_sent;
u64 rcv_data_fin_seq;
+ int wmem_reserved;
struct sock *last_snd;
int snd_burst;
int old_wspace;
- atomic64_t snd_una;
- atomic64_t wnd_end;
+ u64 snd_una;
+ u64 wnd_end;
unsigned long timer_ival;
u32 token;
int rmem_pending;
+ int rmem_released;
unsigned long flags;
bool can_ack;
bool fully_established;
@@ -237,11 +241,14 @@ struct mptcp_sock {
struct work_struct work;
struct sk_buff *ooo_last_skb;
struct rb_root out_of_order_queue;
+ struct sk_buff_head receive_queue;
+ struct sk_buff_head skb_tx_cache; /* this is wmem accounted */
+ int tx_pending_data;
+ int size_goal_cache;
struct list_head conn_list;
struct list_head rtx_queue;
struct mptcp_data_frag *first_pending;
struct list_head join_list;
- struct skb_ext *cached_ext; /* for the next sendmsg */
struct socket *subflow; /* outgoing connect/listener/!mp_capable */
struct sock *first;
struct mptcp_pm_data pm;
@@ -253,6 +260,22 @@ struct mptcp_sock {
} rcvq_space;
};
+#define mptcp_lock_sock(___sk, cb) do { \
+ struct sock *__sk = (___sk); /* silence macro reuse warning */ \
+ might_sleep(); \
+ spin_lock_bh(&__sk->sk_lock.slock); \
+ if (__sk->sk_lock.owned) \
+ __lock_sock(__sk); \
+ cb; \
+ __sk->sk_lock.owned = 1; \
+ spin_unlock(&__sk->sk_lock.slock); \
+ mutex_acquire(&__sk->sk_lock.dep_map, 0, 0, _RET_IP_); \
+ local_bh_enable(); \
+} while (0)
+
+#define mptcp_data_lock(sk) spin_lock_bh(&(sk)->sk_lock.slock)
+#define mptcp_data_unlock(sk) spin_unlock_bh(&(sk)->sk_lock.slock)
+
#define mptcp_for_each_subflow(__msk, __subflow) \
list_for_each_entry(__subflow, &((__msk)->conn_list), node)
@@ -300,7 +323,7 @@ static inline struct mptcp_data_frag *mptcp_rtx_tail(const struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
- if (!before64(msk->snd_nxt, atomic64_read(&msk->snd_una)))
+ if (!before64(msk->snd_nxt, READ_ONCE(msk->snd_una)))
return NULL;
return list_last_entry(&msk->rtx_queue, struct mptcp_data_frag, list);
@@ -474,7 +497,8 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk);
void mptcp_data_ready(struct sock *sk, struct sock *ssk);
bool mptcp_finish_join(struct sock *sk);
bool mptcp_schedule_work(struct sock *sk);
-void mptcp_data_acked(struct sock *sk);
+void __mptcp_wnd_updated(struct sock *sk, struct sock *ssk);
+void __mptcp_data_acked(struct sock *sk);
void mptcp_subflow_eof(struct sock *sk);
bool mptcp_update_rcv_data_fin(struct mptcp_sock *msk, u64 data_fin_seq, bool use_64bit);
void __mptcp_flush_join_list(struct mptcp_sock *msk);
diff --git a/net/mptcp/subflow.c b/net/mptcp/subflow.c
index 2e5c3f4da3a4..023c856424b7 100644
--- a/net/mptcp/subflow.c
+++ b/net/mptcp/subflow.c
@@ -995,19 +995,9 @@ static void subflow_data_ready(struct sock *sk)
mptcp_data_ready(parent, sk);
}
-static void subflow_write_space(struct sock *sk)
+static void subflow_write_space(struct sock *ssk)
{
- struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(sk);
- struct socket *sock = READ_ONCE(sk->sk_socket);
- struct sock *parent = subflow->conn;
-
- if (!sk_stream_is_writeable(sk))
- return;
-
- if (sock && sk_stream_is_writeable(parent))
- clear_bit(SOCK_NOSPACE, &sock->flags);
-
- sk_stream_write_space(parent);
+ /* we take action in __mptcp_clean_una() */
}
static struct inet_connection_sock_af_ops *