path: root/net/sunrpc/xprtrdma/verbs.c
author    Chuck Lever <chuck.lever@oracle.com>    2018-12-19 10:58:29 -0500
committer Anna Schumaker <Anna.Schumaker@Netapp.com>    2019-01-02 12:05:16 -0500
commit    6d2d0ee27c7a12371a0ca51a5db414204901228c (patch)
tree      4dcab040d2dec42c080a0f4cdfb59fac8bb5e1d4 /net/sunrpc/xprtrdma/verbs.c
parent    xprtrdma: Refactor Receive accounting (diff)
xprtrdma: Replace rpcrdma_receive_wq with a per-xprt workqueue
To address a connection-close ordering problem, we need the ability to
drain the RPC completions running on rpcrdma_receive_wq for just one
transport. Give each transport its own RPC completion workqueue, and
drain that workqueue when disconnecting the transport.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
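The ordering inside the new rpcrdma_xprt_drain() is the heart of the fix: stop new Receive completions first, then flush deferred Reply work, then flush the Send queue that work may have touched. What follows is a minimal sketch of that sequence, not the patch itself; struct example_xprt and its fields are hypothetical stand-ins for the rpcrdma_xprt fields used in the hunks below, while ib_drain_qp(), drain_workqueue(), and ib_drain_sq() are the real kernel primitives the patch calls.

#include <linux/workqueue.h>
#include <rdma/ib_verbs.h>

/* Hedged sketch of the drain ordering; example_xprt is a made-up
 * type, not the real struct rpcrdma_xprt.
 */
struct example_xprt {
	struct ib_qp *qp;			/* stand-in for ia->ri_id->qp */
	struct workqueue_struct *completion_wq;	/* stand-in for rb_completion_wq */
};

static void example_xprt_drain(struct example_xprt *xprt)
{
	/* 1. Flush posted Receives so no new deferred Reply work
	 *    can be queued for this transport.
	 */
	ib_drain_qp(xprt->qp);

	/* 2. Wait for Reply work already queued on this transport's
	 *    private workqueue to run to completion.
	 */
	drain_workqueue(xprt->completion_wq);

	/* 3. Reply processing may have posted local invalidations;
	 *    flush the Send queue so those finish before teardown.
	 */
	ib_drain_sq(xprt->qp);
}

Because the workqueue is now per-transport, drain_workqueue() quiesces only the transport being disconnected; the old shared rpcrdma_receive_wq could not be drained without stalling every other transport.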
Diffstat (limited to 'net/sunrpc/xprtrdma/verbs.c')
-rw-r--r--  net/sunrpc/xprtrdma/verbs.c | 67
1 file changed, 36 insertions(+), 31 deletions(-)
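Before the hunks themselves, a sketch of the other half of the change: the queue's lifecycle. The queue is allocated in rpcrdma_buffer_create() and destroyed in rpcrdma_buffer_destroy(), as the last two hunks below show. The helper names and the example_xprt type are hypothetical, reused from the sketch above; alloc_workqueue() and destroy_workqueue() are the real APIs.

#include <linux/errno.h>

static int example_completion_wq_create(struct example_xprt *xprt,
					const char *peer_addr)
{
	/* One reclaim-safe, high-priority workqueue per transport,
	 * named after the peer address so each queue is identifiable.
	 */
	xprt->completion_wq = alloc_workqueue("rpcrdma-%s",
					      WQ_MEM_RECLAIM | WQ_HIGHPRI,
					      0, peer_addr);
	return xprt->completion_wq ? 0 : -ENOMEM;
}

static void example_completion_wq_destroy(struct example_xprt *xprt)
{
	/* Guarded teardown, mirroring rpcrdma_buffer_destroy() below. */
	if (xprt->completion_wq) {
		destroy_workqueue(xprt->completion_wq);
		xprt->completion_wq = NULL;
	}
}

WQ_MEM_RECLAIM is kept from the old shared queue so the workqueue has a rescuer thread and Receive completions can make forward progress under memory pressure, which matters when NFS writeback depends on them.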
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e4461e7c1b0c..cff3a5df0b90 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -80,33 +80,23 @@ static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
 static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
-struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
-
-int
-rpcrdma_alloc_wq(void)
+/* Wait for outstanding transport work to finish.
+ */
+static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 {
-	struct workqueue_struct *recv_wq;
-
-	recv_wq = alloc_workqueue("xprtrdma_receive",
-				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
-				  0);
-	if (!recv_wq)
-		return -ENOMEM;
-
-	rpcrdma_receive_wq = recv_wq;
-	return 0;
-}
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-void
-rpcrdma_destroy_wq(void)
-{
-	struct workqueue_struct *wq;
+	/* Flush Receives, then wait for deferred Reply work
+	 * to complete.
+	 */
+	ib_drain_qp(ia->ri_id->qp);
+	drain_workqueue(buf->rb_completion_wq);
 
-	if (rpcrdma_receive_wq) {
-		wq = rpcrdma_receive_wq;
-		rpcrdma_receive_wq = NULL;
-		destroy_workqueue(wq);
-	}
+	/* Deferred Reply processing might have scheduled
+	 * local invalidations.
+	 */
+	ib_drain_sq(ia->ri_id->qp);
 }
 
 /**
@@ -483,7 +473,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
 	 * connection is already gone.
 	 */
	if (ia->ri_id->qp) {
-		ib_drain_qp(ia->ri_id->qp);
+		rpcrdma_xprt_drain(r_xprt);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}
@@ -825,8 +815,10 @@ out_noupdate:
	return rc;
 }
 
-/*
- * rpcrdma_ep_disconnect
+/**
+ * rpcrdma_ep_disconnect - Disconnect underlying transport
+ * @ep: endpoint to disconnect
+ * @ia: associated interface adapter
  *
  * This is separate from destroy to facilitate the ability
  * to reconnect without recreating the endpoint.
@@ -837,19 +829,20 @@ out_noupdate:
 void
 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
+	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
+						   rx_ep);
	int rc;
 
+	/* returns without wait if ID is not connected */
	rc = rdma_disconnect(ia->ri_id);
	if (!rc)
-		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
	else
		ep->rep_connected = rc;
-	trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
-					       rx_ep), rc);
+	trace_xprtrdma_disconnect(r_xprt, rc);
 
-	ib_drain_qp(ia->ri_id->qp);
+	rpcrdma_xprt_drain(r_xprt);
 }
 
 /* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -1183,6 +1176,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
	if (rc)
		goto out;
 
+	buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
+						WQ_MEM_RECLAIM | WQ_HIGHPRI,
+						0,
+			r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
+	if (!buf->rb_completion_wq)
+		goto out;
+
	return 0;
 out:
	rpcrdma_buffer_destroy(buf);
@@ -1241,6 +1241,11 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
	cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
+	if (buf->rb_completion_wq) {
+		destroy_workqueue(buf->rb_completion_wq);
+		buf->rb_completion_wq = NULL;
+	}
+
	rpcrdma_sendctxs_destroy(buf);
 
	while (!list_empty(&buf->rb_recv_bufs)) {