author     Jon Paul Maloy <jon.maloy@ericsson.com>    2015-07-30 18:24:19 -0400
committer  David S. Miller <davem@davemloft.net>      2015-07-30 17:25:14 -0700
commit     6e498158a827fd515b514842e9a06bdf0f75ab86 (patch)
tree       0f9312078445c1bd7d2ed669c564ca6c7a330764 /net/tipc/node.c
parent     tipc: extend node FSM (diff)
tipc: move link synch and failover to link aggregation level
Link failover and synchronization have until now been handled by the links themselves, forcing them to have knowledge about and to access parallel links in order to make the two algorithms work correctly. In this commit, we move the control part of this functionality to the link aggregation level in node.c, which is the right location for this. As a result, the two algorithms become easier to follow, and the link implementation becomes simpler.

Tested-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/node.c')
-rw-r--r--   net/tipc/node.c   291
1 file changed, 190 insertions, 101 deletions
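
For orientation before reading the hunks: the patch moves the failover/synch decisions out of the link code and into node-level functions (tipc_node_link_down() and the new tipc_node_check_state()). Below is a minimal, self-contained C sketch of that control flow only; struct node, struct link and node_link_down() are simplified illustrative stand-ins, not the kernel's tipc_node/tipc_link types or API.

/*
 * Standalone sketch -- all types, names and the printf() reporting are
 * hypothetical stand-ins used to illustrate node-level failover control.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define MAX_BEARERS 3

enum node_state { NODE_UP, NODE_FAILINGOVER };

struct link {
	bool up;
	uint16_t rcv_nxt;	/* next expected sequence number on this link */
	const char *name;
};

struct node {
	struct link *links[MAX_BEARERS];
	uint16_t sync_point;
	enum node_state state;
};

/*
 * Node-level reaction to a failed link: the aggregation layer, not the link
 * itself, picks a surviving tunnel link, records a synch point and enters
 * failover state (cf. the new tipc_node_link_down() in the diff below).
 */
static void node_link_down(struct node *n, int bearer_id)
{
	struct link *l = n->links[bearer_id];
	struct link *tnl = NULL;
	int i;

	if (!l || !l->up)
		return;
	l->up = false;

	/* Find a parallel link that is still working */
	for (i = 0; i < MAX_BEARERS; i++) {
		if (i != bearer_id && n->links[i] && n->links[i]->up) {
			tnl = n->links[i];
			break;
		}
	}
	if (!tnl) {
		printf("last link <%s> lost: contact with peer is gone\n",
		       l->name);
		return;
	}

	/* There is still a working link => initiate failover */
	n->state = NODE_FAILINGOVER;
	n->sync_point = (uint16_t)(tnl->rcv_nxt + (UINT16_MAX / 2 - 1));
	printf("failing over <%s> via <%s>, synch point %u\n",
	       l->name, tnl->name, n->sync_point);
	/* The real code now prepares FAILOVER_MSG tunnel packets on tnl */
}

int main(void)
{
	struct link la = { .up = true, .rcv_nxt = 17, .name = "eth0" };
	struct link lb = { .up = true, .rcv_nxt = 42, .name = "eth1" };
	struct node n  = { .links = { &la, &lb }, .state = NODE_UP };

	node_link_down(&n, 0);	/* eth0 fails; eth1 becomes the tunnel link */
	return 0;
}

In the patch itself, the receive path then ends failover in tipc_node_check_state() once the tunnel link's rcv_nxt has passed the recorded sync_point, using TIPC's wrap-safe 16-bit sequence comparisons (less()/more()).
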
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6b18d73830ca..b0372bb107f6 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -42,6 +42,31 @@
#include "bcast.h"
#include "discover.h"
+/* Node FSM states and events:
+ */
+enum {
+ SELF_DOWN_PEER_DOWN = 0xdd,
+ SELF_UP_PEER_UP = 0xaa,
+ SELF_DOWN_PEER_LEAVING = 0xd1,
+ SELF_UP_PEER_COMING = 0xac,
+ SELF_COMING_PEER_UP = 0xca,
+ SELF_LEAVING_PEER_DOWN = 0x1d,
+ NODE_FAILINGOVER = 0xf0,
+ NODE_SYNCHING = 0xcc
+};
+
+enum {
+ SELF_ESTABL_CONTACT_EVT = 0xece,
+ SELF_LOST_CONTACT_EVT = 0x1ce,
+ PEER_ESTABL_CONTACT_EVT = 0x9ece,
+ PEER_LOST_CONTACT_EVT = 0x91ce,
+ NODE_FAILOVER_BEGIN_EVT = 0xfbe,
+ NODE_FAILOVER_END_EVT = 0xfee,
+ NODE_SYNCH_BEGIN_EVT = 0xcbe,
+ NODE_SYNCH_END_EVT = 0xcee
+};
+
+static void tipc_node_link_down(struct tipc_node *n, int bearer_id);
static void node_lost_contact(struct tipc_node *n_ptr);
static void node_established_contact(struct tipc_node *n_ptr);
static void tipc_node_delete(struct tipc_node *node);
@@ -281,69 +306,75 @@ static void tipc_node_timeout(unsigned long data)
*
* Link becomes active (alone or shared) or standby, depending on its priority.
*/
-void tipc_node_link_up(struct tipc_node *n, int bearer_id)
+static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
+ struct sk_buff_head *xmitq)
{
int *slot0 = &n->active_links[0];
int *slot1 = &n->active_links[1];
- struct tipc_link_entry *links = n->links;
- struct tipc_link *l = n->links[bearer_id].link;
-
- /* Leave room for tunnel header when returning 'mtu' to users: */
- links[bearer_id].mtu = l->mtu - INT_H_SIZE;
+ struct tipc_link *ol = node_active_link(n, 0);
+ struct tipc_link *nl = n->links[bearer_id].link;
+ if (n->working_links > 1) {
+ pr_warn("Attempt to establish 3rd link to %x\n", n->addr);
+ return;
+ }
n->working_links++;
n->action_flags |= TIPC_NOTIFY_LINK_UP;
- n->link_id = l->peer_bearer_id << 16 | l->bearer_id;
+ n->link_id = nl->peer_bearer_id << 16 | bearer_id;
+
+ /* Leave room for tunnel header when returning 'mtu' to users: */
+ n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE;
tipc_bearer_add_dest(n->net, bearer_id, n->addr);
pr_debug("Established link <%s> on network plane %c\n",
- l->name, l->net_plane);
+ nl->name, nl->net_plane);
- /* No active links ? => take both active slots */
- if (!tipc_node_is_up(n)) {
+ /* First link? => give it both slots */
+ if (!ol) {
*slot0 = bearer_id;
*slot1 = bearer_id;
+ nl->exec_mode = TIPC_LINK_OPEN;
node_established_contact(n);
return;
}
- /* Lower prio than current active ? => no slot */
- if (l->priority < links[*slot0].link->priority) {
- pr_debug("New link <%s> becomes standby\n", l->name);
- return;
- }
- tipc_link_dup_queue_xmit(links[*slot0].link, l);
-
- /* Same prio as current active ? => take one slot */
- if (l->priority == links[*slot0].link->priority) {
+ /* Second link => redistribute slots */
+ if (nl->priority > ol->priority) {
+ pr_debug("Old link <%s> becomes standby\n", ol->name);
*slot0 = bearer_id;
- return;
+ *slot1 = bearer_id;
+ } else if (nl->priority == ol->priority) {
+ *slot0 = bearer_id;
+ } else {
+ pr_debug("New link <%s> is standby\n", nl->name);
}
- /* Higher prio than current active => take both active slots */
- pr_debug("Old link <%s> now standby\n", links[*slot0].link->name);
- *slot0 = bearer_id;
- *slot1 = bearer_id;
+ /* Prepare synchronization with first link */
+ tipc_link_tnl_prepare(ol, nl, SYNCH_MSG, xmitq);
}
/**
* tipc_node_link_down - handle loss of link
*/
-void tipc_node_link_down(struct tipc_node *n, int bearer_id)
+static void tipc_node_link_down(struct tipc_node *n, int bearer_id)
{
int *slot0 = &n->active_links[0];
int *slot1 = &n->active_links[1];
+ struct tipc_media_addr *maddr = &n->links[bearer_id].maddr;
int i, highest = 0;
- struct tipc_link *l, *_l;
+ struct tipc_link *l, *_l, *tnl;
+ struct sk_buff_head xmitq;
l = n->links[bearer_id].link;
if (!l || !tipc_link_is_up(l))
return;
+ __skb_queue_head_init(&xmitq);
+
n->working_links--;
n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
- n->link_id = l->peer_bearer_id << 16 | l->bearer_id;
+ n->link_id = l->peer_bearer_id << 16 | bearer_id;
tipc_bearer_remove_dest(n->net, l->bearer_id, n->addr);
@@ -370,13 +401,19 @@ void tipc_node_link_down(struct tipc_node *n, int bearer_id)
*slot1 = i;
}
- if (tipc_node_is_up(n))
- tipc_link_failover_send_queue(l);
+ if (!tipc_node_is_up(n)) {
+ tipc_link_reset(l);
+ node_lost_contact(n);
+ return;
+ }
+ /* There is still a working link => initiate failover */
+ tnl = node_active_link(n, 0);
+ tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT);
+ n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1);
+ tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, &xmitq);
tipc_link_reset(l);
-
- if (!tipc_node_is_up(n))
- node_lost_contact(n);
+ tipc_bearer_xmit(n->net, tnl->bearer_id, &xmitq, maddr);
}
bool tipc_node_is_up(struct tipc_node *n)
@@ -652,37 +689,22 @@ illegal_evt:
pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
}
-bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_link *l,
- struct tipc_msg *hdr)
+bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
{
int state = n->state;
if (likely(state == SELF_UP_PEER_UP))
return true;
- if (state == SELF_DOWN_PEER_DOWN)
- return true;
-
- if (state == SELF_UP_PEER_COMING) {
- /* If not traffic msg, peer may still be ESTABLISHING */
- if (tipc_link_is_up(l) && msg_is_traffic(hdr))
- tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
- return true;
- }
-
- if (state == SELF_COMING_PEER_UP)
- return true;
-
if (state == SELF_LEAVING_PEER_DOWN)
return false;
if (state == SELF_DOWN_PEER_LEAVING) {
- if (msg_peer_is_up(hdr))
+ if (msg_peer_node_is_up(hdr))
return false;
- tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
- return true;
}
- return false;
+
+ return true;
}
static void node_established_contact(struct tipc_node *n_ptr)
@@ -727,10 +749,8 @@ static void node_lost_contact(struct tipc_node *n_ptr)
if (!l_ptr)
continue;
l_ptr->exec_mode = TIPC_LINK_OPEN;
- l_ptr->failover_checkpt = 0;
- l_ptr->failover_pkts = 0;
- kfree_skb(l_ptr->failover_skb);
- l_ptr->failover_skb = NULL;
+ kfree_skb(l_ptr->failover_reasm_skb);
+ l_ptr->failover_reasm_skb = NULL;
tipc_link_reset_fragments(l_ptr);
}
/* Prevent re-contact with node until cleanup is done */
@@ -961,38 +981,111 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
return 0;
}
-/* tipc_node_tnl_init(): handle a received TUNNEL_PROTOCOL packet,
- * in order to control parallel link failover or synchronization
+/**
+ * tipc_node_check_state - check and if necessary update node state
+ * @skb: TIPC packet
+ * @bearer_id: identity of bearer delivering the packet
+ * Returns true if state is ok, otherwise consumes buffer and returns false
*/
-static void tipc_node_tnl_init(struct tipc_node *n, int bearer_id,
- struct sk_buff *skb)
+static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
+ int bearer_id)
{
- struct tipc_link *tnl, *pl;
struct tipc_msg *hdr = buf_msg(skb);
+ int usr = msg_user(hdr);
+ int mtyp = msg_type(hdr);
u16 oseqno = msg_seqno(hdr);
- int pb_id = msg_bearer_id(hdr);
+ u16 iseqno = msg_seqno(msg_get_wrapped(hdr));
+ u16 exp_pkts = msg_msgcnt(hdr);
+ u16 rcv_nxt, syncpt, dlv_nxt;
+ int state = n->state;
+ struct tipc_link *l, *pl = NULL;
+ struct sk_buff_head;
+ int i;
- if (pb_id >= MAX_BEARERS)
- return;
+ l = n->links[bearer_id].link;
+ if (!l)
+ return false;
+ rcv_nxt = l->rcv_nxt;
- tnl = n->links[bearer_id].link;
- if (!tnl)
- return;
- /* Ignore if duplicate */
- if (less(oseqno, tnl->rcv_nxt))
- return;
+ if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL)))
+ return true;
- pl = n->links[pb_id].link;
- if (!pl)
- return;
+ /* Find parallel link, if any */
+ for (i = 0; i < MAX_BEARERS; i++) {
+ if ((i != bearer_id) && n->links[i].link) {
+ pl = n->links[i].link;
+ break;
+ }
+ }
- if (msg_type(hdr) == FAILOVER_MSG) {
- if (tipc_link_is_up(pl)) {
- tipc_node_link_down(n, pb_id);
+ /* Update node accesibility if applicable */
+ if (state == SELF_UP_PEER_COMING) {
+ if (!tipc_link_is_up(l))
+ return true;
+ if (!msg_peer_link_is_up(hdr))
+ return true;
+ tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
+ }
+
+ if (state == SELF_DOWN_PEER_LEAVING) {
+ if (msg_peer_node_is_up(hdr))
+ return false;
+ tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
+ }
+
+ /* Ignore duplicate packets */
+ if (less(oseqno, rcv_nxt))
+ return true;
+
+ /* Initiate or update failover mode if applicable */
+ if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) {
+ syncpt = oseqno + exp_pkts - 1;
+ if (pl && tipc_link_is_up(pl)) {
+ tipc_node_link_down(n, pl->bearer_id);
pl->exec_mode = TIPC_LINK_BLOCKED;
}
+ /* If pkts arrive out of order, use lowest calculated syncpt */
+ if (less(syncpt, n->sync_point))
+ n->sync_point = syncpt;
+ }
+
+ /* Open parallel link when tunnel link reaches synch point */
+ if ((n->state == NODE_FAILINGOVER) && (more(rcv_nxt, n->sync_point))) {
+ tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT);
+ if (pl)
+ pl->exec_mode = TIPC_LINK_OPEN;
+ return true;
+ }
+
+ /* Initiate or update synch mode if applicable */
+ if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) {
+ syncpt = iseqno + exp_pkts - 1;
+ if (n->state == SELF_UP_PEER_UP) {
+ n->sync_point = syncpt;
+ tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT);
+ }
+ l->exec_mode = TIPC_LINK_TUNNEL;
+ if (less(syncpt, n->sync_point))
+ n->sync_point = syncpt;
}
+
+ /* Open tunnel link when parallel link reaches synch point */
+ if ((n->state == NODE_SYNCHING) && (l->exec_mode == TIPC_LINK_TUNNEL)) {
+ if (pl)
+ dlv_nxt = mod(pl->rcv_nxt - skb_queue_len(pl->inputq));
+ if (!pl || more(dlv_nxt, n->sync_point)) {
+ tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
+ l->exec_mode = TIPC_LINK_OPEN;
+ return true;
+ }
+ if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG))
+ return true;
+ if (usr == LINK_PROTOCOL)
+ return true;
+ return false;
+ }
+ return true;
}
/**
@@ -1008,12 +1101,11 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
{
struct sk_buff_head xmitq;
struct tipc_node *n;
- struct tipc_link *l;
- struct tipc_msg *hdr;
- struct tipc_media_addr *maddr;
+ struct tipc_msg *hdr = buf_msg(skb);
+ int usr = msg_user(hdr);
int bearer_id = b->identity;
+ struct tipc_link_entry *le;
int rc = 0;
- int usr;
__skb_queue_head_init(&xmitq);
@@ -1022,8 +1114,6 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
goto discard;
/* Handle arrival of a non-unicast link packet */
- hdr = buf_msg(skb);
- usr = msg_user(hdr);
if (unlikely(msg_non_seq(hdr))) {
if (usr == LINK_CONFIG)
tipc_disc_rcv(net, skb, b);
@@ -1036,42 +1126,41 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
n = tipc_node_find(net, msg_prevnode(hdr));
if (unlikely(!n))
goto discard;
- tipc_node_lock(n);
+ le = &n->links[bearer_id];
- /* Prepare links for tunneled reception if applicable */
- if (unlikely(usr == TUNNEL_PROTOCOL))
- tipc_node_tnl_init(n, bearer_id, skb);
+ tipc_node_lock(n);
- /* Locate link endpoint that should handle packet */
- l = n->links[bearer_id].link;
- if (unlikely(!l))
+ /* Is reception permitted at the moment ? */
+ if (!tipc_node_filter_pkt(n, hdr))
goto unlock;
- /* Is reception of this packet permitted at the moment ? */
- if (unlikely(n->state != SELF_UP_PEER_UP))
- if (!tipc_node_filter_skb(n, l, hdr))
- goto unlock;
-
- if (unlikely(usr == LINK_PROTOCOL))
+ if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
tipc_bclink_sync_state(n, hdr);
/* Release acked broadcast messages */
if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
- /* Check protocol and update link state */
- rc = tipc_link_rcv(l, skb, &xmitq);
+ /* Check and if necessary update node state */
+ if (likely(tipc_node_check_state(n, skb, bearer_id))) {
+ rc = tipc_link_rcv(le->link, skb, &xmitq);
+ skb = NULL;
+ }
if (unlikely(rc & TIPC_LINK_UP_EVT))
- tipc_node_link_up(n, bearer_id);
+ tipc_node_link_up(n, bearer_id, &xmitq);
+
if (unlikely(rc & TIPC_LINK_DOWN_EVT))
tipc_node_link_down(n, bearer_id);
- skb = NULL;
unlock:
tipc_node_unlock(n);
- tipc_sk_rcv(net, &n->links[bearer_id].inputq);
- maddr = &n->links[bearer_id].maddr;
- tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
+
+ if (!skb_queue_empty(&le->inputq))
+ tipc_sk_rcv(net, &le->inputq);
+
+ if (!skb_queue_empty(&xmitq))
+ tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
+
tipc_node_put(n);
discard:
kfree_skb(skb);