diff options
Diffstat (limited to '')
-rw-r--r-- | net/tipc/socket.c | 382 |
1 files changed, 223 insertions, 159 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c index caa4d663fd90..66666805b53c 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1,7 +1,7 @@ /* * net/tipc/socket.c: TIPC socket API * - * Copyright (c) 2001-2007, 2012-2014, Ericsson AB + * Copyright (c) 2001-2007, 2012-2015, Ericsson AB * Copyright (c) 2004-2008, 2010-2013, Wind River Systems * All rights reserved. * @@ -41,6 +41,7 @@ #include "node.h" #include "link.h" #include "config.h" +#include "name_distr.h" #include "socket.h" #define SS_LISTENING -1 /* socket is listening */ @@ -69,8 +70,6 @@ * @pub_count: total # of publications port has made during its lifetime * @probing_state: * @probing_intv: - * @port: port - interacts with 'sk' and with the rest of the TIPC stack - * @peer_name: the peer of the connection, if any * @conn_timeout: the time we can wait for an unresponded setup request * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue * @link_cong: non-zero if owner must sleep because of link congestion @@ -177,6 +176,11 @@ static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = { * - port reference */ +static u32 tsk_own_node(struct tipc_sock *tsk) +{ + return msg_prevnode(&tsk->phdr); +} + static u32 tsk_peer_node(struct tipc_sock *tsk) { return msg_destnode(&tsk->phdr); @@ -249,11 +253,11 @@ static void tsk_rej_rx_queue(struct sock *sk) { struct sk_buff *skb; u32 dnode; - struct net *net = sock_net(sk); + u32 own_node = tsk_own_node(tipc_sk(sk)); while ((skb = __skb_dequeue(&sk->sk_receive_queue))) { - if (tipc_msg_reverse(net, skb, &dnode, TIPC_ERR_NO_PORT)) - tipc_link_xmit_skb(net, skb, dnode, 0); + if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT)) + tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0); } } @@ -305,6 +309,7 @@ static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg) static int tipc_sk_create(struct net *net, struct socket *sock, int protocol, int kern) { + struct tipc_net *tn; const struct proto_ops *ops; socket_state state; struct sock *sk; @@ -346,7 +351,8 @@ static int tipc_sk_create(struct net *net, struct socket *sock, tsk->max_pkt = MAX_PKT_DEFAULT; INIT_LIST_HEAD(&tsk->publications); msg = &tsk->phdr; - tipc_msg_init(net, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, + tn = net_generic(sock_net(sk), tipc_net_id); + tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, NAMED_H_SIZE, 0); /* Finish initializing socket data structures */ @@ -471,7 +477,6 @@ static int tipc_release(struct socket *sock) { struct sock *sk = sock->sk; struct net *net; - struct tipc_net *tn; struct tipc_sock *tsk; struct sk_buff *skb; u32 dnode, probing_state; @@ -484,8 +489,6 @@ static int tipc_release(struct socket *sock) return 0; net = sock_net(sk); - tn = net_generic(net, tipc_net_id); - tsk = tipc_sk(sk); lock_sock(sk); @@ -507,7 +510,7 @@ static int tipc_release(struct socket *sock) tsk->connected = 0; tipc_node_remove_conn(net, dnode, tsk->portid); } - if (tipc_msg_reverse(net, skb, &dnode, + if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, TIPC_ERR_NO_PORT)) tipc_link_xmit_skb(net, skb, dnode, 0); } @@ -520,9 +523,9 @@ static int tipc_release(struct socket *sock) sock_put(sk); tipc_sk_remove(tsk); if (tsk->connected) { - skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE, + skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode, - tn->own_addr, tsk_peer_port(tsk), + tsk_own_node(tsk), tsk_peer_port(tsk), tsk->portid, TIPC_ERR_NO_PORT); if (skb) tipc_link_xmit_skb(net, skb, dnode, tsk->portid); @@ -730,9 +733,10 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, struct msghdr *msg, size_t dsz, long timeo) { struct sock *sk = sock->sk; + struct tipc_sock *tsk = tipc_sk(sk); struct net *net = sock_net(sk); - struct tipc_msg *mhdr = &tipc_sk(sk)->phdr; - struct sk_buff_head head; + struct tipc_msg *mhdr = &tsk->phdr; + struct sk_buff_head *pktchain = &sk->sk_write_queue; struct iov_iter save = msg->msg_iter; uint mtu; int rc; @@ -748,13 +752,12 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, new_mtu: mtu = tipc_bclink_get_mtu(); - __skb_queue_head_init(&head); - rc = tipc_msg_build(net, mhdr, msg, 0, dsz, mtu, &head); + rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain); if (unlikely(rc < 0)) return rc; do { - rc = tipc_bclink_xmit(net, &head); + rc = tipc_bclink_xmit(net, pktchain); if (likely(rc >= 0)) { rc = dsz; break; @@ -768,62 +771,78 @@ new_mtu: tipc_sk(sk)->link_cong = 1; rc = tipc_wait_for_sndmsg(sock, &timeo); if (rc) - __skb_queue_purge(&head); + __skb_queue_purge(pktchain); } while (!rc); return rc; } -/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets +/** + * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets + * @arrvq: queue with arriving messages, to be cloned after destination lookup + * @inputq: queue with cloned messages, delivered to socket after dest lookup + * + * Multi-threaded: parallel calls with reference to same queues may occur */ -void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf) +void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, + struct sk_buff_head *inputq) { - struct tipc_msg *msg = buf_msg(buf); - struct tipc_port_list dports = {0, NULL, }; - struct tipc_port_list *item; - struct sk_buff *b; - uint i, last, dst = 0; + struct tipc_msg *msg; + struct tipc_plist dports; + u32 portid; u32 scope = TIPC_CLUSTER_SCOPE; - - if (in_own_node(net, msg_orignode(msg))) - scope = TIPC_NODE_SCOPE; - - /* Create destination port list: */ - tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), - msg_nameupper(msg), scope, &dports); - last = dports.count; - if (!last) { - kfree_skb(buf); - return; - } - - for (item = &dports; item; item = item->next) { - for (i = 0; i < PLSIZE && ++dst <= last; i++) { - b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf; - if (!b) { - pr_warn("Failed do clone mcast rcv buffer\n"); + struct sk_buff_head tmpq; + uint hsz; + struct sk_buff *skb, *_skb; + + __skb_queue_head_init(&tmpq); + tipc_plist_init(&dports); + + skb = tipc_skb_peek(arrvq, &inputq->lock); + for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { + msg = buf_msg(skb); + hsz = skb_headroom(skb) + msg_hdr_sz(msg); + + if (in_own_node(net, msg_orignode(msg))) + scope = TIPC_NODE_SCOPE; + + /* Create destination port list and message clones: */ + tipc_nametbl_mc_translate(net, + msg_nametype(msg), msg_namelower(msg), + msg_nameupper(msg), scope, &dports); + portid = tipc_plist_pop(&dports); + for (; portid; portid = tipc_plist_pop(&dports)) { + _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); + if (_skb) { + msg_set_destport(buf_msg(_skb), portid); + __skb_queue_tail(&tmpq, _skb); continue; } - msg_set_destport(msg, item->ports[i]); - tipc_sk_rcv(net, b); + pr_warn("Failed to clone mcast rcv buffer\n"); + } + /* Append to inputq if not already done by other thread */ + spin_lock_bh(&inputq->lock); + if (skb_peek(arrvq) == skb) { + skb_queue_splice_tail_init(&tmpq, inputq); + kfree_skb(__skb_dequeue(arrvq)); } + spin_unlock_bh(&inputq->lock); + __skb_queue_purge(&tmpq); + kfree_skb(skb); } - tipc_port_list_free(&dports); + tipc_sk_rcv(net, inputq); } /** * tipc_sk_proto_rcv - receive a connection mng protocol message * @tsk: receiving socket - * @dnode: node to send response message to, if any - * @buf: buffer containing protocol message - * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if - * (CONN_PROBE_REPLY) message should be forwarded. + * @skb: pointer to message buffer. Set to NULL if buffer is consumed. */ -static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, - struct sk_buff *buf) +static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb) { - struct tipc_msg *msg = buf_msg(buf); + struct tipc_msg *msg = buf_msg(*skb); int conn_cong; - + u32 dnode; + u32 own_node = tsk_own_node(tsk); /* Ignore if connection cannot be validated: */ if (!tsk_peer_msg(tsk, msg)) goto exit; @@ -836,15 +855,15 @@ static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, if (conn_cong) tsk->sk.sk_write_space(&tsk->sk); } else if (msg_type(msg) == CONN_PROBE) { - if (!tipc_msg_reverse(sock_net(&tsk->sk), buf, dnode, TIPC_OK)) - return TIPC_OK; - msg_set_type(msg, CONN_PROBE_REPLY); - return TIPC_FWD_MSG; + if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) { + msg_set_type(msg, CONN_PROBE_REPLY); + return; + } } /* Do nothing if msg_type() == CONN_PROBE_REPLY */ exit: - kfree_skb(buf); - return TIPC_OK; + kfree_skb(*skb); + *skb = NULL; } static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) @@ -895,7 +914,7 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, struct net *net = sock_net(sk); struct tipc_msg *mhdr = &tsk->phdr; u32 dnode, dport; - struct sk_buff_head head; + struct sk_buff_head *pktchain = &sk->sk_write_queue; struct sk_buff *skb; struct tipc_name_seq *seq = &dest->addr.nameseq; struct iov_iter save; @@ -970,15 +989,14 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, save = m->msg_iter; new_mtu: mtu = tipc_node_get_mtu(net, dnode, tsk->portid); - __skb_queue_head_init(&head); - rc = tipc_msg_build(net, mhdr, m, 0, dsz, mtu, &head); + rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, pktchain); if (rc < 0) goto exit; do { - skb = skb_peek(&head); + skb = skb_peek(pktchain); TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; - rc = tipc_link_xmit(net, &head, dnode, tsk->portid); + rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid); if (likely(rc >= 0)) { if (sock->state != SS_READY) sock->state = SS_CONNECTING; @@ -994,7 +1012,7 @@ new_mtu: tsk->link_cong = 1; rc = tipc_wait_for_sndmsg(sock, &timeo); if (rc) - __skb_queue_purge(&head); + __skb_queue_purge(pktchain); } while (!rc); exit: if (iocb) @@ -1052,7 +1070,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, struct net *net = sock_net(sk); struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *mhdr = &tsk->phdr; - struct sk_buff_head head; + struct sk_buff_head *pktchain = &sk->sk_write_queue; DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); u32 portid = tsk->portid; int rc = -EINVAL; @@ -1089,13 +1107,12 @@ next: save = m->msg_iter; mtu = tsk->max_pkt; send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); - __skb_queue_head_init(&head); - rc = tipc_msg_build(net, mhdr, m, sent, send, mtu, &head); + rc = tipc_msg_build(mhdr, m, sent, send, mtu, pktchain); if (unlikely(rc < 0)) goto exit; do { if (likely(!tsk_conn_cong(tsk))) { - rc = tipc_link_xmit(net, &head, dnode, portid); + rc = tipc_link_xmit(net, pktchain, dnode, portid); if (likely(!rc)) { tsk->sent_unacked++; sent += send; @@ -1115,7 +1132,7 @@ next: } rc = tipc_wait_for_sndpkt(sock, &timeo); if (rc) - __skb_queue_purge(&head); + __skb_queue_purge(pktchain); } while (!rc); exit: if (iocb) @@ -1263,7 +1280,6 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg, static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) { struct net *net = sock_net(&tsk->sk); - struct tipc_net *tn = net_generic(net, tipc_net_id); struct sk_buff *skb = NULL; struct tipc_msg *msg; u32 peer_port = tsk_peer_port(tsk); @@ -1271,9 +1287,9 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) if (!tsk->connected) return; - skb = tipc_msg_create(net, CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, - dnode, tn->own_addr, peer_port, tsk->portid, - TIPC_OK); + skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, + dnode, tsk_own_node(tsk), peer_port, + tsk->portid, TIPC_OK); if (!skb) return; msg = buf_msg(skb); @@ -1564,16 +1580,16 @@ static void tipc_data_ready(struct sock *sk) /** * filter_connect - Handle all incoming messages for a connection-based socket * @tsk: TIPC socket - * @msg: message + * @skb: pointer to message buffer. Set to NULL if buffer is consumed * * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise */ -static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) +static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb) { struct sock *sk = &tsk->sk; struct net *net = sock_net(sk); struct socket *sock = sk->sk_socket; - struct tipc_msg *msg = buf_msg(*buf); + struct tipc_msg *msg = buf_msg(*skb); int retval = -TIPC_ERR_NO_PORT; if (msg_mcast(msg)) @@ -1623,8 +1639,8 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) * connect() routine if sleeping. */ if (msg_data_sz(msg) == 0) { - kfree_skb(*buf); - *buf = NULL; + kfree_skb(*skb); + *skb = NULL; if (waitqueue_active(sk_sleep(sk))) wake_up_interruptible(sk_sleep(sk)); } @@ -1676,32 +1692,33 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) /** * filter_rcv - validate incoming message * @sk: socket - * @buf: message + * @skb: pointer to message. Set to NULL if buffer is consumed. * * Enqueues message on receive queue if acceptable; optionally handles * disconnect indication for a connected socket. * - * Called with socket lock already taken; port lock may also be taken. + * Called with socket lock already taken * - * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message - * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded + * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected */ -static int filter_rcv(struct sock *sk, struct sk_buff *buf) +static int filter_rcv(struct sock *sk, struct sk_buff **skb) { struct socket *sock = sk->sk_socket; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_msg *msg = buf_msg(buf); - unsigned int limit = rcvbuf_limit(sk, buf); - u32 onode; + struct tipc_msg *msg = buf_msg(*skb); + unsigned int limit = rcvbuf_limit(sk, *skb); int rc = TIPC_OK; - if (unlikely(msg_user(msg) == CONN_MANAGER)) - return tipc_sk_proto_rcv(tsk, &onode, buf); + if (unlikely(msg_user(msg) == CONN_MANAGER)) { + tipc_sk_proto_rcv(tsk, skb); + return TIPC_OK; + } if (unlikely(msg_user(msg) == SOCK_WAKEUP)) { - kfree_skb(buf); + kfree_skb(*skb); tsk->link_cong = 0; sk->sk_write_space(sk); + *skb = NULL; return TIPC_OK; } @@ -1713,21 +1730,22 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf) if (msg_connected(msg)) return -TIPC_ERR_NO_PORT; } else { - rc = filter_connect(tsk, &buf); - if (rc != TIPC_OK || buf == NULL) + rc = filter_connect(tsk, skb); + if (rc != TIPC_OK || !*skb) return rc; } /* Reject message if there isn't room to queue it */ - if (sk_rmem_alloc_get(sk) + buf->truesize >= limit) + if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit) return -TIPC_ERR_OVERLOAD; /* Enqueue message */ - TIPC_SKB_CB(buf)->handle = NULL; - __skb_queue_tail(&sk->sk_receive_queue, buf); - skb_set_owner_r(buf, sk); + TIPC_SKB_CB(*skb)->handle = NULL; + __skb_queue_tail(&sk->sk_receive_queue, *skb); + skb_set_owner_r(*skb, sk); sk->sk_data_ready(sk); + *skb = NULL; return TIPC_OK; } @@ -1736,79 +1754,126 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf) * @sk: socket * @skb: message * - * Caller must hold socket lock, but not port lock. + * Caller must hold socket lock * * Returns 0 */ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) { - int rc; - u32 onode; + int err; + atomic_t *dcnt; + u32 dnode; struct tipc_sock *tsk = tipc_sk(sk); struct net *net = sock_net(sk); uint truesize = skb->truesize; - rc = filter_rcv(sk, skb); - - if (likely(!rc)) { - if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) - atomic_add(truesize, &tsk->dupl_rcvcnt); + err = filter_rcv(sk, &skb); + if (likely(!skb)) { + dcnt = &tsk->dupl_rcvcnt; + if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT) + atomic_add(truesize, dcnt); return 0; } - - if ((rc < 0) && !tipc_msg_reverse(net, skb, &onode, -rc)) - return 0; - - tipc_link_xmit_skb(net, skb, onode, 0); - + if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err)) + tipc_link_xmit_skb(net, skb, dnode, tsk->portid); return 0; } /** - * tipc_sk_rcv - handle incoming message - * @skb: buffer containing arriving message - * Consumes buffer - * Returns 0 if success, or errno: -EHOSTUNREACH + * tipc_sk_enqueue - extract all buffers with destination 'dport' from + * inputq and try adding them to socket or backlog queue + * @inputq: list of incoming buffers with potentially different destinations + * @sk: socket where the buffers should be enqueued + * @dport: port number for the socket + * @_skb: returned buffer to be forwarded or rejected, if applicable + * + * Caller must hold socket lock + * + * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD + * or -TIPC_ERR_NO_PORT */ -int tipc_sk_rcv(struct net *net, struct sk_buff *skb) +static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, + u32 dport, struct sk_buff **_skb) { - struct tipc_sock *tsk; - struct sock *sk; - u32 dport = msg_destport(buf_msg(skb)); - int rc = TIPC_OK; - uint limit; - u32 dnode; + unsigned int lim; + atomic_t *dcnt; + int err; + struct sk_buff *skb; + unsigned long time_limit = jiffies + 2; - /* Validate destination and message */ - tsk = tipc_sk_lookup(net, dport); - if (unlikely(!tsk)) { - rc = tipc_msg_eval(net, skb, &dnode); - goto exit; + while (skb_queue_len(inputq)) { + skb = tipc_skb_dequeue(inputq, dport); + if (unlikely(!skb)) + return TIPC_OK; + /* Return if softirq window exhausted */ + if (unlikely(time_after_eq(jiffies, time_limit))) + return TIPC_OK; + if (!sock_owned_by_user(sk)) { + err = filter_rcv(sk, &skb); + if (likely(!skb)) + continue; + *_skb = skb; + return err; + } + dcnt = &tipc_sk(sk)->dupl_rcvcnt; + if (sk->sk_backlog.len) + atomic_set(dcnt, 0); + lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); + if (likely(!sk_add_backlog(sk, skb, lim))) + continue; + *_skb = skb; + return -TIPC_ERR_OVERLOAD; } - sk = &tsk->sk; + return TIPC_OK; +} - /* Queue message */ - spin_lock_bh(&sk->sk_lock.slock); +/** + * tipc_sk_rcv - handle a chain of incoming buffers + * @inputq: buffer list containing the buffers + * Consumes all buffers in list until inputq is empty + * Note: may be called in multiple threads referring to the same queue + * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH + * Only node local calls check the return value, sending single-buffer queues + */ +int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) +{ + u32 dnode, dport = 0; + int err = -TIPC_ERR_NO_PORT; + struct sk_buff *skb; + struct tipc_sock *tsk; + struct tipc_net *tn; + struct sock *sk; - if (!sock_owned_by_user(sk)) { - rc = filter_rcv(sk, skb); - } else { - if (sk->sk_backlog.len == 0) - atomic_set(&tsk->dupl_rcvcnt, 0); - limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt); - if (sk_add_backlog(sk, skb, limit)) - rc = -TIPC_ERR_OVERLOAD; + while (skb_queue_len(inputq)) { + skb = NULL; + dport = tipc_skb_peek_port(inputq, dport); + tsk = tipc_sk_lookup(net, dport); + if (likely(tsk)) { + sk = &tsk->sk; + if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { + err = tipc_sk_enqueue(inputq, sk, dport, &skb); + spin_unlock_bh(&sk->sk_lock.slock); + dport = 0; + } + sock_put(sk); + } else { + skb = tipc_skb_dequeue(inputq, dport); + } + if (likely(!skb)) + continue; + if (tipc_msg_lookup_dest(net, skb, &dnode, &err)) + goto xmit; + if (!err) { + dnode = msg_destnode(buf_msg(skb)); + goto xmit; + } + tn = net_generic(net, tipc_net_id); + if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) + continue; +xmit: + tipc_link_xmit_skb(net, skb, dnode, dport); } - spin_unlock_bh(&sk->sk_lock.slock); - sock_put(sk); - if (likely(!rc)) - return 0; -exit: - if ((rc < 0) && !tipc_msg_reverse(net, skb, &dnode, -rc)) - return -EHOSTUNREACH; - - tipc_link_xmit_skb(net, skb, dnode, 0); - return (rc < 0) ? -EHOSTUNREACH : 0; + return err ? -EHOSTUNREACH : 0; } static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) @@ -2065,7 +2130,6 @@ static int tipc_shutdown(struct socket *sock, int how) { struct sock *sk = sock->sk; struct net *net = sock_net(sk); - struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_sock *tsk = tipc_sk(sk); struct sk_buff *skb; u32 dnode; @@ -2088,16 +2152,17 @@ restart: kfree_skb(skb); goto restart; } - if (tipc_msg_reverse(net, skb, &dnode, + if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, TIPC_CONN_SHUTDOWN)) tipc_link_xmit_skb(net, skb, dnode, tsk->portid); tipc_node_remove_conn(net, dnode, tsk->portid); } else { dnode = tsk_peer_node(tsk); - skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE, + + skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, SHORT_H_SIZE, - 0, dnode, tn->own_addr, + 0, dnode, tsk_own_node(tsk), tsk_peer_port(tsk), tsk->portid, TIPC_CONN_SHUTDOWN); tipc_link_xmit_skb(net, skb, dnode, tsk->portid); @@ -2129,10 +2194,9 @@ static void tipc_sk_timeout(unsigned long data) { struct tipc_sock *tsk = (struct tipc_sock *)data; struct sock *sk = &tsk->sk; - struct net *net = sock_net(sk); - struct tipc_net *tn = net_generic(net, tipc_net_id); struct sk_buff *skb = NULL; u32 peer_port, peer_node; + u32 own_node = tsk_own_node(tsk); bh_lock_sock(sk); if (!tsk->connected) { @@ -2144,13 +2208,13 @@ static void tipc_sk_timeout(unsigned long data) if (tsk->probing_state == TIPC_CONN_PROBING) { /* Previous probe not answered -> self abort */ - skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE, + skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, SHORT_H_SIZE, 0, - tn->own_addr, peer_node, tsk->portid, + own_node, peer_node, tsk->portid, peer_port, TIPC_ERR_NO_PORT); } else { - skb = tipc_msg_create(net, CONN_MANAGER, CONN_PROBE, INT_H_SIZE, - 0, peer_node, tn->own_addr, + skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, + INT_H_SIZE, 0, peer_node, own_node, peer_port, tsk->portid, TIPC_OK); tsk->probing_state = TIPC_CONN_PROBING; sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv); |