aboutsummaryrefslogtreecommitdiffstats
path: root/net/sunrpc/xprtrdma/rpc_rdma.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c')
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c302
1 files changed, 147 insertions, 155 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 28020ec104d4..190a4de239c8 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
- * Copyright (c) 2014-2017 Oracle. All rights reserved.
+ * Copyright (c) 2014-2020, Oracle and/or its affiliates.
* Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
@@ -54,10 +54,6 @@
#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-# define RPCDBG_FACILITY RPCDBG_TRANS
-#endif
-
/* Returns size of largest RPC-over-RDMA header in a Call message
*
* The largest Call header contains a full-size Read list and a
@@ -71,7 +67,7 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
size = RPCRDMA_HDRLEN_MIN;
/* Maximum Read list size */
- size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
+ size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
/* Minimal Read chunk size */
size += sizeof(__be32); /* segment count */
@@ -94,7 +90,7 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
size = RPCRDMA_HDRLEN_MIN;
/* Maximum Write list size */
- size = sizeof(__be32); /* segment count */
+ size += sizeof(__be32); /* segment count */
size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
size += sizeof(__be32); /* list discriminator */
@@ -103,21 +99,20 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
/**
* rpcrdma_set_max_header_sizes - Initialize inline payload sizes
- * @r_xprt: transport instance to initialize
+ * @ep: endpoint to initialize
*
* The max_inline fields contain the maximum size of an RPC message
* so the marshaling code doesn't have to repeat this calculation
* for every RPC.
*/
-void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
+void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep)
{
- unsigned int maxsegs = r_xprt->rx_ia.ri_max_rdma_segs;
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ unsigned int maxsegs = ep->re_max_rdma_segs;
- ep->rep_max_inline_send =
- ep->rep_inline_send - rpcrdma_max_call_header_size(maxsegs);
- ep->rep_max_inline_recv =
- ep->rep_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
+ ep->re_max_inline_send =
+ ep->re_inline_send - rpcrdma_max_call_header_size(maxsegs);
+ ep->re_max_inline_recv =
+ ep->re_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
}
/* The client can send a request inline as long as the RPCRDMA header
@@ -132,9 +127,10 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
struct rpc_rqst *rqst)
{
struct xdr_buf *xdr = &rqst->rq_snd_buf;
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
unsigned int count, remaining, offset;
- if (xdr->len > r_xprt->rx_ep.rep_max_inline_send)
+ if (xdr->len > ep->re_max_inline_send)
return false;
if (xdr->page_len) {
@@ -145,7 +141,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
remaining -= min_t(unsigned int,
PAGE_SIZE - offset, remaining);
offset = 0;
- if (++count > r_xprt->rx_ep.rep_attr.cap.max_send_sge)
+ if (++count > ep->re_attr.cap.max_send_sge)
return false;
}
}
@@ -162,7 +158,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
struct rpc_rqst *rqst)
{
- return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep.rep_max_inline_recv;
+ return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv;
}
/* The client is required to provide a Reply chunk if the maximum
@@ -176,12 +172,35 @@ rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
const struct xdr_buf *buf = &rqst->rq_rcv_buf;
return (buf->head[0].iov_len + buf->tail[0].iov_len) <
- r_xprt->rx_ep.rep_max_inline_recv;
+ r_xprt->rx_ep->re_max_inline_recv;
+}
+
+/* ACL likes to be lazy in allocating pages. For TCP, these
+ * pages can be allocated during receive processing. Not true
+ * for RDMA, which must always provision receive buffers
+ * up front.
+ */
+static noinline int
+rpcrdma_alloc_sparse_pages(struct xdr_buf *buf)
+{
+ struct page **ppages;
+ int len;
+
+ len = buf->page_len;
+ ppages = buf->pages + (buf->page_base >> PAGE_SHIFT);
+ while (len > 0) {
+ if (!*ppages)
+ *ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
+ if (!*ppages)
+ return -ENOBUFS;
+ ppages++;
+ len -= PAGE_SIZE;
+ }
+
+ return 0;
}
-/* Split @vec on page boundaries into SGEs. FMR registers pages, not
- * a byte range. Other modes coalesce these SGEs into a single MR
- * when they can.
+/* Convert @vec to a single SGL element.
*
* Returns pointer to next available SGE, and bumps the total number
* of SGEs consumed.
@@ -190,22 +209,11 @@ static struct rpcrdma_mr_seg *
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
unsigned int *n)
{
- u32 remaining, page_offset;
- char *base;
-
- base = vec->iov_base;
- page_offset = offset_in_page(base);
- remaining = vec->iov_len;
- while (remaining) {
- seg->mr_page = NULL;
- seg->mr_offset = base;
- seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
- remaining -= seg->mr_len;
- base += seg->mr_len;
- ++seg;
- ++(*n);
- page_offset = 0;
- }
+ seg->mr_page = virt_to_page(vec->iov_base);
+ seg->mr_offset = offset_in_page(vec->iov_base);
+ seg->mr_len = vec->iov_len;
+ ++seg;
+ ++(*n);
return seg;
}
@@ -233,17 +241,8 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
page_base = offset_in_page(xdrbuf->page_base);
while (len) {
- /* ACL likes to be lazy in allocating pages - ACLs
- * are small by default but can get huge.
- */
- if (unlikely(xdrbuf->flags & XDRBUF_SPARSE_PAGES)) {
- if (!*ppages)
- *ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
- if (!*ppages)
- return -ENOBUFS;
- }
seg->mr_page = *ppages;
- seg->mr_offset = (char *)page_base;
+ seg->mr_offset = page_base;
seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
len -= seg->mr_len;
++ppages;
@@ -252,22 +251,11 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
page_base = 0;
}
- /* When encoding a Read chunk, the tail iovec contains an
- * XDR pad and may be omitted.
- */
- if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
- goto out;
-
- /* When encoding a Write chunk, some servers need to see an
- * extra segment for non-XDR-aligned Write chunks. The upper
- * layer provides space in the tail iovec that may be used
- * for this purpose.
- */
- if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
+ if (type == rpcrdma_readch || type == rpcrdma_writech)
goto out;
if (xdrbuf->tail[0].iov_len)
- seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
+ rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
out:
if (unlikely(n > RPCRDMA_MAX_SEGS))
@@ -275,40 +263,6 @@ out:
return n;
}
-static inline int
-encode_item_present(struct xdr_stream *xdr)
-{
- __be32 *p;
-
- p = xdr_reserve_space(xdr, sizeof(*p));
- if (unlikely(!p))
- return -EMSGSIZE;
-
- *p = xdr_one;
- return 0;
-}
-
-static inline int
-encode_item_not_present(struct xdr_stream *xdr)
-{
- __be32 *p;
-
- p = xdr_reserve_space(xdr, sizeof(*p));
- if (unlikely(!p))
- return -EMSGSIZE;
-
- *p = xdr_zero;
- return 0;
-}
-
-static void
-xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr)
-{
- *iptr++ = cpu_to_be32(mr->mr_handle);
- *iptr++ = cpu_to_be32(mr->mr_length);
- xdr_encode_hyper(iptr, mr->mr_offset);
-}
-
static int
encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
{
@@ -318,7 +272,7 @@ encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
if (unlikely(!p))
return -EMSGSIZE;
- xdr_encode_rdma_segment(p, mr);
+ xdr_encode_rdma_segment(p, mr->mr_handle, mr->mr_length, mr->mr_offset);
return 0;
}
@@ -333,8 +287,8 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
return -EMSGSIZE;
*p++ = xdr_one; /* Item present */
- *p++ = cpu_to_be32(position);
- xdr_encode_rdma_segment(p, mr);
+ xdr_encode_read_segment(p, position, mr->mr_handle, mr->mr_length,
+ mr->mr_offset);
return 0;
}
@@ -349,7 +303,6 @@ static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
*mr = rpcrdma_mr_get(r_xprt);
if (!*mr)
goto out_getmr_err;
- trace_xprtrdma_mr_get(req);
(*mr)->mr_req = req;
}
@@ -357,7 +310,7 @@ static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
out_getmr_err:
- trace_xprtrdma_nomrs(req);
+ trace_xprtrdma_nomrs_err(r_xprt, req);
xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
rpcrdma_mrs_refresh(r_xprt);
return ERR_PTR(-EAGAIN);
@@ -414,7 +367,9 @@ static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
} while (nsegs);
done:
- return encode_item_not_present(xdr);
+ if (xdr_stream_encode_item_absent(xdr) < 0)
+ return -EMSGSIZE;
+ return 0;
}
/* Register and XDR encode the Write list. Supports encoding a list
@@ -438,6 +393,7 @@ static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
enum rpcrdma_chunktype wtype)
{
struct xdr_stream *xdr = &req->rl_stream;
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
struct rpcrdma_mr_seg *seg;
struct rpcrdma_mr *mr;
int nsegs, nchunks;
@@ -453,7 +409,7 @@ static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
if (nsegs < 0)
return nsegs;
- if (encode_item_present(xdr) < 0)
+ if (xdr_stream_encode_item_present(xdr) < 0)
return -EMSGSIZE;
segcount = xdr_reserve_space(xdr, sizeof(*segcount));
if (unlikely(!segcount))
@@ -476,11 +432,25 @@ static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
nsegs -= mr->mr_nents;
} while (nsegs);
+ if (xdr_pad_size(rqst->rq_rcv_buf.page_len)) {
+ if (encode_rdma_segment(xdr, ep->re_write_pad_mr) < 0)
+ return -EMSGSIZE;
+
+ trace_xprtrdma_chunk_wp(rqst->rq_task, ep->re_write_pad_mr,
+ nsegs);
+ r_xprt->rx_stats.write_chunk_count++;
+ r_xprt->rx_stats.total_rdma_request += mr->mr_length;
+ nchunks++;
+ nsegs -= mr->mr_nents;
+ }
+
/* Update count of segments in this Write chunk */
*segcount = cpu_to_be32(nchunks);
done:
- return encode_item_not_present(xdr);
+ if (xdr_stream_encode_item_absent(xdr) < 0)
+ return -EMSGSIZE;
+ return 0;
}
/* Register and XDR encode the Reply chunk. Supports encoding an array
@@ -506,15 +476,18 @@ static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
int nsegs, nchunks;
__be32 *segcount;
- if (wtype != rpcrdma_replych)
- return encode_item_not_present(xdr);
+ if (wtype != rpcrdma_replych) {
+ if (xdr_stream_encode_item_absent(xdr) < 0)
+ return -EMSGSIZE;
+ return 0;
+ }
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
if (nsegs < 0)
return nsegs;
- if (encode_item_present(xdr) < 0)
+ if (xdr_stream_encode_item_present(xdr) < 0)
return -EMSGSIZE;
segcount = xdr_reserve_space(xdr, sizeof(*segcount));
if (unlikely(!segcount))
@@ -894,6 +867,12 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
__be32 *p;
int ret;
+ if (unlikely(rqst->rq_rcv_buf.flags & XDRBUF_SPARSE_PAGES)) {
+ ret = rpcrdma_alloc_sparse_pages(&rqst->rq_rcv_buf);
+ if (ret)
+ return ret;
+ }
+
rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
rqst);
@@ -911,8 +890,8 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
* or privacy, direct data placement of individual data items
* is not allowed.
*/
- ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
- RPCAUTH_AUTH_DATATOUCH);
+ ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH,
+ &rqst->rq_cred->cr_auth->au_flags);
/*
* Chunks needed for results?
@@ -1142,6 +1121,7 @@ static bool
rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
{
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
struct xdr_stream *xdr = &rep->rr_stream;
__be32 *p;
@@ -1152,11 +1132,11 @@ rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
p = xdr_inline_decode(xdr, 0);
/* Chunk lists */
- if (*p++ != xdr_zero)
+ if (xdr_item_is_present(p++))
return false;
- if (*p++ != xdr_zero)
+ if (xdr_item_is_present(p++))
return false;
- if (*p++ != xdr_zero)
+ if (xdr_item_is_present(p++))
return false;
/* RPC header */
@@ -1165,19 +1145,19 @@ rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
if (*p != cpu_to_be32(RPC_CALL))
return false;
+ /* No bc service. */
+ if (xprt->bc_serv == NULL)
+ return false;
+
/* Now that we are sure this is a backchannel call,
* advance to the RPC header.
*/
p = xdr_inline_decode(xdr, 3 * sizeof(*p));
if (unlikely(!p))
- goto out_short;
+ return true;
rpcrdma_bc_receive_call(r_xprt, rep);
return true;
-
-out_short:
- pr_warn("RPC/RDMA short backward direction call\n");
- return true;
}
#else /* CONFIG_SUNRPC_BACKCHANNEL */
{
@@ -1195,10 +1175,7 @@ static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
if (unlikely(!p))
return -EIO;
- handle = be32_to_cpup(p++);
- *length = be32_to_cpup(p++);
- xdr_decode_hyper(p, &offset);
-
+ xdr_decode_rdma_segment(p, &handle, length, &offset);
trace_xprtrdma_decode_seg(handle, *length, offset);
return 0;
}
@@ -1234,7 +1211,7 @@ static int decode_read_list(struct xdr_stream *xdr)
p = xdr_inline_decode(xdr, sizeof(*p));
if (unlikely(!p))
return -EIO;
- if (unlikely(*p != xdr_zero))
+ if (unlikely(xdr_item_is_present(p)))
return -EIO;
return 0;
}
@@ -1253,7 +1230,7 @@ static int decode_write_list(struct xdr_stream *xdr, u32 *length)
p = xdr_inline_decode(xdr, sizeof(*p));
if (unlikely(!p))
return -EIO;
- if (*p == xdr_zero)
+ if (xdr_item_is_absent(p))
break;
if (!first)
return -EIO;
@@ -1275,7 +1252,7 @@ static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
return -EIO;
*length = 0;
- if (*p != xdr_zero)
+ if (xdr_item_is_present(p))
if (decode_write_chunk(xdr, length))
return -EIO;
return 0;
@@ -1352,29 +1329,47 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
p = xdr_inline_decode(xdr, 2 * sizeof(*p));
if (!p)
break;
- dprintk("RPC: %s: server reports "
- "version error (%u-%u), xid %08x\n", __func__,
- be32_to_cpup(p), be32_to_cpu(*(p + 1)),
- be32_to_cpu(rep->rr_xid));
+ trace_xprtrdma_err_vers(rqst, p, p + 1);
break;
case err_chunk:
- dprintk("RPC: %s: server reports "
- "header decoding error, xid %08x\n", __func__,
- be32_to_cpu(rep->rr_xid));
+ trace_xprtrdma_err_chunk(rqst);
break;
default:
- dprintk("RPC: %s: server reports "
- "unrecognized error %d, xid %08x\n", __func__,
- be32_to_cpup(p), be32_to_cpu(rep->rr_xid));
+ trace_xprtrdma_err_unrecognized(rqst, p);
}
- r_xprt->rx_stats.bad_reply_count++;
- return -EREMOTEIO;
+ return -EIO;
+}
+
+/**
+ * rpcrdma_unpin_rqst - Release rqst without completing it
+ * @rep: RPC/RDMA Receive context
+ *
+ * This is done when a connection is lost so that a Reply
+ * can be dropped and its matching Call can be subsequently
+ * retransmitted on a new connection.
+ */
+void rpcrdma_unpin_rqst(struct rpcrdma_rep *rep)
+{
+ struct rpc_xprt *xprt = &rep->rr_rxprt->rx_xprt;
+ struct rpc_rqst *rqst = rep->rr_rqst;
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+
+ req->rl_reply = NULL;
+ rep->rr_rqst = NULL;
+
+ spin_lock(&xprt->queue_lock);
+ xprt_unpin_rqst(rqst);
+ spin_unlock(&xprt->queue_lock);
}
-/* Perform XID lookup, reconstruction of the RPC reply, and
- * RPC completion while holding the transport lock to ensure
- * the rep, rqst, and rq_task pointers remain stable.
+/**
+ * rpcrdma_complete_rqst - Pass completed rqst back to RPC
+ * @rep: RPC/RDMA Receive context
+ *
+ * Reconstruct the RPC reply and complete the transaction
+ * while @rqst is still pinned to ensure the rep, rqst, and
+ * rq_task pointers remain stable.
*/
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
{
@@ -1406,13 +1401,11 @@ out:
spin_unlock(&xprt->queue_lock);
return;
-/* If the incoming reply terminated a pending RPC, the next
- * RPC call will post a replacement receive buffer as it is
- * being marshaled.
- */
out_badheader:
- trace_xprtrdma_reply_hdr(rep);
+ trace_xprtrdma_reply_hdr_err(rep);
r_xprt->rx_stats.bad_reply_count++;
+ rqst->rq_task->tk_status = status;
+ status = 0;
goto out;
}
@@ -1476,21 +1469,20 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
if (credits == 0)
credits = 1; /* don't deadlock */
- else if (credits > r_xprt->rx_ep.rep_max_requests)
- credits = r_xprt->rx_ep.rep_max_requests;
+ else if (credits > r_xprt->rx_ep->re_max_requests)
+ credits = r_xprt->rx_ep->re_max_requests;
+ rpcrdma_post_recvs(r_xprt, credits + (buf->rb_bc_srv_max_requests << 1),
+ false);
if (buf->rb_credits != credits)
rpcrdma_update_cwnd(r_xprt, credits);
- rpcrdma_post_recvs(r_xprt, false);
req = rpcr_to_rdmar(rqst);
- if (req->rl_reply) {
- trace_xprtrdma_leaked_rep(rqst, req->rl_reply);
- rpcrdma_recv_buffer_put(req->rl_reply);
- }
+ if (unlikely(req->rl_reply))
+ rpcrdma_rep_put(buf, req->rl_reply);
req->rl_reply = rep;
rep->rr_rqst = rqst;
- trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
+ trace_xprtrdma_reply(rqst->rq_task, rep, credits);
if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
frwr_reminv(rep, &req->rl_registered);
@@ -1502,17 +1494,17 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
return;
out_badversion:
- trace_xprtrdma_reply_vers(rep);
+ trace_xprtrdma_reply_vers_err(rep);
goto out;
out_norqst:
spin_unlock(&xprt->queue_lock);
- trace_xprtrdma_reply_rqst(rep);
+ trace_xprtrdma_reply_rqst_err(rep);
goto out;
out_shortreply:
- trace_xprtrdma_reply_short(rep);
+ trace_xprtrdma_reply_short_err(rep);
out:
- rpcrdma_recv_buffer_put(rep);
+ rpcrdma_rep_put(buf, rep);
}