aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/bcast.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--net/tipc/bcast.c57
1 files changed, 35 insertions, 22 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3eaa931e2e8c..81b1fef1f5e0 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -79,6 +79,13 @@ static void tipc_bclink_unlock(struct net *net)
tipc_link_reset_all(node);
}
+void tipc_bclink_input(struct net *net)
+{
+ struct tipc_net *tn = net_generic(net, tipc_net_id);
+
+ tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
+}
+
uint tipc_bclink_get_mtu(void)
{
return MAX_PKT_DEFAULT_MCAST;
@@ -356,7 +363,7 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
tipc_node_unlock(n_ptr);
}
-/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
+/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
* and to identified node local sockets
* @net: the applicable net namespace
* @list: chain of buffers containing message
@@ -371,6 +378,8 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
int rc = 0;
int bc = 0;
struct sk_buff *skb;
+ struct sk_buff_head arrvq;
+ struct sk_buff_head inputq;
/* Prepare clone of message for local node */
skb = tipc_msg_reassemble(list);
@@ -379,7 +388,7 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
return -EHOSTUNREACH;
}
- /* Broadcast to all other nodes */
+ /* Broadcast to all nodes */
if (likely(bclink)) {
tipc_bclink_lock(net);
if (likely(bclink->bcast_nodes.count)) {
@@ -399,12 +408,15 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
if (unlikely(!bc))
__skb_queue_purge(list);
- /* Deliver message clone */
- if (likely(!rc))
- tipc_sk_mcast_rcv(net, skb);
- else
+ if (unlikely(rc)) {
kfree_skb(skb);
-
+ return rc;
+ }
+ /* Deliver message clone */
+ __skb_queue_head_init(&arrvq);
+ skb_queue_head_init(&inputq);
+ __skb_queue_tail(&arrvq, skb);
+ tipc_sk_mcast_rcv(net, &arrvq, &inputq);
return rc;
}
@@ -449,7 +461,7 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
int deferred = 0;
int pos = 0;
struct sk_buff *iskb;
- struct sk_buff_head msgs;
+ struct sk_buff_head *arrvq, *inputq;
/* Screen out unwanted broadcast messages */
if (msg_mc_netid(msg) != tn->net_id)
@@ -486,6 +498,8 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
/* Handle in-sequence broadcast message */
seqno = msg_seqno(msg);
next_in = mod(node->bclink.last_in + 1);
+ arrvq = &tn->bclink->arrvq;
+ inputq = &tn->bclink->inputq;
if (likely(seqno == next_in)) {
receive:
@@ -493,21 +507,26 @@ receive:
if (likely(msg_isdata(msg))) {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
+ spin_lock_bh(&inputq->lock);
+ __skb_queue_tail(arrvq, buf);
+ spin_unlock_bh(&inputq->lock);
+ node->action_flags |= TIPC_BCAST_MSG_EVT;
tipc_bclink_unlock(net);
tipc_node_unlock(node);
- if (likely(msg_mcast(msg)))
- tipc_sk_mcast_rcv(net, buf);
- else
- kfree_skb(buf);
} else if (msg_user(msg) == MSG_BUNDLER) {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg);
+ pos = 0;
+ while (tipc_msg_extract(buf, &iskb, &pos)) {
+ spin_lock_bh(&inputq->lock);
+ __skb_queue_tail(arrvq, iskb);
+ spin_unlock_bh(&inputq->lock);
+ }
+ node->action_flags |= TIPC_BCAST_MSG_EVT;
tipc_bclink_unlock(net);
tipc_node_unlock(node);
- while (tipc_msg_extract(buf, &iskb, &pos))
- tipc_sk_mcast_rcv(net, iskb);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
tipc_buf_append(&node->bclink.reasm_buf, &buf);
if (unlikely(!buf && !node->bclink.reasm_buf))
@@ -523,14 +542,6 @@ receive:
}
tipc_bclink_unlock(net);
tipc_node_unlock(node);
- } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
- tipc_bclink_lock(net);
- bclink_accept_pkt(node, seqno);
- tipc_bclink_unlock(net);
- tipc_node_unlock(node);
- skb_queue_head_init(&msgs);
- skb_queue_tail(&msgs, buf);
- tipc_named_rcv(net, &msgs);
} else {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
@@ -950,6 +961,8 @@ int tipc_bclink_init(struct net *net)
skb_queue_head_init(&bcl->wakeupq);
bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock);
+ __skb_queue_head_init(&bclink->arrvq);
+ skb_queue_head_init(&bclink->inputq);
bcl->owner = &bclink->node;
bcl->owner->net = net;
bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;