Diffstat (limited to 'net/tipc')
-rw-r--r--  net/tipc/bearer.c          | 101
-rw-r--r--  net/tipc/bearer.h          |  15
-rw-r--r--  net/tipc/core.c            |   8
-rw-r--r--  net/tipc/discover.c        |   7
-rw-r--r--  net/tipc/discover.h        |   2
-rw-r--r--  net/tipc/link.c            |  52
-rw-r--r--  net/tipc/link.h            |   2
-rw-r--r--  net/tipc/msg.h             |  29
-rw-r--r--  net/tipc/netlink_compat.c  |   2
-rw-r--r--  net/tipc/node.c            |  33
-rw-r--r--  net/tipc/node.h            |   6
-rw-r--r--  net/tipc/server.c          |  27
-rw-r--r--  net/tipc/server.h          |   4
-rw-r--r--  net/tipc/socket.c          | 149
-rw-r--r--  net/tipc/socket.h          |  17
-rw-r--r--  net/tipc/subscr.c          |   7
16 files changed, 288 insertions, 173 deletions
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 27a5406213c6..6f11c62bc8f9 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -205,6 +205,7 @@ static int tipc_enable_bearer(struct net *net, const char *name,  	struct tipc_bearer *b;  	struct tipc_media *m;  	struct tipc_bearer_names b_names; +	struct sk_buff *skb;  	char addr_string[16];  	u32 bearer_id;  	u32 with_this_prio; @@ -301,7 +302,7 @@ restart:  	b->net_plane = bearer_id + 'A';  	b->priority = priority; -	res = tipc_disc_create(net, b, &b->bcast_addr); +	res = tipc_disc_create(net, b, &b->bcast_addr, &skb);  	if (res) {  		bearer_disable(net, b);  		pr_warn("Bearer <%s> rejected, discovery object creation failed\n", @@ -310,7 +311,8 @@ restart:  	}  	rcu_assign_pointer(tn->bearer_list[bearer_id], b); - +	if (skb) +		tipc_bearer_xmit_skb(net, bearer_id, skb, &b->bcast_addr);  	pr_info("Enabled bearer <%s>, discovery domain %s, priority %u\n",  		name,  		tipc_addr_string_fill(addr_string, disc_domain), priority); @@ -335,23 +337,16 @@ static int tipc_reset_bearer(struct net *net, struct tipc_bearer *b)   */  static void bearer_disable(struct net *net, struct tipc_bearer *b)  { -	struct tipc_net *tn = net_generic(net, tipc_net_id); -	u32 i; +	struct tipc_net *tn = tipc_net(net); +	int bearer_id = b->identity;  	pr_info("Disabling bearer <%s>\n", b->name);  	b->media->disable_media(b); - -	tipc_node_delete_links(net, b->identity); +	tipc_node_delete_links(net, bearer_id);  	RCU_INIT_POINTER(b->media_ptr, NULL);  	if (b->link_req)  		tipc_disc_delete(b->link_req); - -	for (i = 0; i < MAX_BEARERS; i++) { -		if (b == rtnl_dereference(tn->bearer_list[i])) { -			RCU_INIT_POINTER(tn->bearer_list[i], NULL); -			break; -		} -	} +	RCU_INIT_POINTER(tn->bearer_list[bearer_id], NULL);  	kfree_rcu(b, rcu);  } @@ -394,7 +389,7 @@ void tipc_disable_l2_media(struct tipc_bearer *b)  /**   * tipc_l2_send_msg - send a TIPC packet out over an L2 interface - * @buf: the packet to be sent + * @skb: the packet to be sent   * @b: the bearer through which the packet is to be sent   * @dest: peer destination address   */ @@ -403,17 +398,21 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,  {  	struct net_device *dev;  	int delta; +	void *tipc_ptr;  	dev = (struct net_device *)rcu_dereference_rtnl(b->media_ptr);  	if (!dev)  		return 0; +	/* Send RESET message even if bearer is detached from device */ +	tipc_ptr = rtnl_dereference(dev->tipc_ptr); +	if (unlikely(!tipc_ptr && !msg_is_reset(buf_msg(skb)))) +		goto drop; +  	delta = dev->hard_header_len - skb_headroom(skb);  	if ((delta > 0) && -	    pskb_expand_head(skb, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) { -		kfree_skb(skb); -		return 0; -	} +	    pskb_expand_head(skb, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) +		goto drop;  	skb_reset_network_header(skb);  	skb->dev = dev; @@ -422,6 +421,9 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,  			dev->dev_addr, skb->len);  	dev_queue_xmit(skb);  	return 0; +drop: +	kfree_skb(skb); +	return 0;  }  int tipc_bearer_mtu(struct net *net, u32 bearer_id) @@ -450,6 +452,8 @@ void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,  	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);  	if (likely(b))  		b->media->send_msg(net, skb, b, dest); +	else +		kfree_skb(skb);  	rcu_read_unlock();  } @@ -468,11 +472,11 @@ void tipc_bearer_xmit(struct net *net, u32 bearer_id,  	rcu_read_lock();  	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); -	if (likely(b)) { -		skb_queue_walk_safe(xmitq, skb, tmp) { -			__skb_dequeue(xmitq); -			
b->media->send_msg(net, skb, b, dst); -		} +	if (unlikely(!b)) +		__skb_queue_purge(xmitq); +	skb_queue_walk_safe(xmitq, skb, tmp) { +		__skb_dequeue(xmitq); +		b->media->send_msg(net, skb, b, dst);  	}  	rcu_read_unlock();  } @@ -490,14 +494,14 @@ void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,  	rcu_read_lock();  	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); -	if (likely(b)) { -		skb_queue_walk_safe(xmitq, skb, tmp) { -			hdr = buf_msg(skb); -			msg_set_non_seq(hdr, 1); -			msg_set_mc_netid(hdr, net_id); -			__skb_dequeue(xmitq); -			b->media->send_msg(net, skb, b, &b->bcast_addr); -		} +	if (unlikely(!b)) +		__skb_queue_purge(xmitq); +	skb_queue_walk_safe(xmitq, skb, tmp) { +		hdr = buf_msg(skb); +		msg_set_non_seq(hdr, 1); +		msg_set_mc_netid(hdr, net_id); +		__skb_dequeue(xmitq); +		b->media->send_msg(net, skb, b, &b->bcast_addr);  	}  	rcu_read_unlock();  } @@ -513,24 +517,21 @@ void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,   * ignores packets sent using interface multicast, and traffic sent to other   * nodes (which can happen if interface is running in promiscuous mode).   */ -static int tipc_l2_rcv_msg(struct sk_buff *buf, struct net_device *dev, +static int tipc_l2_rcv_msg(struct sk_buff *skb, struct net_device *dev,  			   struct packet_type *pt, struct net_device *orig_dev)  {  	struct tipc_bearer *b;  	rcu_read_lock();  	b = rcu_dereference_rtnl(dev->tipc_ptr); -	if (likely(b)) { -		if (likely(buf->pkt_type <= PACKET_BROADCAST)) { -			buf->next = NULL; -			tipc_rcv(dev_net(dev), buf, b); -			rcu_read_unlock(); -			return NET_RX_SUCCESS; -		} +	if (likely(b && (skb->pkt_type <= PACKET_BROADCAST))) { +		skb->next = NULL; +		tipc_rcv(dev_net(dev), skb, b); +		rcu_read_unlock(); +		return NET_RX_SUCCESS;  	}  	rcu_read_unlock(); - -	kfree_skb(buf); +	kfree_skb(skb);  	return NET_RX_DROP;  } @@ -548,9 +549,18 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt,  {  	struct net_device *dev = netdev_notifier_info_to_dev(ptr);  	struct net *net = dev_net(dev); +	struct tipc_net *tn = tipc_net(net);  	struct tipc_bearer *b; +	int i;  	b = rtnl_dereference(dev->tipc_ptr); +	if (!b) { +		for (i = 0; i < MAX_BEARERS; b = NULL, i++) { +			b = rtnl_dereference(tn->bearer_list[i]); +			if (b && (b->media_ptr == dev)) +				break; +		} +	}  	if (!b)  		return NOTIFY_DONE; @@ -560,13 +570,20 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt,  	case NETDEV_CHANGE:  		if (netif_carrier_ok(dev))  			break; +	case NETDEV_UP: +		rcu_assign_pointer(dev->tipc_ptr, b); +		break;  	case NETDEV_GOING_DOWN: +		RCU_INIT_POINTER(dev->tipc_ptr, NULL); +		synchronize_net(); +		tipc_reset_bearer(net, b); +		break;  	case NETDEV_CHANGEMTU:  		tipc_reset_bearer(net, b);  		break;  	case NETDEV_CHANGEADDR:  		b->media->raw2addr(b, &b->addr, -				       (char *)dev->dev_addr); +				   (char *)dev->dev_addr);  		tipc_reset_bearer(net, b);  		break;  	case NETDEV_UNREGISTER: diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index e31820516774..f686e41b5abb 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -42,8 +42,6 @@  #include <net/genetlink.h>  #define MAX_MEDIA	3 -#define MAX_NODES	4096 -#define WSIZE		32  /* Identifiers associated with TIPC message header media address info   * - address info field is 32 bytes long @@ -62,16 +60,6 @@  #define TIPC_MEDIA_TYPE_UDP	3  /** - * struct tipc_node_map - set of node identifiers - * @count: # of nodes in set - * @map: bitmap of node identifiers that are in the set - */ 
-struct tipc_node_map { -	u32 count; -	u32 map[MAX_NODES / WSIZE]; -}; - -/**   * struct tipc_media_addr - destination address used by TIPC bearers   * @value: address info (format defined by media)   * @media_id: TIPC media type identifier @@ -142,7 +130,6 @@ struct tipc_media {   * @identity: array index of this bearer within TIPC bearer array   * @link_req: ptr to (optional) structure making periodic link setup requests   * @net_plane: network plane ('A' through 'H') currently associated with bearer - * @nodes: indicates which nodes in cluster can be reached through bearer   *   * Note: media-specific code is responsible for initialization of the fields   * indicated below when a bearer is enabled; TIPC's generic bearer code takes @@ -163,8 +150,6 @@ struct tipc_bearer {  	u32 identity;  	struct tipc_link_req *link_req;  	char net_plane; -	int node_cnt; -	struct tipc_node_map nodes;  };  struct tipc_bearer_names { diff --git a/net/tipc/core.c b/net/tipc/core.c index e2bdb07a49a2..fe1b062c4f18 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -112,11 +112,9 @@ static int __init tipc_init(void)  	pr_info("Activated (version " TIPC_MOD_VER ")\n"); -	sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 << -			      TIPC_LOW_IMPORTANCE; -	sysctl_tipc_rmem[1] = TIPC_CONN_OVERLOAD_LIMIT >> 4 << -			      TIPC_CRITICAL_IMPORTANCE; -	sysctl_tipc_rmem[2] = TIPC_CONN_OVERLOAD_LIMIT; +	sysctl_tipc_rmem[0] = RCVBUF_MIN; +	sysctl_tipc_rmem[1] = RCVBUF_DEF; +	sysctl_tipc_rmem[2] = RCVBUF_MAX;  	err = tipc_netlink_start();  	if (err) diff --git a/net/tipc/discover.c b/net/tipc/discover.c index f1e738e80535..ad9d477cc242 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -268,10 +268,9 @@ exit:   * Returns 0 if successful, otherwise -errno.   */  int tipc_disc_create(struct net *net, struct tipc_bearer *b, -		     struct tipc_media_addr *dest) +		     struct tipc_media_addr *dest, struct sk_buff **skb)  {  	struct tipc_link_req *req; -	struct sk_buff *skb;  	req = kmalloc(sizeof(*req), GFP_ATOMIC);  	if (!req) @@ -293,9 +292,7 @@ int tipc_disc_create(struct net *net, struct tipc_bearer *b,  	setup_timer(&req->timer, disc_timeout, (unsigned long)req);  	mod_timer(&req->timer, jiffies + req->timer_intv);  	b->link_req = req; -	skb = skb_clone(req->buf, GFP_ATOMIC); -	if (skb) -		tipc_bearer_xmit_skb(net, req->bearer_id, skb, &req->dest); +	*skb = skb_clone(req->buf, GFP_ATOMIC);  	return 0;  } diff --git a/net/tipc/discover.h b/net/tipc/discover.h index c9b12770c5ed..b80a335389c0 100644 --- a/net/tipc/discover.h +++ b/net/tipc/discover.h @@ -40,7 +40,7 @@  struct tipc_link_req;  int tipc_disc_create(struct net *net, struct tipc_bearer *b_ptr, -		     struct tipc_media_addr *dest); +		     struct tipc_media_addr *dest, struct sk_buff **skb);  void tipc_disc_delete(struct tipc_link_req *req);  void tipc_disc_reset(struct net *net, struct tipc_bearer *b_ptr);  void tipc_disc_add_dest(struct tipc_link_req *req); diff --git a/net/tipc/link.c b/net/tipc/link.c index 7d2bb3e70baa..7059c94f33c5 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -140,6 +140,7 @@ struct tipc_link {  	char if_name[TIPC_MAX_IF_NAME];  	u32 priority;  	char net_plane; +	u16 rst_cnt;  	/* Failover/synch */  	u16 drop_point; @@ -701,40 +702,34 @@ static void link_profile_stats(struct tipc_link *l)  /* tipc_link_timeout - perform periodic task as instructed from node timeout   */ -/* tipc_link_timeout - perform periodic task as instructed from node timeout - */  int tipc_link_timeout(struct tipc_link *l, struct 
sk_buff_head *xmitq)  { -	int rc = 0; -	int mtyp = STATE_MSG; -	bool xmit = false; -	bool prb = false; +	int mtyp, rc = 0; +	bool state = false; +	bool probe = false; +	bool setup = false;  	u16 bc_snt = l->bc_sndlink->snd_nxt - 1;  	u16 bc_acked = l->bc_rcvlink->acked; -	bool bc_up = link_is_up(l->bc_rcvlink);  	link_profile_stats(l);  	switch (l->state) {  	case LINK_ESTABLISHED:  	case LINK_SYNCHING: -		if (!l->silent_intv_cnt) { -			if (bc_up && (bc_acked != bc_snt)) -				xmit = true; -		} else if (l->silent_intv_cnt <= l->abort_limit) { -			xmit = true; -			prb = true; -		} else { -			rc |= tipc_link_fsm_evt(l, LINK_FAILURE_EVT); -		} +		if (l->silent_intv_cnt > l->abort_limit) +			return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); +		mtyp = STATE_MSG; +		state = bc_acked != bc_snt; +		probe = l->silent_intv_cnt;  		l->silent_intv_cnt++;  		break;  	case LINK_RESET: -		xmit = true; +		setup = l->rst_cnt++ <= 4; +		setup |= !(l->rst_cnt % 16);  		mtyp = RESET_MSG;  		break;  	case LINK_ESTABLISHING: -		xmit = true; +		setup = true;  		mtyp = ACTIVATE_MSG;  		break;  	case LINK_PEER_RESET: @@ -745,8 +740,8 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)  		break;  	} -	if (xmit) -		tipc_link_build_proto_msg(l, mtyp, prb, 0, 0, 0, xmitq); +	if (state || probe || setup) +		tipc_link_build_proto_msg(l, mtyp, probe, 0, 0, 0, xmitq);  	return rc;  } @@ -833,6 +828,7 @@ void tipc_link_reset(struct tipc_link *l)  	l->rcv_nxt = 1;  	l->acked = 0;  	l->silent_intv_cnt = 0; +	l->rst_cnt = 0;  	l->stats.recv_info = 0;  	l->stale_count = 0;  	l->bc_peer_is_up = false; @@ -1110,12 +1106,12 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)  	return released;  } -/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission +/* tipc_link_build_state_msg: prepare link state message for transmission   *   * Note that sending of broadcast ack is coordinated among nodes, to reduce   * risk of ack storms towards the sender   */ -int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) +int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq)  {  	if (!l)  		return 0; @@ -1140,11 +1136,17 @@ int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq)  void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq)  {  	int mtyp = RESET_MSG; +	struct sk_buff *skb;  	if (l->state == LINK_ESTABLISHING)  		mtyp = ACTIVATE_MSG;  	tipc_link_build_proto_msg(l, mtyp, 0, 0, 0, 0, xmitq); + +	/* Inform peer that this endpoint is going down if applicable */ +	skb = skb_peek_tail(xmitq); +	if (skb && (l->state == LINK_RESET)) +		msg_set_peer_stopping(buf_msg(skb), 1);  }  /* tipc_link_build_nack_msg: prepare link nack message for transmission @@ -1219,7 +1221,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,  		if (!tipc_data_input(l, skb, l->inputq))  			rc |= tipc_link_input(l, skb, l->inputq);  		if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) -			rc |= tipc_link_build_ack_msg(l, xmitq); +			rc |= tipc_link_build_state_msg(l, xmitq);  		if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))  			break;  	} while ((skb = __skb_dequeue(defq))); @@ -1411,7 +1413,9 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,  			l->priority = peers_prio;  		/* ACTIVATE_MSG serves as PEER_RESET if link is already down */ -		if ((mtyp == RESET_MSG) || !link_is_up(l)) +		if (msg_peer_stopping(hdr)) +			rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT); +		else if ((mtyp == 
RESET_MSG) || !link_is_up(l))  			rc = tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT);  		/* ACTIVATE_MSG takes up link if it was already locally reset */ diff --git a/net/tipc/link.h b/net/tipc/link.h index 6a94175ee20a..d7e9d42fcb2d 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -123,7 +123,7 @@ int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);  int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);  int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,  		  struct sk_buff_head *xmitq); -int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq); +int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq);  void tipc_link_add_bc_peer(struct tipc_link *snd_l,  			   struct tipc_link *uc_l,  			   struct sk_buff_head *xmitq); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 55778a0aebf3..024da8af91f0 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -715,6 +715,16 @@ static inline void msg_set_redundant_link(struct tipc_msg *m, u32 r)  	msg_set_bits(m, 5, 12, 0x1, r);  } +static inline u32 msg_peer_stopping(struct tipc_msg *m) +{ +	return msg_bits(m, 5, 13, 0x1); +} + +static inline void msg_set_peer_stopping(struct tipc_msg *m, u32 s) +{ +	msg_set_bits(m, 5, 13, 0x1, s); +} +  static inline char *msg_media_addr(struct tipc_msg *m)  {  	return (char *)&m->hdr[TIPC_MEDIA_INFO_OFFSET]; @@ -733,16 +743,26 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n)  	msg_set_bits(m, 9, 16, 0xffff, n);  } -static inline u32 msg_bcast_tag(struct tipc_msg *m) +static inline u32 msg_conn_ack(struct tipc_msg *m)  {  	return msg_bits(m, 9, 16, 0xffff);  } -static inline void msg_set_bcast_tag(struct tipc_msg *m, u32 n) +static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)  {  	msg_set_bits(m, 9, 16, 0xffff, n);  } +static inline u32 msg_adv_win(struct tipc_msg *m) +{ +	return msg_bits(m, 9, 0, 0xffff); +} + +static inline void msg_set_adv_win(struct tipc_msg *m, u32 n) +{ +	msg_set_bits(m, 9, 0, 0xffff, n); +} +  static inline u32 msg_max_pkt(struct tipc_msg *m)  {  	return msg_bits(m, 9, 16, 0xffff) * 4; @@ -779,6 +799,11 @@ static inline bool msg_peer_node_is_up(struct tipc_msg *m)  	return msg_redundant_link(m);  } +static inline bool msg_is_reset(struct tipc_msg *hdr) +{ +	return (msg_user(hdr) == LINK_PROTOCOL) && (msg_type(hdr) == RESET_MSG); +} +  struct sk_buff *tipc_buf_acquire(u32 size);  bool tipc_msg_validate(struct sk_buff *skb);  bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err); diff --git a/net/tipc/netlink_compat.c b/net/tipc/netlink_compat.c index d7d050f44fc1..4dfc5c14f8c3 100644 --- a/net/tipc/netlink_compat.c +++ b/net/tipc/netlink_compat.c @@ -802,7 +802,7 @@ static int tipc_nl_compat_name_table_dump(struct tipc_nl_compat_msg *msg,  		goto out;  	tipc_tlv_sprintf(msg->rep, "%-10u %s", -			 nla_get_u32(publ[TIPC_NLA_PUBL_REF]), +			 nla_get_u32(publ[TIPC_NLA_PUBL_KEY]),  			 scope_str[nla_get_u32(publ[TIPC_NLA_PUBL_SCOPE])]);  out:  	tipc_tlv_sprintf(msg->rep, "\n"); diff --git a/net/tipc/node.c b/net/tipc/node.c index 9aaa1bc566ae..e01e2c71b5a1 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1,7 +1,7 @@  /*   * net/tipc/node.c: TIPC node management routines   * - * Copyright (c) 2000-2006, 2012-2015, Ericsson AB + * Copyright (c) 2000-2006, 2012-2016, Ericsson AB   * Copyright (c) 2005-2006, 2010-2014, Wind River Systems   * All rights reserved.   
* @@ -191,6 +191,20 @@ int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel)  	tipc_node_put(n);  	return mtu;  } + +u16 tipc_node_get_capabilities(struct net *net, u32 addr) +{ +	struct tipc_node *n; +	u16 caps; + +	n = tipc_node_find(net, addr); +	if (unlikely(!n)) +		return TIPC_NODE_CAPABILITIES; +	caps = n->capabilities; +	tipc_node_put(n); +	return caps; +} +  /*   * A trivial power-of-two bitmask technique is used for speed, since this   * operation is done for every incoming TIPC packet. The number of hash table @@ -304,8 +318,11 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)  	spin_lock_bh(&tn->node_list_lock);  	n = tipc_node_find(net, addr); -	if (n) +	if (n) { +		/* Same node may come back with new capabilities */ +		n->capabilities = capabilities;  		goto exit; +	}  	n = kzalloc(sizeof(*n), GFP_ATOMIC);  	if (!n) {  		pr_warn("Node creation failed, no memory\n"); @@ -525,7 +542,7 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,  	struct tipc_link *ol = node_active_link(n, 0);  	struct tipc_link *nl = n->links[bearer_id].link; -	if (!nl) +	if (!nl || tipc_link_is_up(nl))  		return;  	tipc_link_fsm_evt(nl, LINK_ESTABLISH_EVT); @@ -545,12 +562,16 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,  	pr_debug("Established link <%s> on network plane %c\n",  		 tipc_link_name(nl), tipc_link_plane(nl)); +	/* Ensure that a STATE message goes first */ +	tipc_link_build_state_msg(nl, xmitq); +  	/* First link? => give it both slots */  	if (!ol) {  		*slot0 = bearer_id;  		*slot1 = bearer_id;  		tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT);  		n->action_flags |= TIPC_NOTIFY_NODE_UP; +		tipc_link_set_active(nl, true);  		tipc_bcast_add_peer(n->net, nl, xmitq);  		return;  	} @@ -581,8 +602,12 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,  static void tipc_node_link_up(struct tipc_node *n, int bearer_id,  			      struct sk_buff_head *xmitq)  { +	struct tipc_media_addr *maddr; +  	tipc_node_write_lock(n);  	__tipc_node_link_up(n, bearer_id, xmitq); +	maddr = &n->links[bearer_id].maddr; +	tipc_bearer_xmit(n->net, bearer_id, xmitq, maddr);  	tipc_node_write_unlock(n);  } @@ -1279,7 +1304,7 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id  	/* Broadcast ACKs are sent on a unicast link */  	if (rc & TIPC_LINK_SND_BC_ACK) {  		tipc_node_read_lock(n); -		tipc_link_build_ack_msg(le->link, &xmitq); +		tipc_link_build_state_msg(le->link, &xmitq);  		tipc_node_read_unlock(n);  	} diff --git a/net/tipc/node.h b/net/tipc/node.h index f39d9d06e8bb..8264b3d97dc4 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -45,10 +45,11 @@  /* Optional capabilities supported by this code version   */  enum { -	TIPC_BCAST_SYNCH = (1 << 1) +	TIPC_BCAST_SYNCH   = (1 << 1), +	TIPC_BLOCK_FLOWCTL = (2 << 1)  }; -#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH +#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | TIPC_BLOCK_FLOWCTL)  #define INVALID_BEARER_ID -1  void tipc_node_stop(struct net *net); @@ -70,6 +71,7 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb);  int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);  void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);  int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel); +u16 tipc_node_get_capabilities(struct net *net, u32 addr);  int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);  int tipc_nl_node_dump_link(struct sk_buff *skb, struct 
netlink_callback *cb);  int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info); diff --git a/net/tipc/server.c b/net/tipc/server.c index 2446bfbaa309..272d20a795d5 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -86,6 +86,7 @@ struct outqueue_entry {  static void tipc_recv_work(struct work_struct *work);  static void tipc_send_work(struct work_struct *work);  static void tipc_clean_outqueues(struct tipc_conn *con); +static void tipc_sock_release(struct tipc_conn *con);  static void tipc_conn_kref_release(struct kref *kref)  { @@ -102,6 +103,7 @@ static void tipc_conn_kref_release(struct kref *kref)  		}  		saddr->scope = -TIPC_NODE_SCOPE;  		kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr)); +		tipc_sock_release(con);  		sock_release(sock);  		con->sock = NULL;  	} @@ -136,28 +138,28 @@ static void sock_data_ready(struct sock *sk)  {  	struct tipc_conn *con; -	read_lock(&sk->sk_callback_lock); +	read_lock_bh(&sk->sk_callback_lock);  	con = sock2con(sk);  	if (con && test_bit(CF_CONNECTED, &con->flags)) {  		conn_get(con);  		if (!queue_work(con->server->rcv_wq, &con->rwork))  			conn_put(con);  	} -	read_unlock(&sk->sk_callback_lock); +	read_unlock_bh(&sk->sk_callback_lock);  }  static void sock_write_space(struct sock *sk)  {  	struct tipc_conn *con; -	read_lock(&sk->sk_callback_lock); +	read_lock_bh(&sk->sk_callback_lock);  	con = sock2con(sk);  	if (con && test_bit(CF_CONNECTED, &con->flags)) {  		conn_get(con);  		if (!queue_work(con->server->send_wq, &con->swork))  			conn_put(con);  	} -	read_unlock(&sk->sk_callback_lock); +	read_unlock_bh(&sk->sk_callback_lock);  }  static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) @@ -184,26 +186,31 @@ static void tipc_unregister_callbacks(struct tipc_conn *con)  	write_unlock_bh(&sk->sk_callback_lock);  } +static void tipc_sock_release(struct tipc_conn *con) +{ +	struct tipc_server *s = con->server; + +	if (con->conid) +		s->tipc_conn_release(con->conid, con->usr_data); + +	tipc_unregister_callbacks(con); +} +  static void tipc_close_conn(struct tipc_conn *con)  {  	struct tipc_server *s = con->server;  	if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { -		if (con->conid) -			s->tipc_conn_shutdown(con->conid, con->usr_data);  		spin_lock_bh(&s->idr_lock);  		idr_remove(&s->conn_idr, con->conid);  		s->idr_in_use--;  		spin_unlock_bh(&s->idr_lock); -		tipc_unregister_callbacks(con); -  		/* We shouldn't flush pending works as we may be in the  		 * thread. In fact the races with pending rx/tx work structs  		 * are harmless for us here as we have already deleted this -		 * connection from server connection list and set -		 * sk->sk_user_data to 0 before releasing connection object. +		 * connection from server connection list.  		 
*/  		kernel_sock_shutdown(con->sock, SHUT_RDWR); diff --git a/net/tipc/server.h b/net/tipc/server.h index 9015faedb1b0..34f8055afa3b 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -53,7 +53,7 @@   * @send_wq: send workqueue   * @max_rcvbuf_size: maximum permitted receive message length   * @tipc_conn_new: callback will be called when new connection is incoming - * @tipc_conn_shutdown: callback will be called when connection is shut down + * @tipc_conn_release: callback will be called before releasing the connection   * @tipc_conn_recvmsg: callback will be called when message arrives   * @saddr: TIPC server address   * @name: server name @@ -70,7 +70,7 @@ struct tipc_server {  	struct workqueue_struct *send_wq;  	int max_rcvbuf_size;  	void *(*tipc_conn_new)(int conid); -	void (*tipc_conn_shutdown)(int conid, void *usr_data); +	void (*tipc_conn_release)(int conid, void *usr_data);  	void (*tipc_conn_recvmsg)(struct net *net, int conid,  				  struct sockaddr_tipc *addr, void *usr_data,  				  void *buf, size_t len); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 3eeb50a27b89..88bfcd707064 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -96,8 +96,11 @@ struct tipc_sock {  	uint conn_timeout;  	atomic_t dupl_rcvcnt;  	bool link_cong; -	uint sent_unacked; -	uint rcv_unacked; +	u16 snt_unacked; +	u16 snd_win; +	u16 peer_caps; +	u16 rcv_unacked; +	u16 rcv_win;  	struct sockaddr_tipc remote;  	struct rhash_head node;  	struct rcu_head rcu; @@ -227,9 +230,29 @@ static struct tipc_sock *tipc_sk(const struct sock *sk)  	return container_of(sk, struct tipc_sock, sk);  } -static int tsk_conn_cong(struct tipc_sock *tsk) +static bool tsk_conn_cong(struct tipc_sock *tsk)  { -	return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN; +	return tsk->snt_unacked >= tsk->snd_win; +} + +/* tsk_blocks(): translate a buffer size in bytes to number of + * advertisable blocks, taking into account the ratio truesize(len)/len + * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ + */ +static u16 tsk_adv_blocks(int len) +{ +	return len / FLOWCTL_BLK_SZ / 4; +} + +/* tsk_inc(): increment counter for sent or received data + * - If block based flow control is not supported by peer we + *   fall back to message based ditto, incrementing the counter + */ +static u16 tsk_inc(struct tipc_sock *tsk, int msglen) +{ +	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL)) +		return ((msglen / FLOWCTL_BLK_SZ) + 1); +	return 1;  }  /** @@ -366,7 +389,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,  	sock->state = state;  	sock_init_data(sock, sk);  	if (tipc_sk_insert(tsk)) { -		pr_warn("Socket create failed; port numbrer exhausted\n"); +		pr_warn("Socket create failed; port number exhausted\n");  		return -EINVAL;  	}  	msg_set_origport(msg, tsk->portid); @@ -377,9 +400,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock,  	sk->sk_write_space = tipc_write_space;  	sk->sk_destruct = tipc_sock_destruct;  	tsk->conn_timeout = CONN_TIMEOUT_DEFAULT; -	tsk->sent_unacked = 0;  	atomic_set(&tsk->dupl_rcvcnt, 0); +	/* Start out with safe limits until we receive an advertised window */ +	tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN); +	tsk->rcv_win = tsk->snd_win; +  	if (sock->state == SS_READY) {  		tsk_set_unreturnable(tsk, true);  		if (sock->type == SOCK_DGRAM) @@ -775,7 +801,7 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)  	struct sock *sk = &tsk->sk;  	struct tipc_msg *hdr = buf_msg(skb);  	int mtyp = msg_type(hdr); -	int conn_cong; 
+	bool conn_cong;  	/* Ignore if connection cannot be validated: */  	if (!tsk_peer_msg(tsk, hdr)) @@ -789,7 +815,9 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)  		return;  	} else if (mtyp == CONN_ACK) {  		conn_cong = tsk_conn_cong(tsk); -		tsk->sent_unacked -= msg_msgcnt(hdr); +		tsk->snt_unacked -= msg_conn_ack(hdr); +		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) +			tsk->snd_win = msg_adv_win(hdr);  		if (conn_cong)  			sk->sk_write_space(sk);  	} else if (mtyp != CONN_PROBE_REPLY) { @@ -1020,12 +1048,14 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)  	u32 dnode;  	uint mtu, send, sent = 0;  	struct iov_iter save; +	int hlen = MIN_H_SIZE;  	/* Handle implied connection establishment */  	if (unlikely(dest)) {  		rc = __tipc_sendmsg(sock, m, dsz); +		hlen = msg_hdr_sz(mhdr);  		if (dsz && (dsz == rc)) -			tsk->sent_unacked = 1; +			tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);  		return rc;  	}  	if (dsz > (uint)INT_MAX) @@ -1054,7 +1084,7 @@ next:  		if (likely(!tsk_conn_cong(tsk))) {  			rc = tipc_node_xmit(net, &pktchain, dnode, portid);  			if (likely(!rc)) { -				tsk->sent_unacked++; +				tsk->snt_unacked += tsk_inc(tsk, send + hlen);  				sent += send;  				if (sent == dsz)  					return dsz; @@ -1118,6 +1148,13 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,  	sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);  	tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);  	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid); +	tsk->peer_caps = tipc_node_get_capabilities(net, peer_node); +	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) +		return; + +	/* Fall back to message based flow control */ +	tsk->rcv_win = FLOWCTL_MSG_WIN; +	tsk->snd_win = FLOWCTL_MSG_WIN;  }  /** @@ -1214,7 +1251,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,  	return 0;  } -static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) +static void tipc_sk_send_ack(struct tipc_sock *tsk)  {  	struct net *net = sock_net(&tsk->sk);  	struct sk_buff *skb = NULL; @@ -1230,7 +1267,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)  	if (!skb)  		return;  	msg = buf_msg(skb); -	msg_set_msgcnt(msg, ack); +	msg_set_conn_ack(msg, tsk->rcv_unacked); +	tsk->rcv_unacked = 0; + +	/* Adjust to and advertize the correct window limit */ +	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) { +		tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf); +		msg_set_adv_win(msg, tsk->rcv_win); +	}  	tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));  } @@ -1288,7 +1332,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,  	long timeo;  	unsigned int sz;  	u32 err; -	int res; +	int res, hlen;  	/* Catch invalid receive requests */  	if (unlikely(!buf_len)) @@ -1313,6 +1357,7 @@ restart:  	buf = skb_peek(&sk->sk_receive_queue);  	msg = buf_msg(buf);  	sz = msg_data_sz(msg); +	hlen = msg_hdr_sz(msg);  	err = msg_errcode(msg);  	/* Discard an empty non-errored message & try again */ @@ -1335,7 +1380,7 @@ restart:  			sz = buf_len;  			m->msg_flags |= MSG_TRUNC;  		} -		res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz); +		res = skb_copy_datagram_msg(buf, hlen, m, sz);  		if (res)  			goto exit;  		res = sz; @@ -1347,15 +1392,15 @@ restart:  			res = -ECONNRESET;  	} -	/* Consume received message (optional) */ -	if (likely(!(flags & MSG_PEEK))) { -		if ((sock->state != SS_READY) && -		    (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) { -			
tipc_sk_send_ack(tsk, tsk->rcv_unacked); -			tsk->rcv_unacked = 0; -		} -		tsk_advance_rx_queue(sk); +	if (unlikely(flags & MSG_PEEK)) +		goto exit; + +	if (likely(sock->state != SS_READY)) { +		tsk->rcv_unacked += tsk_inc(tsk, hlen + sz); +		if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4))) +			tipc_sk_send_ack(tsk);  	} +	tsk_advance_rx_queue(sk);  exit:  	release_sock(sk);  	return res; @@ -1384,7 +1429,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,  	int sz_to_copy, target, needed;  	int sz_copied = 0;  	u32 err; -	int res = 0; +	int res = 0, hlen;  	/* Catch invalid receive attempts */  	if (unlikely(!buf_len)) @@ -1410,6 +1455,7 @@ restart:  	buf = skb_peek(&sk->sk_receive_queue);  	msg = buf_msg(buf);  	sz = msg_data_sz(msg); +	hlen = msg_hdr_sz(msg);  	err = msg_errcode(msg);  	/* Discard an empty non-errored message & try again */ @@ -1434,8 +1480,7 @@ restart:  		needed = (buf_len - sz_copied);  		sz_to_copy = (sz <= needed) ? sz : needed; -		res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset, -					    m, sz_to_copy); +		res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);  		if (res)  			goto exit; @@ -1457,20 +1502,18 @@ restart:  			res = -ECONNRESET;  	} -	/* Consume received message (optional) */ -	if (likely(!(flags & MSG_PEEK))) { -		if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) { -			tipc_sk_send_ack(tsk, tsk->rcv_unacked); -			tsk->rcv_unacked = 0; -		} -		tsk_advance_rx_queue(sk); -	} +	if (unlikely(flags & MSG_PEEK)) +		goto exit; + +	tsk->rcv_unacked += tsk_inc(tsk, hlen + sz); +	if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4))) +		tipc_sk_send_ack(tsk); +	tsk_advance_rx_queue(sk);  	/* Loop around if more data is required */  	if ((sz_copied < buf_len) &&	/* didn't get all requested data */  	    (!skb_queue_empty(&sk->sk_receive_queue) ||  	    (sz_copied < target)) &&	/* and more is ready or required */ -	    (!(flags & MSG_PEEK)) &&	/* and aren't just peeking at data */  	    (!err))			/* and haven't reached a FIN */  		goto restart; @@ -1602,30 +1645,33 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)  /**   * rcvbuf_limit - get proper overload limit of socket receive queue   * @sk: socket - * @buf: message + * @skb: message   * - * For all connection oriented messages, irrespective of importance, - * the default overload value (i.e. 67MB) is set as limit. + * For connection oriented messages, irrespective of importance, + * default queue limit is 2 MB.   
* - * For all connectionless messages, by default new queue limits are - * as belows: + * For connectionless messages, queue limits are based on message + * importance as follows:   * - * TIPC_LOW_IMPORTANCE       (4 MB) - * TIPC_MEDIUM_IMPORTANCE    (8 MB) - * TIPC_HIGH_IMPORTANCE      (16 MB) - * TIPC_CRITICAL_IMPORTANCE  (32 MB) + * TIPC_LOW_IMPORTANCE       (2 MB) + * TIPC_MEDIUM_IMPORTANCE    (4 MB) + * TIPC_HIGH_IMPORTANCE      (8 MB) + * TIPC_CRITICAL_IMPORTANCE  (16 MB)   *   * Returns overload limit according to corresponding message importance   */ -static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) +static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)  { -	struct tipc_msg *msg = buf_msg(buf); +	struct tipc_sock *tsk = tipc_sk(sk); +	struct tipc_msg *hdr = buf_msg(skb); -	if (msg_connected(msg)) -		return sysctl_tipc_rmem[2]; +	if (unlikely(!msg_connected(hdr))) +		return sk->sk_rcvbuf << msg_importance(hdr); -	return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE << -		msg_importance(msg); +	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL)) +		return sk->sk_rcvbuf; + +	return FLOWCTL_MSG_LIM;  }  /** @@ -1748,7 +1794,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,  		/* Try backlog, compensating for double-counted bytes */  		dcnt = &tipc_sk(sk)->dupl_rcvcnt; -		if (sk->sk_backlog.len) +		if (!sk->sk_backlog.len)  			atomic_set(dcnt, 0);  		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);  		if (likely(!sk_add_backlog(sk, skb, lim))) @@ -2807,6 +2853,9 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)  		if (err)  			return err; +		if (!attrs[TIPC_NLA_SOCK]) +			return -EINVAL; +  		err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,  				       attrs[TIPC_NLA_SOCK],  				       tipc_nl_sock_policy); diff --git a/net/tipc/socket.h b/net/tipc/socket.h index 4241f22069dc..06fb5944cf76 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -1,6 +1,6 @@  /* net/tipc/socket.h: Include file for TIPC socket code   * - * Copyright (c) 2014-2015, Ericsson AB + * Copyright (c) 2014-2016, Ericsson AB   * All rights reserved.   
*   * Redistribution and use in source and binary forms, with or without @@ -38,10 +38,17 @@  #include <net/sock.h>  #include <net/genetlink.h> -#define TIPC_CONNACK_INTV         256 -#define TIPC_FLOWCTRL_WIN        (TIPC_CONNACK_INTV * 2) -#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \ -				  SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) +/* Compatibility values for deprecated message based flow control */ +#define FLOWCTL_MSG_WIN 512 +#define FLOWCTL_MSG_LIM ((FLOWCTL_MSG_WIN * 2 + 1) * SKB_TRUESIZE(MAX_MSG_SIZE)) + +#define FLOWCTL_BLK_SZ 1024 + +/* Socket receive buffer sizes */ +#define RCVBUF_MIN  (FLOWCTL_BLK_SZ * 512) +#define RCVBUF_DEF  (FLOWCTL_BLK_SZ * 1024 * 2) +#define RCVBUF_MAX  (FLOWCTL_BLK_SZ * 1024 * 16) +  int tipc_socket_init(void);  void tipc_socket_stop(void);  void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index e6cb386fbf34..0dd02244e21d 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -302,7 +302,7 @@ static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,  }  /* Handle one termination request for the subscriber */ -static void tipc_subscrb_shutdown_cb(int conid, void *usr_data) +static void tipc_subscrb_release_cb(int conid, void *usr_data)  {  	tipc_subscrb_delete((struct tipc_subscriber *)usr_data);  } @@ -326,8 +326,7 @@ static void tipc_subscrb_rcv_cb(struct net *net, int conid,  		return tipc_subscrp_cancel(s, subscriber);  	} -	if (s) -		tipc_subscrp_subscribe(net, s, subscriber, swap); +	tipc_subscrp_subscribe(net, s, subscriber, swap);  }  /* Handle one request to establish a new subscriber */ @@ -365,7 +364,7 @@ int tipc_topsrv_start(struct net *net)  	topsrv->max_rcvbuf_size		= sizeof(struct tipc_subscr);  	topsrv->tipc_conn_recvmsg	= tipc_subscrb_rcv_cb;  	topsrv->tipc_conn_new		= tipc_subscrb_connect_cb; -	topsrv->tipc_conn_shutdown	= tipc_subscrb_shutdown_cb; +	topsrv->tipc_conn_release	= tipc_subscrb_release_cb;  	strncpy(topsrv->name, name, strlen(name) + 1);  	tn->topsrv = topsrv;  | 
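
The socket.c/socket.h changes above replace message-count connection flow control with block-based accounting. As a reading aid, here is a minimal standalone C sketch of that accounting: the constants mirror the new defines in net/tipc/socket.h, while adv_blocks() and blocks_per_msg() are illustrative stand-ins for the patch's tsk_adv_blocks() and tsk_inc(), not the kernel functions themselves.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Constants mirroring the new defines in net/tipc/socket.h */
#define FLOWCTL_BLK_SZ  1024                        /* one flow-control block */
#define RCVBUF_DEF      (FLOWCTL_BLK_SZ * 1024 * 2) /* default receive buffer */

/* Illustrative stand-in for tsk_adv_blocks(): buffer size in bytes ->
 * advertisable blocks.  The /4 leaves headroom for skb truesize overhead.
 */
static uint16_t adv_blocks(int len)
{
	return len / FLOWCTL_BLK_SZ / 4;
}

/* Illustrative stand-in for tsk_inc(): blocks consumed by one message.
 * Peers without the TIPC_BLOCK_FLOWCTL capability are charged one unit
 * per message, i.e. the legacy message-count scheme.
 */
static uint16_t blocks_per_msg(bool peer_has_block_flowctl, int msglen)
{
	if (peer_has_block_flowctl)
		return (msglen / FLOWCTL_BLK_SZ) + 1;
	return 1;
}

int main(void)
{
	printf("advertised window for a 2 MB rcvbuf: %u blocks\n",
	       (unsigned)adv_blocks(RCVBUF_DEF));
	printf("cost of a 1500-byte message: %u blocks\n",
	       (unsigned)blocks_per_msg(true, 1500));
	return 0;
}

With the new 2 MB default receive buffer this yields an advertised window of 512 blocks, consistent with the patch's note that the truesize/len ratio stays below 4 for messages of FLOWCTL_BLK_SZ bytes or more.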
