diff options
Diffstat (limited to 'net/smc/smc_cdc.c')
-rw-r--r-- | net/smc/smc_cdc.c | 180 |
1 files changed, 132 insertions, 48 deletions
diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c index 164f1584861b..53f63bfbaf5f 100644 --- a/net/smc/smc_cdc.c +++ b/net/smc/smc_cdc.c @@ -31,10 +31,6 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd, struct smc_sock *smc; int diff; - if (!conn) - /* already dismissed */ - return; - smc = container_of(conn, struct smc_sock, conn); bh_lock_sock(&smc->sk); if (!wc_status) { @@ -47,25 +43,48 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd, /* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */ smp_mb__after_atomic(); smc_curs_copy(&conn->tx_curs_fin, &cdcpend->cursor, conn); + smc_curs_copy(&conn->local_tx_ctrl_fin, &cdcpend->p_cursor, + conn); + conn->tx_cdc_seq_fin = cdcpend->ctrl_seq; } + + if (atomic_dec_and_test(&conn->cdc_pend_tx_wr)) { + /* If user owns the sock_lock, mark the connection need sending. + * User context will later try to send when it release sock_lock + * in smc_release_cb() + */ + if (sock_owned_by_user(&smc->sk)) + conn->tx_in_release_sock = true; + else + smc_tx_pending(conn); + + if (unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq))) + wake_up(&conn->cdc_pend_tx_wq); + } + WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0); + smc_tx_sndbuf_nonfull(smc); bh_unlock_sock(&smc->sk); } int smc_cdc_get_free_slot(struct smc_connection *conn, + struct smc_link *link, struct smc_wr_buf **wr_buf, struct smc_rdma_wr **wr_rdma_buf, struct smc_cdc_tx_pend **pend) { - struct smc_link *link = &conn->lgr->lnk[SMC_SINGLE_LINK]; int rc; rc = smc_wr_tx_get_free_slot(link, smc_cdc_tx_handler, wr_buf, wr_rdma_buf, (struct smc_wr_tx_pend_priv **)pend); - if (conn->killed) + if (conn->killed) { /* abnormal termination */ + if (!rc) + smc_wr_tx_put_slot(link, + (struct smc_wr_tx_pend_priv *)(*pend)); rc = -EPIPE; + } return rc; } @@ -91,39 +110,96 @@ int smc_cdc_msg_send(struct smc_connection *conn, struct smc_wr_buf *wr_buf, struct smc_cdc_tx_pend *pend) { + struct smc_link *link = conn->lnk; union smc_host_cursor cfed; - struct smc_link *link; int rc; - link = &conn->lgr->lnk[SMC_SINGLE_LINK]; - smc_cdc_add_pending_send(conn, pend); conn->tx_cdc_seq++; conn->local_tx_ctrl.seqno = conn->tx_cdc_seq; smc_host_msg_to_cdc((struct smc_cdc_msg *)wr_buf, conn, &cfed); + + atomic_inc(&conn->cdc_pend_tx_wr); + smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */ + rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend); if (!rc) { smc_curs_copy(&conn->rx_curs_confirmed, &cfed, conn); conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0; + } else { + conn->tx_cdc_seq--; + conn->local_tx_ctrl.seqno = conn->tx_cdc_seq; + atomic_dec(&conn->cdc_pend_tx_wr); } return rc; } +/* send a validation msg indicating the move of a conn to an other QP link */ +int smcr_cdc_msg_send_validation(struct smc_connection *conn, + struct smc_cdc_tx_pend *pend, + struct smc_wr_buf *wr_buf) +{ + struct smc_host_cdc_msg *local = &conn->local_tx_ctrl; + struct smc_link *link = conn->lnk; + struct smc_cdc_msg *peer; + int rc; + + peer = (struct smc_cdc_msg *)wr_buf; + peer->common.type = local->common.type; + peer->len = local->len; + peer->seqno = htons(conn->tx_cdc_seq_fin); /* seqno last compl. tx */ + peer->token = htonl(local->token); + peer->prod_flags.failover_validation = 1; + + /* We need to set pend->conn here to make sure smc_cdc_tx_handler() + * can handle properly + */ + smc_cdc_add_pending_send(conn, pend); + + atomic_inc(&conn->cdc_pend_tx_wr); + smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */ + + rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend); + if (unlikely(rc)) + atomic_dec(&conn->cdc_pend_tx_wr); + + return rc; +} + static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn) { struct smc_cdc_tx_pend *pend; struct smc_wr_buf *wr_buf; + struct smc_link *link; + bool again = false; int rc; - rc = smc_cdc_get_free_slot(conn, &wr_buf, NULL, &pend); +again: + link = conn->lnk; + if (!smc_wr_tx_link_hold(link)) + return -ENOLINK; + rc = smc_cdc_get_free_slot(conn, link, &wr_buf, NULL, &pend); if (rc) - return rc; + goto put_out; spin_lock_bh(&conn->send_lock); + if (link != conn->lnk) { + /* link of connection changed, try again one time*/ + spin_unlock_bh(&conn->send_lock); + smc_wr_tx_put_slot(link, + (struct smc_wr_tx_pend_priv *)pend); + smc_wr_tx_link_put(link); + if (again) + return -ENOLINK; + again = true; + goto again; + } rc = smc_cdc_msg_send(conn, wr_buf, pend); spin_unlock_bh(&conn->send_lock); +put_out: + smc_wr_tx_link_put(link); return rc; } @@ -131,7 +207,8 @@ int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn) { int rc; - if (!conn->lgr || (conn->lgr->is_smcd && conn->lgr->peer_shutdown)) + if (!smc_conn_lgr_valid(conn) || + (conn->lgr->is_smcd && conn->lgr->peer_shutdown)) return -EPIPE; if (conn->lgr->is_smcd) { @@ -145,31 +222,9 @@ int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn) return rc; } -static bool smc_cdc_tx_filter(struct smc_wr_tx_pend_priv *tx_pend, - unsigned long data) +void smc_cdc_wait_pend_tx_wr(struct smc_connection *conn) { - struct smc_connection *conn = (struct smc_connection *)data; - struct smc_cdc_tx_pend *cdc_pend = - (struct smc_cdc_tx_pend *)tx_pend; - - return cdc_pend->conn == conn; -} - -static void smc_cdc_tx_dismisser(struct smc_wr_tx_pend_priv *tx_pend) -{ - struct smc_cdc_tx_pend *cdc_pend = - (struct smc_cdc_tx_pend *)tx_pend; - - cdc_pend->conn = NULL; -} - -void smc_cdc_tx_dismiss_slots(struct smc_connection *conn) -{ - struct smc_link *link = &conn->lgr->lnk[SMC_SINGLE_LINK]; - - smc_wr_tx_dismiss_slots(link, SMC_CDC_MSG_TYPE, - smc_cdc_tx_filter, smc_cdc_tx_dismisser, - (unsigned long)conn); + wait_event(conn->cdc_pend_tx_wq, !atomic_read(&conn->cdc_pend_tx_wr)); } /* Send a SMC-D CDC header. @@ -239,6 +294,28 @@ static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc, sk_send_sigurg(&smc->sk); } +static void smc_cdc_msg_validate(struct smc_sock *smc, struct smc_cdc_msg *cdc, + struct smc_link *link) +{ + struct smc_connection *conn = &smc->conn; + u16 recv_seq = ntohs(cdc->seqno); + s16 diff; + + /* check that seqnum was seen before */ + diff = conn->local_rx_ctrl.seqno - recv_seq; + if (diff < 0) { /* diff larger than 0x7fff */ + /* drop connection */ + conn->out_of_sync = 1; /* prevent any further receives */ + spin_lock_bh(&conn->send_lock); + conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1; + conn->lnk = link; + spin_unlock_bh(&conn->send_lock); + sock_hold(&smc->sk); /* sock_put in abort_work */ + if (!queue_work(smc_close_wq, &conn->abort_work)) + sock_put(&smc->sk); + } +} + static void smc_cdc_msg_recv_action(struct smc_sock *smc, struct smc_cdc_msg *cdc) { @@ -283,8 +360,12 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc, /* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */ if ((diff_cons && smc_tx_prepared_sends(conn)) || conn->local_rx_ctrl.prod_flags.cons_curs_upd_req || - conn->local_rx_ctrl.prod_flags.urg_data_pending) - smc_tx_sndbuf_nonempty(conn); + conn->local_rx_ctrl.prod_flags.urg_data_pending) { + if (!sock_owned_by_user(&smc->sk)) + smc_tx_pending(conn); + else + conn->tx_in_release_sock = true; + } if (diff_cons && conn->urg_tx_pend && atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) { @@ -303,7 +384,7 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc, smc->clcsock->sk->sk_shutdown |= RCV_SHUTDOWN; sock_set_flag(&smc->sk, SOCK_DONE); sock_hold(&smc->sk); /* sock_put in close_work */ - if (!schedule_work(&conn->close_work)) + if (!queue_work(smc_close_wq, &conn->close_work)) sock_put(&smc->sk); } } @@ -324,9 +405,9 @@ static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc) * Context: * - tasklet context */ -static void smcd_cdc_rx_tsklet(unsigned long data) +static void smcd_cdc_rx_tsklet(struct tasklet_struct *t) { - struct smc_connection *conn = (struct smc_connection *)data; + struct smc_connection *conn = from_tasklet(conn, t, rx_tsklet); struct smcd_cdc_msg *data_cdc; struct smcd_cdc_msg cdc; struct smc_sock *smc; @@ -346,7 +427,7 @@ static void smcd_cdc_rx_tsklet(unsigned long data) */ void smcd_cdc_rx_init(struct smc_connection *conn) { - tasklet_init(&conn->rx_tsklet, smcd_cdc_rx_tsklet, (unsigned long)conn); + tasklet_setup(&conn->rx_tsklet, smcd_cdc_rx_tsklet); } /***************************** init, exit, misc ******************************/ @@ -369,16 +450,19 @@ static void smc_cdc_rx_handler(struct ib_wc *wc, void *buf) read_lock_bh(&lgr->conns_lock); conn = smc_lgr_find_conn(ntohl(cdc->token), lgr); read_unlock_bh(&lgr->conns_lock); - if (!conn) + if (!conn || conn->out_of_sync) return; smc = container_of(conn, struct smc_sock, conn); - if (!cdc->prod_flags.failover_validation) { - if (smc_cdc_before(ntohs(cdc->seqno), - conn->local_rx_ctrl.seqno)) - /* received seqno is old */ - return; + if (cdc->prod_flags.failover_validation) { + smc_cdc_msg_validate(smc, cdc, link); + return; } + if (smc_cdc_before(ntohs(cdc->seqno), + conn->local_rx_ctrl.seqno)) + /* received seqno is old */ + return; + smc_cdc_msg_recv(smc, cdc); } |