author    Jon Maloy <jon.maloy@ericsson.com>   2017-10-13 11:04:26 +0200
committer David S. Miller <davem@davemloft.net>   2017-10-13 08:46:00 -0700
commit    b7d42635517fde2b095deddd0fba37be2302a285 (patch)
tree      531940dc481a0e08ca7a2bbd0a328452344b82e1 /net/tipc/socket.c
parent    tipc: receive group membership events via member socket (diff)
tipc: introduce flow control for group broadcast messages
We introduce an end-to-end flow control mechanism for group broadcast
messages. This ensures that no messages are ever lost because of
destination receive buffer overflow, with minimal impact on performance.
For now, the algorithm is based on the assumption that there is only one
active transmitter at any moment in time.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
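For intuition, here is a minimal sketch of the scheme the commit message
describes; it is not the kernel code. Capacity is counted in fixed-size
"blocks": a transmit spends blocks against the sender's view of the
receiver's window, and an advertisement from the receiver restores them
once the data has been consumed. All names below are illustrative.

#include <stdbool.h>

/* Sender-side view of one receiver's capacity, in blocks */
struct member_credit {
	int window;
};

/* Transmit spends credit; without enough credit the caller must
 * block (or return -EAGAIN for a non-blocking socket) */
static bool try_send(struct member_credit *m, int blks)
{
	if (m->window < blks)
		return false;
	m->window -= blks;
	return true;
}

/* An advertisement from the receiver restores credit once the
 * application there has consumed the data */
static void on_advertisement(struct member_credit *m, int blks)
{
	m->window += blks;
}

Because credit is only ever spent when it is known to exist, the
receiver's buffer can never be overrun, which is the "no messages are
ever lost" guarantee claimed above.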
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--  net/tipc/socket.c  48
1 file changed, 36 insertions, 12 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 0a2eac309177..50145c95ac96 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -201,6 +201,11 @@ static bool tsk_conn_cong(struct tipc_sock *tsk)
return tsk->snt_unacked > tsk->snd_win;
}
+static u16 tsk_blocks(int len)
+{
+ return ((len / FLOWCTL_BLK_SZ) + 1);
+}
+
/* tsk_blocks(): translate a buffer size in bytes to number of
* advertisable blocks, taking into account the ratio truesize(len)/len
* We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
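For concreteness, some worked values of the new helper, assuming the
FLOWCTL_BLK_SZ of 1024 defined in this file: the integer division plus
one rounds up and guarantees every message costs at least one block.

#include <assert.h>

#define FLOWCTL_BLK_SZ 1024	/* as defined in net/tipc/socket.c */

static unsigned short blocks(int len)
{
	return len / FLOWCTL_BLK_SZ + 1;	/* same arithmetic as tsk_blocks() */
}

int main(void)
{
	assert(blocks(0) == 1);		/* even an empty payload costs one block */
	assert(blocks(100) == 1);
	assert(blocks(1024) == 2);	/* division truncates, +1 rounds up */
	assert(blocks(4000) == 4);	/* 3 full blocks plus a remainder */
	return 0;
}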
@@ -831,6 +836,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
struct tipc_group *grp = tsk->group;
struct tipc_nlist *dsts = tipc_group_dests(grp);
struct tipc_mc_method *method = &tsk->mc_method;
+ int blks = tsk_blocks(MCAST_H_SIZE + dlen);
struct tipc_msg *hdr = &tsk->phdr;
int mtu = tipc_bcast_get_mtu(net);
struct sk_buff_head pkts;
@@ -839,14 +845,15 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
if (!dsts->local && !dsts->remote)
return -EHOSTUNREACH;
- /* Block or return if any destination link is congested */
- rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
+ /* Block or return if any destination link or member is congested */
+ rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt &&
+ !tipc_group_bc_cong(grp, blks));
if (unlikely(rc))
return rc;
/* Complete message header */
msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
- msg_set_hdr_sz(hdr, MCAST_H_SIZE);
+ msg_set_hdr_sz(hdr, GROUP_H_SIZE);
msg_set_destport(hdr, 0);
msg_set_destnode(hdr, 0);
msg_set_nameinst(hdr, 0);
@@ -864,9 +871,8 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
if (unlikely(rc))
return rc;
- /* Update broadcast sequence number */
- tipc_group_update_bc_members(tsk->group);
-
+ /* Update broadcast sequence number and send windows */
+ tipc_group_update_bc_members(tsk->group, blks);
return dlen;
}
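Taken together, the hunks above add both halves of the sender side: the
wait condition now also checks group-level credit, and a successful
broadcast charges every member while advancing the shared sequence
number. A hedged sketch of that broadcast-specific logic follows; the
types are illustrative, since the real tipc_group internals live in
group.c and are not shown on this page.

#include <stdbool.h>

struct grp_sketch {
	int nmembers;
	int window[32];			/* per-member send credit, in blocks */
	unsigned short bc_snd_nxt;	/* broadcast sequence number */
};

/* A broadcast is congested while *any* member lacks enough window,
 * mirroring the tipc_group_bc_cong(grp, blks) check above */
static bool bc_cong(const struct grp_sketch *g, int blks)
{
	int i;

	for (i = 0; i < g->nmembers; i++)
		if (g->window[i] < blks)
			return true;
	return false;
}

/* A successful send charges all members and bumps the sequence
 * number, as the updated comment in the hunk describes */
static void update_bc_members(struct grp_sketch *g, int blks)
{
	int i;

	for (i = 0; i < g->nmembers; i++)
		g->window[i] -= blks;
	g->bc_snd_nxt++;
}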
@@ -1024,7 +1030,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
return -EMSGSIZE;
- if (unlikely(grp))
+ if (unlikely(grp && !dest))
return tipc_send_group_bcast(sock, m, dlen, timeout);
if (unlikely(!dest)) {
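The dispatch change means a group member reaches tipc_send_group_bcast()
by calling send() with no destination address at all, while a named
destination still takes the paths below. A userspace sketch, assuming
the TIPC_GROUP_JOIN socket option introduced earlier in this series;
the type/instance values are made up and error handling is omitted.

#include <string.h>
#include <sys/socket.h>
#include <linux/tipc.h>

/* sd is an already created TIPC SOCK_RDM socket */
static ssize_t group_bcast_example(int sd, const char *payload)
{
	struct tipc_group_req mreq = {
		.type = 4711,			/* illustrative service type */
		.instance = 17,			/* illustrative member instance */
		.scope = TIPC_CLUSTER_SCOPE,
	};

	setsockopt(sd, SOL_TIPC, TIPC_GROUP_JOIN, &mreq, sizeof(mreq));

	/* no msg_name/msg_namelen => the (grp && !dest) branch above,
	 * i.e. a flow-controlled group broadcast */
	return send(sd, payload, strlen(payload), 0);
}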
@@ -1420,6 +1426,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
bool connected = !tipc_sk_type_connectionless(sk);
struct tipc_sock *tsk = tipc_sk(sk);
int rc, err, hlen, dlen, copy;
+ struct sk_buff_head xmitq;
struct tipc_msg *hdr;
struct sk_buff *skb;
bool grp_evt;
@@ -1436,8 +1443,8 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
}
timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+ /* Step rcv queue to first msg with data or error; wait if necessary */
do {
- /* Look at first msg in receive queue; wait if necessary */
rc = tipc_wait_for_rcvmsg(sock, &timeout);
if (unlikely(rc))
goto exit;
@@ -1485,12 +1492,21 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
if (unlikely(flags & MSG_PEEK))
goto exit;
+ /* Send group flow control advertisement when applicable */
+ if (tsk->group && msg_in_group(hdr) && !grp_evt) {
+ skb_queue_head_init(&xmitq);
+ tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
+ msg_orignode(hdr), msg_origport(hdr),
+ &xmitq);
+ tipc_node_distr_xmit(sock_net(sk), &xmitq);
+ }
+
tsk_advance_rx_queue(sk);
if (likely(!connected))
goto exit;
- /* Send connection flow control ack when applicable */
+ /* Send connection flow control advertisement when applicable */
tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
tipc_sk_send_ack(tsk);
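On the receive path the same block arithmetic runs in reverse: once the
data has been copied to user space, the consumed blocks are advertised
back to the originating socket so its send window can grow again. Note
the contrast with the connection case just below, which batches acks via
TIPC_ACK_RATE; the group advertisement here goes out per consumed
message. A condensed, illustrative shape, where send_ad() stands in for
the tipc_group_update_rcv_win()/tipc_node_distr_xmit() pair above:

#include <stdint.h>

#define FLOWCTL_BLK_SZ 1024

static void group_msg_consumed(uint32_t orignode, uint32_t origport,
			       int hlen, int dlen,
			       void (*send_ad)(uint32_t node, uint32_t port,
					       int blks))
{
	int blks = (hlen + dlen) / FLOWCTL_BLK_SZ + 1;

	/* advertise immediately, matching the unconditional
	 * advertisement in the hunk above */
	send_ad(orignode, origport, blks);
}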
@@ -1650,6 +1666,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = buf_msg(skb);
struct tipc_group *grp = tsk->group;
+ bool wakeup = false;
switch (msg_user(hdr)) {
case CONN_MANAGER:
@@ -1658,19 +1675,23 @@ static void tipc_sk_proto_rcv(struct sock *sk,
case SOCK_WAKEUP:
tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
tsk->cong_link_cnt--;
- sk->sk_write_space(sk);
+ wakeup = true;
break;
case GROUP_PROTOCOL:
- tipc_group_proto_rcv(grp, hdr, inputq, xmitq);
+ tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
break;
case TOP_SRV:
- tipc_group_member_evt(tsk->group, skb, inputq, xmitq);
+ tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
+ skb, inputq, xmitq);
skb = NULL;
break;
default:
break;
}
+ if (wakeup)
+ sk->sk_write_space(sk);
+
kfree_skb(skb);
}
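The wakeup rework matters because, with this patch, link decongestion is
no longer the only event that can unblock a sender: an incoming group
advertisement or a membership event may open the send window too. Rather
than calling sk_write_space() from each branch, the handler lets every
case set one flag and performs a single deferred wakeup. The pattern,
with stand-in names rather than the kernel's:

#include <stdbool.h>

enum user_sketch { SOCK_WAKEUP_X, GROUP_PROTOCOL_X, TOP_SRV_X };

/* stand-in: returns true when an advertisement freed send window */
static bool group_proto_rcv_sketch(void) { return true; }

static void proto_rcv_sketch(enum user_sketch u, void (*write_space)(void))
{
	bool wakeup = false;

	switch (u) {
	case SOCK_WAKEUP_X:	/* a congested link opened up */
		wakeup = true;
		break;
	case GROUP_PROTOCOL_X:	/* callee may set the flag */
		wakeup = group_proto_rcv_sketch();
		break;
	default:
		break;
	}
	if (wakeup)
		write_space();	/* one wakeup, whatever the cause */
}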
@@ -1785,6 +1806,9 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = buf_msg(skb);
+ if (unlikely(msg_in_group(hdr)))
+ return sk->sk_rcvbuf;
+
if (unlikely(!msg_connected(hdr)))
return sk->sk_rcvbuf << msg_importance(hdr);
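Finally, the receive-buffer limit: since the new flow control guarantees
that senders never overrun a member's advertised window, group traffic
can be admitted up to plain sk_rcvbuf, without the importance-based
scaling used for other connectionless messages. A condensed,
illustrative version of the resulting policy; the connected branch is
simplified, as the real function continues past the hunk shown here.

#include <stdbool.h>

static unsigned int rcvbuf_limit_sketch(unsigned int sk_rcvbuf,
					bool in_group, bool connected,
					unsigned int importance)
{
	if (in_group)		/* flow control already bounds senders */
		return sk_rcvbuf;
	if (!connected)		/* datagrams: scale by importance (0..3) */
		return sk_rcvbuf << importance;
	return sk_rcvbuf;	/* connected: window-based, not shown here */
}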