Diffstat (limited to 'drivers/staging/lustre/lustre/ptlrpc/client.c')
-rw-r--r--  drivers/staging/lustre/lustre/ptlrpc/client.c | 72
 1 file changed, 45 insertions(+), 27 deletions(-)
diff --git a/drivers/staging/lustre/lustre/ptlrpc/client.c b/drivers/staging/lustre/lustre/ptlrpc/client.c
index afdc0df60154..d2f4cd5a9a9d 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/client.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/client.c
@@ -1304,13 +1304,6 @@ static int after_reply(struct ptlrpc_request *req)
 		spin_unlock(&req->rq_lock);
 		req->rq_nr_resend++;
 
-		/* allocate new xid to avoid reply reconstruction */
-		if (!req->rq_bulk) {
-			/* new xid is already allocated for bulk in ptlrpc_check_set() */
-			req->rq_xid = ptlrpc_next_xid();
-			DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for resend on EINPROGRESS");
-		}
-
 		/* Readjust the timeout for current conditions */
 		ptlrpc_at_set_req_timeout(req);
 		/*
@@ -1802,18 +1795,9 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 					spin_lock(&req->rq_lock);
 					req->rq_resend = 1;
 					spin_unlock(&req->rq_lock);
-					if (req->rq_bulk) {
-						__u64 old_xid;
-
-						if (!ptlrpc_unregister_bulk(req, 1))
-							continue;
-
-						/* ensure previous bulk fails */
-						old_xid = req->rq_xid;
-						req->rq_xid = ptlrpc_next_xid();
-						CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
-						       old_xid, req->rq_xid);
-					}
+					if (req->rq_bulk &&
+					    !ptlrpc_unregister_bulk(req, 1))
+						continue;
 				}
 				/*
 				 * rq_wait_ctx is only touched by ptlrpcd,
@@ -2664,14 +2648,6 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
 	req->rq_resend = 1;
 	req->rq_net_err = 0;
 	req->rq_timedout = 0;
-	if (req->rq_bulk) {
-		__u64 old_xid = req->rq_xid;
-
-		/* ensure previous bulk fails */
-		req->rq_xid = ptlrpc_next_xid();
-		CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
-		       old_xid, req->rq_xid);
-	}
 	ptlrpc_client_wake_req(req);
 	spin_unlock(&req->rq_lock);
 }
@@ -3072,6 +3048,48 @@ __u64 ptlrpc_next_xid(void)
 }
 
 /**
+ * If request has a new allocated XID (new request or EINPROGRESS resend),
+ * use this XID as matchbits of bulk, otherwise allocate a new matchbits for
+ * request to ensure previous bulk fails and avoid problems with lost replies
+ * and therefore several transfers landing into the same buffer from different
+ * sending attempts.
+ */
+void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
+{
+	struct ptlrpc_bulk_desc *bd = req->rq_bulk;
+
+	LASSERT(bd);
+
+	if (!req->rq_resend || req->rq_nr_resend) {
+		/* this request has a new xid, just use it as bulk matchbits */
+		req->rq_mbits = req->rq_xid;
+
+	} else { /* needs to generate a new matchbits for resend */
+		u64 old_mbits = req->rq_mbits;
+
+		if ((bd->bd_import->imp_connect_data.ocd_connect_flags &
+		     OBD_CONNECT_BULK_MBITS)) {
+			req->rq_mbits = ptlrpc_next_xid();
+		} else {
+			/* old version transfers rq_xid to peer as matchbits */
+			req->rq_mbits = ptlrpc_next_xid();
+			req->rq_xid = req->rq_mbits;
+		}
+
+		CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
+		       old_mbits, req->rq_mbits);
+	}
+
+	/*
+	 * For multi-bulk RPCs, rq_mbits is the last mbits needed for bulks so
+	 * that server can infer the number of bulks that were prepared,
+	 * see LU-1431
+	 */
+	req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
+			  LNET_MAX_IOV) - 1;
+}
+
+/**
  * Get a glimpse at what next xid value might have been.
  * Returns possible next xid.
  */
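
Not part of the patch: a minimal standalone sketch of the LU-1431 reservation arithmetic at the end of ptlrpc_set_bulk_mbits() above. It assumes LNET_MAX_IOV is 256 pages per bulk descriptor and uses made-up rq_xid/bd_iov_count values; it only illustrates how rq_mbits ends up naming the last bulk of a multi-bulk RPC, so the peer can infer how many bulks were prepared.

/*
 * Illustration only (not from client.c): the multi-bulk matchbits
 * reservation, with an assumed LNET_MAX_IOV of 256 and hypothetical values.
 */
#include <stdio.h>

#define LNET_MAX_IOV 256	/* assumed max pages per bulk descriptor */

int main(void)
{
	unsigned long long rq_xid = 0x5a0;	/* hypothetical new xid */
	unsigned long long rq_mbits = rq_xid;	/* new request: mbits = xid */
	int bd_iov_count = 600;			/* hypothetical pages queued */

	/* bulks needed for the transfer: ceil(bd_iov_count / LNET_MAX_IOV) */
	int nbulks = (bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;

	/* same arithmetic as the patch: rq_mbits becomes the matchbits of
	 * the last bulk, so the peer can infer nbulks from the range
	 */
	rq_mbits += nbulks - 1;

	printf("bulks=%d first=%#llx last=%#llx\n", nbulks, rq_xid, rq_mbits);
	return 0;
}

With bd_iov_count = 600 and LNET_MAX_IOV = 256, three bulks are needed, so rq_mbits advances two past the first matchbits and the bulks use the range [rq_xid, rq_mbits].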