From 2da7142516527a5213588f47ed302e79a5d9527a Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 2 Apr 2015 09:33:00 -0400 Subject: tipc: drop tunneled packet duplicates at reception In commit 8b4ed8634f8b3f9aacfc42b4a872d30c36b9e255 ("tipc: eliminate race condition at dual link establishment") we introduced a parallel link synchronization mechanism that guarantees sequential delivery even for users switching from an old to a newly established link. The new mechanism makes it unnecessary to deliver the tunneled duplicate packets back to the old link, as we are currently doing. It is now sufficient to use the last tunneled packet's inner sequence number as synchronization point between the two parallel links, whereafter it can be dropped. In this commit, we drop the duplicate packets arriving on the new link, after updating the synchronization point at each new arrival. Although it would now be sufficient for the other endpoint to tunnel only the last packet in its send queue, rather than the entire queue, the sender must keep tunneling the whole queue in order to remain compatible with older nodes. This commit makes it possible to get rid of some complex interaction between the two parallel links. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/link.c | 132 ++++++++++++++++++++------------------------------------ 1 file changed, 47 insertions(+), 85 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 514466efc25c..c697cf69da91 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -105,8 +105,6 @@ static void link_handle_out_of_seq_msg(struct tipc_link *link, struct sk_buff *skb); static void tipc_link_proto_rcv(struct tipc_link *link, struct sk_buff *skb); -static int tipc_link_tunnel_rcv(struct tipc_node *node, - struct sk_buff **skb); static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol); static void link_state_event(struct tipc_link *l_ptr, u32 event); static void link_reset_statistics(struct tipc_link *l_ptr); @@ -115,7 +113,8 @@ static void tipc_link_sync_xmit(struct tipc_link *l); static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb); static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb); - +static bool tipc_link_failover_rcv(struct tipc_node *node, + struct sk_buff **skb); /* * Simple link routines */ @@ -1274,8 +1273,10 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb) if (msg_dup(msg)) { link->flags |= LINK_SYNCHING; link->synch_point = msg_seqno(msg_get_wrapped(msg)); + kfree_skb(skb); + break; } - if (!tipc_link_tunnel_rcv(node, &skb)) + if (!tipc_link_failover_rcv(node, &skb)) break; if (msg_user(buf_msg(skb)) != MSG_BUNDLER) { tipc_data_input(link, skb); @@ -1755,101 +1756,62 @@ tunnel_queue: goto tunnel_queue; } -/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet. - * Owner node is locked. - */ -static void tipc_link_dup_rcv(struct tipc_link *link, - struct sk_buff *skb) -{ - struct sk_buff *iskb; - int pos = 0; - - if (!tipc_link_is_up(link)) - return; - - if (!tipc_msg_extract(skb, &iskb, &pos)) { - pr_warn("%sfailed to extract inner dup pkt\n", link_co_err); - return; - } - /* Append buffer to deferred queue, if applicable: */ - link_handle_out_of_seq_msg(link, iskb); -} - /* tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet * Owner node is locked. 
*/ -static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr, - struct sk_buff *t_buf) +static bool tipc_link_failover_rcv(struct tipc_node *node, + struct sk_buff **skb) { - struct tipc_msg *t_msg = buf_msg(t_buf); - struct sk_buff *buf = NULL; - struct tipc_msg *msg; + struct tipc_msg *msg = buf_msg(*skb); + struct sk_buff *iskb = NULL; + struct tipc_link *link = NULL; + int bearer_id = msg_bearer_id(msg); int pos = 0; - if (tipc_link_is_up(l_ptr)) - tipc_link_reset(l_ptr); - - /* First failover packet? */ - if (l_ptr->exp_msg_count == START_CHANGEOVER) - l_ptr->exp_msg_count = msg_msgcnt(t_msg); - - /* Should there be an inner packet? */ - if (l_ptr->exp_msg_count) { - l_ptr->exp_msg_count--; - if (!tipc_msg_extract(t_buf, &buf, &pos)) { - pr_warn("%sno inner failover pkt\n", link_co_err); - goto exit; - } - msg = buf_msg(buf); - - if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) { - kfree_skb(buf); - buf = NULL; - goto exit; - } - if (msg_user(msg) == MSG_FRAGMENTER) { - l_ptr->stats.recv_fragments++; - tipc_buf_append(&l_ptr->reasm_buf, &buf); - } + if (msg_type(msg) != ORIGINAL_MSG) { + pr_warn("%sunknown tunnel pkt received\n", link_co_err); + goto exit; } -exit: - if ((!l_ptr->exp_msg_count) && (l_ptr->flags & LINK_STOPPED)) - tipc_link_delete(l_ptr); - return buf; -} - -/* tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent - * via other link as result of a failover (ORIGINAL_MSG) or - * a new active link (DUPLICATE_MSG). Failover packets are - * returned to the active link for delivery upwards. - * Owner node is locked. - */ -static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr, - struct sk_buff **buf) -{ - struct sk_buff *t_buf = *buf; - struct tipc_link *l_ptr; - struct tipc_msg *t_msg = buf_msg(t_buf); - u32 bearer_id = msg_bearer_id(t_msg); + if (bearer_id >= MAX_BEARERS) + goto exit; + link = node->links[bearer_id]; + if (!link) + goto exit; + if (tipc_link_is_up(link)) + tipc_link_reset(link); - *buf = NULL; + /* First failover packet? */ + if (link->exp_msg_count == START_CHANGEOVER) + link->exp_msg_count = msg_msgcnt(msg); - if (bearer_id >= MAX_BEARERS) + /* Should we expect an inner packet? */ + if (!link->exp_msg_count) goto exit; - l_ptr = n_ptr->links[bearer_id]; - if (!l_ptr) + if (!tipc_msg_extract(*skb, &iskb, &pos)) { + pr_warn("%sno inner failover pkt\n", link_co_err); + *skb = NULL; goto exit; + } + link->exp_msg_count--; + *skb = NULL; - if (msg_type(t_msg) == DUPLICATE_MSG) - tipc_link_dup_rcv(l_ptr, t_buf); - else if (msg_type(t_msg) == ORIGINAL_MSG) - *buf = tipc_link_failover_rcv(l_ptr, t_buf); - else - pr_warn("%sunknown tunnel pkt received\n", link_co_err); + /* Was packet already delivered? */ + if (less(buf_seqno(iskb), link->reset_checkpoint)) { + kfree_skb(iskb); + iskb = NULL; + goto exit; + } + if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) { + link->stats.recv_fragments++; + tipc_buf_append(&link->reasm_buf, &iskb); + } exit: - kfree_skb(t_buf); - return *buf != NULL; + if (link && (!link->exp_msg_count) && (link->flags & LINK_STOPPED)) + tipc_link_delete(link); + kfree_skb(*skb); + *skb = iskb; + return *skb; } static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol) -- cgit v1.2.3-59-g8ed1b From dff29b1a88524fe6afe296d6c477c491d1e02af0 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 2 Apr 2015 09:33:01 -0400 Subject: tipc: eliminate delayed link deletion at link failover When a bearer is disabled manually, all its links have to be reset and deleted. 
However, if there is a remaining, parallel link ready to take over a deleted link's traffic, we currently delay the delete of the removed link until the failover procedure is finished. This is because the remaining link needs to access state from the reset link, such as the last received packet number, and any partially reassembled buffer, in order to perform a successful failover. In this commit, we do instead move the state data over to the new link, so that it can fulfill the procedure autonomously, without accessing any data on the old link. This means that we can now proceed and delete all pertaining links immediately when a bearer is disabled. This saves us from some unnecessary complexity in such situations. We also choose to change the confusing definitions CHANGEOVER_PROTOCOL, ORIGINAL_MSG and DUPLICATE_MSG to the more descriptive TUNNEL_PROTOCOL, FAILOVER_MSG and SYNCH_MSG respectively. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/link.c | 124 +++++++++++++++++++++++++------------------------------- net/tipc/link.h | 17 ++++---- net/tipc/msg.c | 4 +- net/tipc/msg.h | 10 ++--- net/tipc/node.c | 13 +++--- 5 files changed, 78 insertions(+), 90 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index c697cf69da91..b1e17953eeea 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -89,17 +89,9 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = { #define TIMEOUT_EVT 560817u /* link timer expired */ /* - * The following two 'message types' is really just implementation - * data conveniently stored in the message header. - * They must not be considered part of the protocol + * State value stored in 'failover_pkts' */ -#define OPEN_MSG 0 -#define CLOSED_MSG 1 - -/* - * State value stored in 'exp_msg_count' - */ -#define START_CHANGEOVER 100000u +#define FIRST_FAILOVER 0xffffu static void link_handle_out_of_seq_msg(struct tipc_link *link, struct sk_buff *skb); @@ -113,8 +105,7 @@ static void tipc_link_sync_xmit(struct tipc_link *l); static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb); static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb); -static bool tipc_link_failover_rcv(struct tipc_node *node, - struct sk_buff **skb); +static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb); /* * Simple link routines */ @@ -332,15 +323,19 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, } /** - * link_delete - Conditional deletion of link. 
- * If timer still running, real delete is done when it expires - * @link: link to be deleted + * tipc_link_delete - Delete a link + * @l: link to be deleted */ -void tipc_link_delete(struct tipc_link *link) +void tipc_link_delete(struct tipc_link *l) { - tipc_link_reset_fragments(link); - tipc_node_detach_link(link->owner, link); - tipc_link_put(link); + tipc_link_reset(l); + if (del_timer(&l->timer)) + tipc_link_put(l); + l->flags |= LINK_STOPPED; + /* Delete link now, or when timer is finished: */ + tipc_link_reset_fragments(l); + tipc_node_detach_link(l->owner, l); + tipc_link_put(l); } void tipc_link_delete_list(struct net *net, unsigned int bearer_id, @@ -349,23 +344,12 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id, struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_link *link; struct tipc_node *node; - bool del_link; rcu_read_lock(); list_for_each_entry_rcu(node, &tn->node_list, list) { tipc_node_lock(node); link = node->links[bearer_id]; - if (!link) { - tipc_node_unlock(node); - continue; - } - del_link = !tipc_link_is_up(link) && !link->exp_msg_count; - tipc_link_reset(link); - if (del_timer(&link->timer)) - tipc_link_put(link); - link->flags |= LINK_STOPPED; - /* Delete link now, or when failover is finished: */ - if (shutting_down || !tipc_node_is_up(node) || del_link) + if (link) tipc_link_delete(link); tipc_node_unlock(node); } @@ -472,9 +456,9 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr) void tipc_link_reset(struct tipc_link *l_ptr) { u32 prev_state = l_ptr->state; - u32 checkpoint = l_ptr->next_in_no; int was_active_link = tipc_link_is_active(l_ptr); struct tipc_node *owner = l_ptr->owner; + struct tipc_link *pl = tipc_parallel_link(l_ptr); msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff)); @@ -492,11 +476,15 @@ void tipc_link_reset(struct tipc_link *l_ptr) tipc_node_link_down(l_ptr->owner, l_ptr); tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr); - if (was_active_link && tipc_node_active_links(l_ptr->owner)) { - l_ptr->reset_checkpoint = checkpoint; - l_ptr->exp_msg_count = START_CHANGEOVER; + if (was_active_link && tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) { + l_ptr->flags |= LINK_FAILINGOVER; + l_ptr->failover_checkpt = l_ptr->next_in_no; + pl->failover_pkts = FIRST_FAILOVER; + pl->failover_checkpt = l_ptr->next_in_no; + pl->failover_skb = l_ptr->reasm_buf; + } else { + kfree_skb(l_ptr->reasm_buf); } - /* Clean up all queues, except inputq: */ __skb_queue_purge(&l_ptr->transmq); __skb_queue_purge(&l_ptr->deferdq); @@ -506,6 +494,7 @@ void tipc_link_reset(struct tipc_link *l_ptr) if (!skb_queue_empty(owner->inputq)) owner->action_flags |= TIPC_MSG_EVT; tipc_link_purge_backlog(l_ptr); + l_ptr->reasm_buf = NULL; l_ptr->rcv_unacked = 0; l_ptr->checkpoint = 1; l_ptr->next_out_no = 1; @@ -557,8 +546,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT)) return; /* Not yet. 
*/ - /* Check whether changeover is going on */ - if (l_ptr->exp_msg_count) { + if (l_ptr->flags & LINK_FAILINGOVER) { if (event == TIMEOUT_EVT) link_set_timer(l_ptr, cont_intv); return; @@ -1242,7 +1230,7 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb) node->action_flags |= TIPC_NAMED_MSG_EVT; return true; case MSG_BUNDLER: - case CHANGEOVER_PROTOCOL: + case TUNNEL_PROTOCOL: case MSG_FRAGMENTER: case BCAST_PROTOCOL: return false; @@ -1269,14 +1257,14 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb) return; switch (msg_user(msg)) { - case CHANGEOVER_PROTOCOL: + case TUNNEL_PROTOCOL: if (msg_dup(msg)) { link->flags |= LINK_SYNCHING; link->synch_point = msg_seqno(msg_get_wrapped(msg)); kfree_skb(skb); break; } - if (!tipc_link_failover_rcv(node, &skb)) + if (!tipc_link_failover_rcv(link, &skb)) break; if (msg_user(buf_msg(skb)) != MSG_BUNDLER) { tipc_data_input(link, skb); @@ -1391,8 +1379,8 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, u32 msg_size = sizeof(l_ptr->proto_msg); int r_flag; - /* Don't send protocol message during link changeover */ - if (l_ptr->exp_msg_count) + /* Don't send protocol message during link failover */ + if (l_ptr->flags & LINK_FAILINGOVER) return; /* Abort non-RESET send if communication with node is prohibited */ @@ -1444,7 +1432,7 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, } l_ptr->stats.sent_states++; } else { /* RESET_MSG or ACTIVATE_MSG */ - msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1)); + msg_set_ack(msg, mod(l_ptr->failover_checkpt - 1)); msg_set_seq_gap(msg, 0); msg_set_next_sent(msg, 1); msg_set_probe(msg, 0); @@ -1486,8 +1474,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, u32 msg_tol; struct tipc_msg *msg = buf_msg(buf); - /* Discard protocol message during link changeover */ - if (l_ptr->exp_msg_count) + if (l_ptr->flags & LINK_FAILINGOVER) goto exit; if (l_ptr->net_plane != msg_net_plane(msg)) @@ -1659,8 +1646,8 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) if (!tunnel) return; - tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL, - ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr); + tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL, + FAILOVER_MSG, INT_H_SIZE, l_ptr->addr); skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq); tipc_link_purge_backlog(l_ptr); msgcount = skb_queue_len(&l_ptr->transmq); @@ -1722,8 +1709,8 @@ void tipc_link_dup_queue_xmit(struct tipc_link *link, struct sk_buff_head *queue = &link->transmq; int mcnt; - tipc_msg_init(link_own_addr(link), &tnl_hdr, CHANGEOVER_PROTOCOL, - DUPLICATE_MSG, INT_H_SIZE, link->addr); + tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL, + SYNCH_MSG, INT_H_SIZE, link->addr); mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq); msg_set_msgcnt(&tnl_hdr, mcnt); msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id); @@ -1756,36 +1743,37 @@ tunnel_queue: goto tunnel_queue; } -/* tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet +/* tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet * Owner node is locked. 
*/ -static bool tipc_link_failover_rcv(struct tipc_node *node, +static bool tipc_link_failover_rcv(struct tipc_link *link, struct sk_buff **skb) { struct tipc_msg *msg = buf_msg(*skb); struct sk_buff *iskb = NULL; - struct tipc_link *link = NULL; + struct tipc_link *pl = NULL; int bearer_id = msg_bearer_id(msg); int pos = 0; - if (msg_type(msg) != ORIGINAL_MSG) { + if (msg_type(msg) != FAILOVER_MSG) { pr_warn("%sunknown tunnel pkt received\n", link_co_err); goto exit; } if (bearer_id >= MAX_BEARERS) goto exit; - link = node->links[bearer_id]; - if (!link) + + if (bearer_id == link->bearer_id) goto exit; - if (tipc_link_is_up(link)) - tipc_link_reset(link); - /* First failover packet? */ - if (link->exp_msg_count == START_CHANGEOVER) - link->exp_msg_count = msg_msgcnt(msg); + pl = link->owner->links[bearer_id]; + if (pl && tipc_link_is_up(pl)) + tipc_link_reset(pl); + + if (link->failover_pkts == FIRST_FAILOVER) + link->failover_pkts = msg_msgcnt(msg); /* Should we expect an inner packet? */ - if (!link->exp_msg_count) + if (!link->failover_pkts) goto exit; if (!tipc_msg_extract(*skb, &iskb, &pos)) { @@ -1793,22 +1781,22 @@ static bool tipc_link_failover_rcv(struct tipc_node *node, *skb = NULL; goto exit; } - link->exp_msg_count--; + link->failover_pkts--; *skb = NULL; - /* Was packet already delivered? */ - if (less(buf_seqno(iskb), link->reset_checkpoint)) { + /* Was this packet already delivered? */ + if (less(buf_seqno(iskb), link->failover_checkpt)) { kfree_skb(iskb); iskb = NULL; goto exit; } if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) { link->stats.recv_fragments++; - tipc_buf_append(&link->reasm_buf, &iskb); + tipc_buf_append(&link->failover_skb, &iskb); } exit: - if (link && (!link->exp_msg_count) && (link->flags & LINK_STOPPED)) - tipc_link_delete(link); + if (!link->failover_pkts && pl) + pl->flags &= ~LINK_FAILINGOVER; kfree_skb(*skb); *skb = iskb; return *skb; diff --git a/net/tipc/link.h b/net/tipc/link.h index d2b5663643da..6e28f03c7905 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -58,9 +58,10 @@ /* Link endpoint execution states */ -#define LINK_STARTED 0x0001 -#define LINK_STOPPED 0x0002 -#define LINK_SYNCHING 0x0004 +#define LINK_STARTED 0x0001 +#define LINK_STOPPED 0x0002 +#define LINK_SYNCHING 0x0004 +#define LINK_FAILINGOVER 0x0008 /* Starting value for maximum packet size negotiation on unicast links * (unless bearer MTU is less) @@ -167,11 +168,12 @@ struct tipc_link { struct tipc_msg *pmsg; u32 priority; char net_plane; + u16 synch_point; - /* Changeover */ - u32 exp_msg_count; - u32 reset_checkpoint; - u32 synch_point; + /* Failover */ + u16 failover_pkts; + u16 failover_checkpt; + struct sk_buff *failover_skb; /* Max packet negotiation */ u32 max_pkt; @@ -201,7 +203,6 @@ struct tipc_link { struct sk_buff_head wakeupq; /* Fragmentation/reassembly */ - u32 long_msg_seq_no; struct sk_buff *reasm_buf; /* Statistics */ diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 3bb499c61918..c3e96e815418 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -355,7 +355,7 @@ bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu) start = align(bsz); pad = start - bsz; - if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) + if (unlikely(msg_user(msg) == TUNNEL_PROTOCOL)) return false; if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) return false; @@ -433,7 +433,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode) if (msg_user(msg) == MSG_FRAGMENTER) return false; - if (msg_user(msg) == CHANGEOVER_PROTOCOL) + if (msg_user(msg) == 
TUNNEL_PROTOCOL) return false; if (msg_user(msg) == BCAST_PROTOCOL) return false; diff --git a/net/tipc/msg.h b/net/tipc/msg.h index d273207ede28..e1d3595e2ee9 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -72,7 +72,7 @@ struct plist; #define MSG_BUNDLER 6 #define LINK_PROTOCOL 7 #define CONN_MANAGER 8 -#define CHANGEOVER_PROTOCOL 10 +#define TUNNEL_PROTOCOL 10 #define NAME_DISTRIBUTOR 11 #define MSG_FRAGMENTER 12 #define LINK_CONFIG 13 @@ -512,8 +512,8 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n) /* * Changeover tunnel message types */ -#define DUPLICATE_MSG 0 -#define ORIGINAL_MSG 1 +#define SYNCH_MSG 0 +#define FAILOVER_MSG 1 /* * Config protocol message types @@ -556,9 +556,9 @@ static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n) static inline bool msg_dup(struct tipc_msg *m) { - if (likely(msg_user(m) != CHANGEOVER_PROTOCOL)) + if (likely(msg_user(m) != TUNNEL_PROTOCOL)) return false; - if (msg_type(m) != DUPLICATE_MSG) + if (msg_type(m) != SYNCH_MSG) return false; return true; } diff --git a/net/tipc/node.c b/net/tipc/node.c index 3e4f04897c03..f3d522c2881a 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -394,18 +394,17 @@ static void node_lost_contact(struct tipc_node *n_ptr) n_ptr->bclink.recv_permitted = false; } - /* Abort link changeover */ + /* Abort any ongoing link failover */ for (i = 0; i < MAX_BEARERS; i++) { struct tipc_link *l_ptr = n_ptr->links[i]; if (!l_ptr) continue; - l_ptr->reset_checkpoint = l_ptr->next_in_no; - l_ptr->exp_msg_count = 0; + l_ptr->flags &= ~LINK_FAILINGOVER; + l_ptr->failover_checkpt = 0; + l_ptr->failover_pkts = 0; + kfree_skb(l_ptr->failover_skb); + l_ptr->failover_skb = NULL; tipc_link_reset_fragments(l_ptr); - - /* Link marked for deletion after failover? => do it now */ - if (l_ptr->flags & LINK_STOPPED) - tipc_link_delete(l_ptr); } n_ptr->action_flags &= ~TIPC_WAIT_OWN_LINKS_DOWN; -- cgit v1.2.3-59-g8ed1b From ed193ece2649c194a87a9d8470195760d367c075 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 2 Apr 2015 09:33:02 -0400 Subject: tipc: simplify link mtu negotiation When a link is being established, the two endpoints advertise their respective interface MTU in the transmitted RESET and ACTIVATE messages. If there is any difference, the lower of the two MTUs will be selected for use by both endpoints. However, as a remnant of earlier attempts to introduce TIPC level routing, there also exists an MTU discovery mechanism. If an intermediate node has a lower MTU than the two endpoints, they will discover this through a bisectional approach, and finally adopt this MTU for common use. Since there is no TIPC level routing, and probably never will be, this mechanism doesn't make any sense, and only serves to make the link level protocol unnecessarily complex. In this commit, we eliminate the MTU discovery algorithm, and fall back to the simple MTU advertising approach. This change is fully backwards compatible. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. 
Miller --- net/tipc/bcast.c | 4 +- net/tipc/link.c | 129 ++++++++++++++----------------------------------------- net/tipc/link.h | 12 +++--- net/tipc/node.c | 9 ++-- 4 files changed, 43 insertions(+), 111 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index ae558dd7f8ee..c5cbdcb1f0b5 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -413,7 +413,7 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) */ if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) { tipc_link_proto_xmit(node->active_links[node->addr & 1], - STATE_MSG, 0, 0, 0, 0, 0); + STATE_MSG, 0, 0, 0, 0); tn->bcl->stats.sent_acks++; } } @@ -899,7 +899,7 @@ int tipc_bclink_init(struct net *net) skb_queue_head_init(&bclink->inputq); bcl->owner = &bclink->node; bcl->owner->net = net; - bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; + bcl->mtu = MAX_PKT_DEFAULT_MCAST; tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); bcl->bearer_id = MAX_BEARERS; rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer); diff --git a/net/tipc/link.c b/net/tipc/link.c index b1e17953eeea..a6b30df6ec02 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -136,34 +136,6 @@ static struct tipc_link *tipc_parallel_link(struct tipc_link *l) return l->owner->active_links[1]; } -static void link_init_max_pkt(struct tipc_link *l_ptr) -{ - struct tipc_node *node = l_ptr->owner; - struct tipc_net *tn = net_generic(node->net, tipc_net_id); - struct tipc_bearer *b_ptr; - u32 max_pkt; - - rcu_read_lock(); - b_ptr = rcu_dereference_rtnl(tn->bearer_list[l_ptr->bearer_id]); - if (!b_ptr) { - rcu_read_unlock(); - return; - } - max_pkt = (b_ptr->mtu & ~3); - rcu_read_unlock(); - - if (max_pkt > MAX_MSG_SIZE) - max_pkt = MAX_MSG_SIZE; - - l_ptr->max_pkt_target = max_pkt; - if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT) - l_ptr->max_pkt = l_ptr->max_pkt_target; - else - l_ptr->max_pkt = MAX_PKT_DEFAULT; - - l_ptr->max_pkt_probes = 0; -} - /* * Simple non-static link routines (i.e. 
referenced outside this file) */ @@ -304,7 +276,8 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, msg_set_bearer_id(msg, b_ptr->identity); strcpy((char *)msg_data(msg), if_name); l_ptr->net_plane = b_ptr->net_plane; - link_init_max_pkt(l_ptr); + l_ptr->advertised_mtu = b_ptr->mtu; + l_ptr->mtu = l_ptr->advertised_mtu; l_ptr->priority = b_ptr->priority; tipc_link_set_queue_limits(l_ptr, b_ptr->window); l_ptr->next_out_no = 1; @@ -465,8 +438,8 @@ void tipc_link_reset(struct tipc_link *l_ptr) /* Link is down, accept any session */ l_ptr->peer_session = INVALID_SESSION; - /* Prepare for max packet size negotiation */ - link_init_max_pkt(l_ptr); + /* Prepare for renewed mtu size negotiation */ + l_ptr->mtu = l_ptr->advertised_mtu; l_ptr->state = RESET_UNKNOWN; @@ -563,11 +536,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) l_ptr->checkpoint = l_ptr->next_in_no; if (tipc_bclink_acks_missing(l_ptr->owner)) { tipc_link_proto_xmit(l_ptr, STATE_MSG, - 0, 0, 0, 0, 0); - l_ptr->fsm_msg_cnt++; - } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) { - tipc_link_proto_xmit(l_ptr, STATE_MSG, - 1, 0, 0, 0, 0); + 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; } link_set_timer(l_ptr, cont_intv); @@ -575,7 +544,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) } l_ptr->state = WORKING_UNKNOWN; l_ptr->fsm_msg_cnt = 0; - tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv / 4); break; @@ -586,7 +555,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) l_ptr->state = RESET_RESET; l_ptr->fsm_msg_cnt = 0; tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, - 0, 0, 0, 0, 0); + 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv); break; @@ -609,7 +578,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) l_ptr->state = RESET_RESET; l_ptr->fsm_msg_cnt = 0; tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, - 0, 0, 0, 0, 0); + 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv); break; @@ -620,13 +589,13 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) l_ptr->checkpoint = l_ptr->next_in_no; if (tipc_bclink_acks_missing(l_ptr->owner)) { tipc_link_proto_xmit(l_ptr, STATE_MSG, - 0, 0, 0, 0, 0); + 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; } link_set_timer(l_ptr, cont_intv); } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) { tipc_link_proto_xmit(l_ptr, STATE_MSG, - 1, 0, 0, 0, 0); + 1, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv / 4); } else { /* Link has failed */ @@ -636,7 +605,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) l_ptr->state = RESET_UNKNOWN; l_ptr->fsm_msg_cnt = 0; tipc_link_proto_xmit(l_ptr, RESET_MSG, - 0, 0, 0, 0, 0); + 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv); } @@ -656,7 +625,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) l_ptr->state = WORKING_WORKING; l_ptr->fsm_msg_cnt = 0; link_activate(l_ptr); - tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0); l_ptr->fsm_msg_cnt++; if (l_ptr->owner->working_links == 1) tipc_link_sync_xmit(l_ptr); @@ -666,7 +635,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) l_ptr->state = RESET_RESET; l_ptr->fsm_msg_cnt = 0; tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, - 1, 0, 0, 0, 0); + 1, 0, 0, 0); l_ptr->fsm_msg_cnt++; 
link_set_timer(l_ptr, cont_intv); break; @@ -676,7 +645,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) link_set_timer(l_ptr, cont_intv); break; case TIMEOUT_EVT: - tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv); break; @@ -694,7 +663,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) l_ptr->state = WORKING_WORKING; l_ptr->fsm_msg_cnt = 0; link_activate(l_ptr); - tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0); l_ptr->fsm_msg_cnt++; if (l_ptr->owner->working_links == 1) tipc_link_sync_xmit(l_ptr); @@ -704,7 +673,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) break; case TIMEOUT_EVT: tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, - 0, 0, 0, 0, 0); + 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv); break; @@ -733,7 +702,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link, struct tipc_msg *msg = buf_msg(skb_peek(list)); unsigned int maxwin = link->window; unsigned int imp = msg_importance(msg); - uint mtu = link->max_pkt; + uint mtu = link->mtu; uint ack = mod(link->next_in_no - 1); uint seqno = link->next_out_no; uint bc_last_in = link->owner->bclink.last_in; @@ -1187,7 +1156,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr) link_retrieve_defq(l_ptr, &head); if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) { l_ptr->stats.sent_acks++; - tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0); } tipc_link_input(l_ptr, skb); skb = NULL; @@ -1362,7 +1331,7 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) { l_ptr->stats.deferred_recv++; if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1) - tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0); } else { l_ptr->stats.duplicates++; } @@ -1372,7 +1341,7 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, * Send protocol message to the other endpoint. 
*/ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, - u32 gap, u32 tolerance, u32 priority, u32 ack_mtu) + u32 gap, u32 tolerance, u32 priority) { struct sk_buff *buf = NULL; struct tipc_msg *msg = l_ptr->pmsg; @@ -1410,26 +1379,11 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, l_ptr->stats.sent_nacks++; msg_set_link_tolerance(msg, tolerance); msg_set_linkprio(msg, priority); - msg_set_max_pkt(msg, ack_mtu); + msg_set_max_pkt(msg, l_ptr->mtu); msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); msg_set_probe(msg, probe_msg != 0); - if (probe_msg) { - u32 mtu = l_ptr->max_pkt; - - if ((mtu < l_ptr->max_pkt_target) && - link_working_working(l_ptr) && - l_ptr->fsm_msg_cnt) { - msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3; - if (l_ptr->max_pkt_probes == 10) { - l_ptr->max_pkt_target = (msg_size - 4); - l_ptr->max_pkt_probes = 0; - msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3; - } - l_ptr->max_pkt_probes++; - } - + if (probe_msg) l_ptr->stats.sent_probes++; - } l_ptr->stats.sent_states++; } else { /* RESET_MSG or ACTIVATE_MSG */ msg_set_ack(msg, mod(l_ptr->failover_checkpt - 1)); @@ -1438,7 +1392,7 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, msg_set_probe(msg, 0); msg_set_link_tolerance(msg, l_ptr->tolerance); msg_set_linkprio(msg, l_ptr->priority); - msg_set_max_pkt(msg, l_ptr->max_pkt_target); + msg_set_max_pkt(msg, l_ptr->advertised_mtu); } r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr)); @@ -1469,8 +1423,6 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf) { u32 rec_gap = 0; - u32 max_pkt_info; - u32 max_pkt_ack; u32 msg_tol; struct tipc_msg *msg = buf_msg(buf); @@ -1513,15 +1465,8 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, if (msg_linkprio(msg) > l_ptr->priority) l_ptr->priority = msg_linkprio(msg); - max_pkt_info = msg_max_pkt(msg); - if (max_pkt_info) { - if (max_pkt_info < l_ptr->max_pkt_target) - l_ptr->max_pkt_target = max_pkt_info; - if (l_ptr->max_pkt > l_ptr->max_pkt_target) - l_ptr->max_pkt = l_ptr->max_pkt_target; - } else { - l_ptr->max_pkt = l_ptr->max_pkt_target; - } + if (l_ptr->mtu > msg_max_pkt(msg)) + l_ptr->mtu = msg_max_pkt(msg); /* Synchronize broadcast link info, if not done previously */ if (!tipc_node_is_up(l_ptr->owner)) { @@ -1566,18 +1511,8 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, mod(l_ptr->next_in_no)); } - max_pkt_ack = msg_max_pkt(msg); - if (max_pkt_ack > l_ptr->max_pkt) { - l_ptr->max_pkt = max_pkt_ack; - l_ptr->max_pkt_probes = 0; - } - - max_pkt_ack = 0; - if (msg_probe(msg)) { + if (msg_probe(msg)) l_ptr->stats.recv_probes++; - if (msg_size(msg) > sizeof(l_ptr->proto_msg)) - max_pkt_ack = msg_size(msg); - } /* Protocol message before retransmits, reduce loss risk */ if (l_ptr->owner->bclink.recv_permitted) @@ -1585,8 +1520,8 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, msg_last_bcast(msg)); if (rec_gap || (msg_probe(msg))) { - tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, rec_gap, 0, - 0, max_pkt_ack); + tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, + rec_gap, 0, 0); } if (msg_seq_gap(msg)) { l_ptr->stats.recv_nacks++; @@ -1816,7 +1751,7 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol) void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) { - int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE); + int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); l->window = win; 
l->backlog[TIPC_LOW_IMPORTANCE].limit = win / 2; @@ -1988,14 +1923,14 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info) tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]); link_set_supervision_props(link, tol); - tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0, 0); + tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0); } if (props[TIPC_NLA_PROP_PRIO]) { u32 prio; prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]); link->priority = prio; - tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio, 0); + tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio); } if (props[TIPC_NLA_PROP_WIN]) { u32 win; @@ -2100,7 +2035,7 @@ static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg, if (nla_put_u32(msg->skb, TIPC_NLA_LINK_DEST, tipc_cluster_mask(tn->own_addr))) goto attr_msg_full; - if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->max_pkt)) + if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->mtu)) goto attr_msg_full; if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, link->next_in_no)) goto attr_msg_full; diff --git a/net/tipc/link.h b/net/tipc/link.h index 6e28f03c7905..b5b4e3554d4e 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -123,9 +123,8 @@ struct tipc_stats { * @backlog_limit: backlog queue congestion thresholds (indexed by importance) * @exp_msg_count: # of tunnelled messages expected during link changeover * @reset_checkpoint: seq # of last acknowledged message at time of link reset - * @max_pkt: current maximum packet size for this link - * @max_pkt_target: desired maximum packet size for this link - * @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target) + * @mtu: current maximum packet size for this link + * @advertised_mtu: advertised own mtu when link is being established * @transmitq: queue for sent, non-acked messages * @backlogq: queue for messages waiting to be sent * @next_out_no: next sequence number to use for outbound messages @@ -176,9 +175,8 @@ struct tipc_link { struct sk_buff *failover_skb; /* Max packet negotiation */ - u32 max_pkt; - u32 max_pkt_target; - u32 max_pkt_probes; + u16 mtu; + u16 advertised_mtu; /* Sending */ struct sk_buff_head transmq; @@ -233,7 +231,7 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest, int __tipc_link_xmit(struct net *net, struct tipc_link *link, struct sk_buff_head *list); void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, - u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); + u32 gap, u32 tolerance, u32 priority); void tipc_link_push_packets(struct tipc_link *l_ptr); u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *buf); void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window); diff --git a/net/tipc/node.c b/net/tipc/node.c index f3d522c2881a..22c059ad2999 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -254,8 +254,8 @@ void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr) active[0] = active[1] = l_ptr; exit: /* Leave room for changeover header when returning 'mtu' to users: */ - n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE; - n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE; + n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE; + n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE; } /** @@ -319,11 +319,10 @@ void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr) /* Leave room for changeover header when returning 'mtu' to users: */ if (active[0]) { - n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE; - n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE; + 
n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE; + n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE; return; } - /* Loopback link went down? No fragmentation needed from now on. */ if (n_ptr->addr == tn->own_addr) { n_ptr->act_mtus[0] = MAX_MSG_SIZE; -- cgit v1.2.3-59-g8ed1b
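Appendix (illustrative only, not part of the patches): the receive-side bookkeeping that the first two patches converge on, reduced to its essentials. All names below are hypothetical stand-ins for the TIPC structures, and fragment reassembly and the reset of the parallel link are omitted; the logic mirrors tipc_link_input() and tipc_link_failover_rcv() after these changes: a SYNCH_MSG duplicate only updates the synchronization point and is dropped, while FAILOVER_MSG packets are counted down and filtered against the failover checkpoint.

#include <stdbool.h>
#include <stdint.h>

#define FIRST_FAILOVER 0xffffu	/* initial value of failover_pkts */

/* Hypothetical stand-in for the tunnel packet fields used here */
struct tunnel_pkt {
	uint16_t inner_seqno;	/* seqno of the wrapped packet */
	uint16_t msgcnt;	/* pkts to expect, read from first FAILOVER_MSG */
	bool synch;		/* true: SYNCH_MSG, false: FAILOVER_MSG */
};

/* Hypothetical stand-in for the link state involved */
struct link_state {
	bool synching;			/* LINK_SYNCHING */
	uint16_t synch_point;		/* last tunneled inner seqno */
	uint16_t failover_pkts;		/* counts down to zero */
	uint16_t failover_checkpt;	/* next_in_no of failed link at reset */
};

/* Modulo-2^16 "less than", like the kernel's less() on sequence numbers */
static bool seq_less(uint16_t a, uint16_t b)
{
	return (int16_t)(a - b) < 0;
}

/* Returns true if the inner packet should be delivered upwards */
static bool tunnel_rcv(struct link_state *l, const struct tunnel_pkt *p)
{
	if (p->synch) {
		/* Record the synch point and drop the duplicate; the
		 * original copy is delivered via the old link.
		 */
		l->synching = true;
		l->synch_point = p->inner_seqno;
		return false;
	}
	/* FAILOVER_MSG: the first arrival tells us how many to expect */
	if (l->failover_pkts == FIRST_FAILOVER)
		l->failover_pkts = p->msgcnt;
	if (!l->failover_pkts)
		return false;	/* nothing (more) expected */
	l->failover_pkts--;
	/* Skip inner packets the failed link already delivered */
	if (seq_less(p->inner_seqno, l->failover_checkpt))
		return false;
	return true;
}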
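The MTU handling left after the third patch is equally small: each endpoint sends its bearer MTU in RESET/ACTIVATE messages, both sides adopt the lower value, and a link reset falls back to the endpoint's own advertisement so the exchange can be redone. A minimal sketch under the same assumptions (hypothetical names, mirroring the new tipc_link_reset() and tipc_link_proto_rcv() logic):

#include <stdint.h>

/* Hypothetical stand-in for the two MTU fields on a link */
struct mtu_state {
	uint16_t mtu;		/* packet size currently in use */
	uint16_t advertised_mtu;	/* own bearer MTU, sent in RESET/ACTIVATE */
};

/* On link reset: prepare for renewed negotiation */
static void link_reset_mtu(struct mtu_state *l)
{
	l->mtu = l->advertised_mtu;
}

/* On RESET/ACTIVATE reception: converge on the lower of the two MTUs */
static void link_rcv_mtu(struct mtu_state *l, uint16_t peer_max_pkt)
{
	if (l->mtu > peer_max_pkt)
		l->mtu = peer_max_pkt;
}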