aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/group.c
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2017-10-13 11:04:25 +0200
committerDavid S. Miller <davem@davemloft.net>2017-10-13 08:46:00 -0700
commitae236fb208a6fbbd2e7a6913385e8fb78ac807f8 (patch)
tree451b5798223b599434923a90243a94372d7c6cf1 /net/tipc/group.c
parenttipc: add second source address to recvmsg()/recvfrom() (diff)
downloadlinux-dev-ae236fb208a6fbbd2e7a6913385e8fb78ac807f8.tar.xz
linux-dev-ae236fb208a6fbbd2e7a6913385e8fb78ac807f8.zip
tipc: receive group membership events via member socket
Like with any other service, group members' availability can be subscribed for by connecting to be topology server. However, because the events arrive via a different socket than the member socket, there is a real risk that membership events my arrive out of synch with the actual JOIN/LEAVE action. I.e., it is possible to receive the first messages from a new member before the corresponding JOIN event arrives, just as it is possible to receive the last messages from a leaving member after the LEAVE event has already been received. Since each member socket is internally also subscribing for membership events, we now fix this problem by passing those events on to the user via the member socket. We leverage the already present member synch- ronization protocol to guarantee correct message/event order. An event is delivered to the user as an empty message where the two source addresses identify the new/lost member. Furthermore, we set the MSG_OOB bit in the message flags to mark it as an event. If the event is an indication about a member loss we also set the MSG_EOR bit, so it can be distinguished from a member addition event. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/group.c')
-rw-r--r--net/tipc/group.c60
1 files changed, 47 insertions, 13 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index beb214a3420c..1bfa9348b26d 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -59,6 +59,7 @@ enum mbr_state {
struct tipc_member {
struct rb_node tree_node;
struct list_head list;
+ struct sk_buff *event_msg;
u32 node;
u32 port;
u32 instance;
@@ -79,6 +80,7 @@ struct tipc_group {
u16 member_cnt;
u16 bc_snd_nxt;
bool loopback;
+ bool events;
};
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
@@ -117,6 +119,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
grp->instance = mreq->instance;
grp->scope = mreq->scope;
grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
+ grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
return grp;
kfree(grp);
@@ -279,6 +282,13 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
if (!msg_in_group(hdr))
goto drop;
+ if (mtyp == TIPC_GRP_MEMBER_EVT) {
+ if (!grp->events)
+ goto drop;
+ __skb_queue_tail(inputq, skb);
+ return;
+ }
+
m = tipc_group_find_member(grp, node, port);
if (!tipc_group_is_receiver(m))
goto drop;
@@ -311,6 +321,7 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
}
void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
+ struct sk_buff_head *inputq,
struct sk_buff_head *xmitq)
{
u32 node = msg_orignode(hdr);
@@ -332,10 +343,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
/* Wait until PUBLISH event is received */
- if (m->state == MBR_DISCOVERED)
+ if (m->state == MBR_DISCOVERED) {
m->state = MBR_JOINING;
- else if (m->state == MBR_PUBLISHED)
+ } else if (m->state == MBR_PUBLISHED) {
m->state = MBR_JOINED;
+ __skb_queue_tail(inputq, m->event_msg);
+ }
return;
case GRP_LEAVE_MSG:
if (!m)
@@ -347,6 +360,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
return;
}
/* Otherwise deliver already received WITHDRAW event */
+ __skb_queue_tail(inputq, m->event_msg);
tipc_group_delete_member(grp, m);
return;
default:
@@ -354,16 +368,17 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
}
}
-/* tipc_group_member_evt() - receive and handle a member up/down event
- */
void tipc_group_member_evt(struct tipc_group *grp,
struct sk_buff *skb,
+ struct sk_buff_head *inputq,
struct sk_buff_head *xmitq)
{
struct tipc_msg *hdr = buf_msg(skb);
struct tipc_event *evt = (void *)msg_data(hdr);
+ u32 instance = evt->found_lower;
u32 node = evt->port.node;
u32 port = evt->port.ref;
+ int event = evt->event;
struct tipc_member *m;
struct net *net;
u32 self;
@@ -376,32 +391,51 @@ void tipc_group_member_evt(struct tipc_group *grp,
if (!grp->loopback && node == self && port == grp->portid)
goto drop;
+ /* Convert message before delivery to user */
+ msg_set_hdr_sz(hdr, GROUP_H_SIZE);
+ msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
+ msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
+ msg_set_origport(hdr, port);
+ msg_set_orignode(hdr, node);
+ msg_set_nametype(hdr, grp->type);
+ msg_set_grp_evt(hdr, event);
+
m = tipc_group_find_member(grp, node, port);
- if (evt->event == TIPC_PUBLISHED) {
+ if (event == TIPC_PUBLISHED) {
if (!m)
m = tipc_group_create_member(grp, node, port,
MBR_DISCOVERED);
if (!m)
goto drop;
- /* Wait if JOIN message not yet received */
- if (m->state == MBR_DISCOVERED)
+ /* Hold back event if JOIN message not yet received */
+ if (m->state == MBR_DISCOVERED) {
+ m->event_msg = skb;
m->state = MBR_PUBLISHED;
- else
+ } else {
+ __skb_queue_tail(inputq, skb);
m->state = MBR_JOINED;
- m->instance = evt->found_lower;
+ }
+ m->instance = instance;
+ TIPC_SKB_CB(skb)->orig_member = m->instance;
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
- } else if (evt->event == TIPC_WITHDRAWN) {
+ } else if (event == TIPC_WITHDRAWN) {
if (!m)
goto drop;
- /* Keep back event if more messages might be expected */
- if (m->state != MBR_LEAVING && tipc_node_is_up(net, node))
+ TIPC_SKB_CB(skb)->orig_member = m->instance;
+
+ /* Hold back event if more messages might be expected */
+ if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
+ m->event_msg = skb;
m->state = MBR_LEAVING;
- else
+ } else {
+ __skb_queue_tail(inputq, skb);
tipc_group_delete_member(grp, m);
+ }
}
+ return;
drop:
kfree_skb(skb);
}