diff options
Diffstat (limited to 'drivers/staging/lustre/lustre/ptlrpc/client.c')
-rw-r--r-- | drivers/staging/lustre/lustre/ptlrpc/client.c | 72 |
1 files changed, 45 insertions, 27 deletions
diff --git a/drivers/staging/lustre/lustre/ptlrpc/client.c b/drivers/staging/lustre/lustre/ptlrpc/client.c index afdc0df60154..d2f4cd5a9a9d 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/client.c +++ b/drivers/staging/lustre/lustre/ptlrpc/client.c @@ -1304,13 +1304,6 @@ static int after_reply(struct ptlrpc_request *req) spin_unlock(&req->rq_lock); req->rq_nr_resend++; - /* allocate new xid to avoid reply reconstruction */ - if (!req->rq_bulk) { - /* new xid is already allocated for bulk in ptlrpc_check_set() */ - req->rq_xid = ptlrpc_next_xid(); - DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for resend on EINPROGRESS"); - } - /* Readjust the timeout for current conditions */ ptlrpc_at_set_req_timeout(req); /* @@ -1802,18 +1795,9 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) spin_lock(&req->rq_lock); req->rq_resend = 1; spin_unlock(&req->rq_lock); - if (req->rq_bulk) { - __u64 old_xid; - - if (!ptlrpc_unregister_bulk(req, 1)) - continue; - - /* ensure previous bulk fails */ - old_xid = req->rq_xid; - req->rq_xid = ptlrpc_next_xid(); - CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n", - old_xid, req->rq_xid); - } + if (req->rq_bulk && + !ptlrpc_unregister_bulk(req, 1)) + continue; } /* * rq_wait_ctx is only touched by ptlrpcd, @@ -2664,14 +2648,6 @@ void ptlrpc_resend_req(struct ptlrpc_request *req) req->rq_resend = 1; req->rq_net_err = 0; req->rq_timedout = 0; - if (req->rq_bulk) { - __u64 old_xid = req->rq_xid; - - /* ensure previous bulk fails */ - req->rq_xid = ptlrpc_next_xid(); - CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n", - old_xid, req->rq_xid); - } ptlrpc_client_wake_req(req); spin_unlock(&req->rq_lock); } @@ -3072,6 +3048,48 @@ __u64 ptlrpc_next_xid(void) } /** + * If request has a new allocated XID (new request or EINPROGRESS resend), + * use this XID as matchbits of bulk, otherwise allocate a new matchbits for + * request to ensure previous bulk fails and avoid problems with lost replies + * and therefore several transfers landing into the same buffer from different + * sending attempts. + */ +void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) +{ + struct ptlrpc_bulk_desc *bd = req->rq_bulk; + + LASSERT(bd); + + if (!req->rq_resend || req->rq_nr_resend) { + /* this request has a new xid, just use it as bulk matchbits */ + req->rq_mbits = req->rq_xid; + + } else { /* needs to generate a new matchbits for resend */ + u64 old_mbits = req->rq_mbits; + + if ((bd->bd_import->imp_connect_data.ocd_connect_flags & + OBD_CONNECT_BULK_MBITS)) { + req->rq_mbits = ptlrpc_next_xid(); + } else { + /* old version transfers rq_xid to peer as matchbits */ + req->rq_mbits = ptlrpc_next_xid(); + req->rq_xid = req->rq_mbits; + } + + CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n", + old_mbits, req->rq_mbits); + } + + /* + * For multi-bulk RPCs, rq_mbits is the last mbits needed for bulks so + * that server can infer the number of bulks that were prepared, + * see LU-1431 + */ + req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) / + LNET_MAX_IOV) - 1; +} + +/** * Get a glimpse at what next xid value might have been. * Returns possible next xid. */ |