diff options
Diffstat (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c')
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 302 |
1 files changed, 147 insertions, 155 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 28020ec104d4..190a4de239c8 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause /* - * Copyright (c) 2014-2017 Oracle. All rights reserved. + * Copyright (c) 2014-2020, Oracle and/or its affiliates. * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. * * This software is available to you under a choice of one of two @@ -54,10 +54,6 @@ #include "xprt_rdma.h" #include <trace/events/rpcrdma.h> -#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) -# define RPCDBG_FACILITY RPCDBG_TRANS -#endif - /* Returns size of largest RPC-over-RDMA header in a Call message * * The largest Call header contains a full-size Read list and a @@ -71,7 +67,7 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs) size = RPCRDMA_HDRLEN_MIN; /* Maximum Read list size */ - size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32); + size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32); /* Minimal Read chunk size */ size += sizeof(__be32); /* segment count */ @@ -94,7 +90,7 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs) size = RPCRDMA_HDRLEN_MIN; /* Maximum Write list size */ - size = sizeof(__be32); /* segment count */ + size += sizeof(__be32); /* segment count */ size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32); size += sizeof(__be32); /* list discriminator */ @@ -103,21 +99,20 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs) /** * rpcrdma_set_max_header_sizes - Initialize inline payload sizes - * @r_xprt: transport instance to initialize + * @ep: endpoint to initialize * * The max_inline fields contain the maximum size of an RPC message * so the marshaling code doesn't have to repeat this calculation * for every RPC. */ -void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt) +void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep) { - unsigned int maxsegs = r_xprt->rx_ia.ri_max_rdma_segs; - struct rpcrdma_ep *ep = &r_xprt->rx_ep; + unsigned int maxsegs = ep->re_max_rdma_segs; - ep->rep_max_inline_send = - ep->rep_inline_send - rpcrdma_max_call_header_size(maxsegs); - ep->rep_max_inline_recv = - ep->rep_inline_recv - rpcrdma_max_reply_header_size(maxsegs); + ep->re_max_inline_send = + ep->re_inline_send - rpcrdma_max_call_header_size(maxsegs); + ep->re_max_inline_recv = + ep->re_inline_recv - rpcrdma_max_reply_header_size(maxsegs); } /* The client can send a request inline as long as the RPCRDMA header @@ -132,9 +127,10 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) { struct xdr_buf *xdr = &rqst->rq_snd_buf; + struct rpcrdma_ep *ep = r_xprt->rx_ep; unsigned int count, remaining, offset; - if (xdr->len > r_xprt->rx_ep.rep_max_inline_send) + if (xdr->len > ep->re_max_inline_send) return false; if (xdr->page_len) { @@ -145,7 +141,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt, remaining -= min_t(unsigned int, PAGE_SIZE - offset, remaining); offset = 0; - if (++count > r_xprt->rx_ep.rep_attr.cap.max_send_sge) + if (++count > ep->re_attr.cap.max_send_sge) return false; } } @@ -162,7 +158,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt, static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) { - return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep.rep_max_inline_recv; + return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv; } /* The client is required to provide a Reply chunk if the maximum @@ -176,12 +172,35 @@ rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt, const struct xdr_buf *buf = &rqst->rq_rcv_buf; return (buf->head[0].iov_len + buf->tail[0].iov_len) < - r_xprt->rx_ep.rep_max_inline_recv; + r_xprt->rx_ep->re_max_inline_recv; +} + +/* ACL likes to be lazy in allocating pages. For TCP, these + * pages can be allocated during receive processing. Not true + * for RDMA, which must always provision receive buffers + * up front. + */ +static noinline int +rpcrdma_alloc_sparse_pages(struct xdr_buf *buf) +{ + struct page **ppages; + int len; + + len = buf->page_len; + ppages = buf->pages + (buf->page_base >> PAGE_SHIFT); + while (len > 0) { + if (!*ppages) + *ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN); + if (!*ppages) + return -ENOBUFS; + ppages++; + len -= PAGE_SIZE; + } + + return 0; } -/* Split @vec on page boundaries into SGEs. FMR registers pages, not - * a byte range. Other modes coalesce these SGEs into a single MR - * when they can. +/* Convert @vec to a single SGL element. * * Returns pointer to next available SGE, and bumps the total number * of SGEs consumed. @@ -190,22 +209,11 @@ static struct rpcrdma_mr_seg * rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, unsigned int *n) { - u32 remaining, page_offset; - char *base; - - base = vec->iov_base; - page_offset = offset_in_page(base); - remaining = vec->iov_len; - while (remaining) { - seg->mr_page = NULL; - seg->mr_offset = base; - seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining); - remaining -= seg->mr_len; - base += seg->mr_len; - ++seg; - ++(*n); - page_offset = 0; - } + seg->mr_page = virt_to_page(vec->iov_base); + seg->mr_offset = offset_in_page(vec->iov_base); + seg->mr_len = vec->iov_len; + ++seg; + ++(*n); return seg; } @@ -233,17 +241,8 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf, ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT); page_base = offset_in_page(xdrbuf->page_base); while (len) { - /* ACL likes to be lazy in allocating pages - ACLs - * are small by default but can get huge. - */ - if (unlikely(xdrbuf->flags & XDRBUF_SPARSE_PAGES)) { - if (!*ppages) - *ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN); - if (!*ppages) - return -ENOBUFS; - } seg->mr_page = *ppages; - seg->mr_offset = (char *)page_base; + seg->mr_offset = page_base; seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len); len -= seg->mr_len; ++ppages; @@ -252,22 +251,11 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf, page_base = 0; } - /* When encoding a Read chunk, the tail iovec contains an - * XDR pad and may be omitted. - */ - if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup) - goto out; - - /* When encoding a Write chunk, some servers need to see an - * extra segment for non-XDR-aligned Write chunks. The upper - * layer provides space in the tail iovec that may be used - * for this purpose. - */ - if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup) + if (type == rpcrdma_readch || type == rpcrdma_writech) goto out; if (xdrbuf->tail[0].iov_len) - seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n); + rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n); out: if (unlikely(n > RPCRDMA_MAX_SEGS)) @@ -275,40 +263,6 @@ out: return n; } -static inline int -encode_item_present(struct xdr_stream *xdr) -{ - __be32 *p; - - p = xdr_reserve_space(xdr, sizeof(*p)); - if (unlikely(!p)) - return -EMSGSIZE; - - *p = xdr_one; - return 0; -} - -static inline int -encode_item_not_present(struct xdr_stream *xdr) -{ - __be32 *p; - - p = xdr_reserve_space(xdr, sizeof(*p)); - if (unlikely(!p)) - return -EMSGSIZE; - - *p = xdr_zero; - return 0; -} - -static void -xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr) -{ - *iptr++ = cpu_to_be32(mr->mr_handle); - *iptr++ = cpu_to_be32(mr->mr_length); - xdr_encode_hyper(iptr, mr->mr_offset); -} - static int encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr) { @@ -318,7 +272,7 @@ encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr) if (unlikely(!p)) return -EMSGSIZE; - xdr_encode_rdma_segment(p, mr); + xdr_encode_rdma_segment(p, mr->mr_handle, mr->mr_length, mr->mr_offset); return 0; } @@ -333,8 +287,8 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr, return -EMSGSIZE; *p++ = xdr_one; /* Item present */ - *p++ = cpu_to_be32(position); - xdr_encode_rdma_segment(p, mr); + xdr_encode_read_segment(p, position, mr->mr_handle, mr->mr_length, + mr->mr_offset); return 0; } @@ -349,7 +303,6 @@ static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt, *mr = rpcrdma_mr_get(r_xprt); if (!*mr) goto out_getmr_err; - trace_xprtrdma_mr_get(req); (*mr)->mr_req = req; } @@ -357,7 +310,7 @@ static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt, return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr); out_getmr_err: - trace_xprtrdma_nomrs(req); + trace_xprtrdma_nomrs_err(r_xprt, req); xprt_wait_for_buffer_space(&r_xprt->rx_xprt); rpcrdma_mrs_refresh(r_xprt); return ERR_PTR(-EAGAIN); @@ -414,7 +367,9 @@ static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, } while (nsegs); done: - return encode_item_not_present(xdr); + if (xdr_stream_encode_item_absent(xdr) < 0) + return -EMSGSIZE; + return 0; } /* Register and XDR encode the Write list. Supports encoding a list @@ -438,6 +393,7 @@ static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, enum rpcrdma_chunktype wtype) { struct xdr_stream *xdr = &req->rl_stream; + struct rpcrdma_ep *ep = r_xprt->rx_ep; struct rpcrdma_mr_seg *seg; struct rpcrdma_mr *mr; int nsegs, nchunks; @@ -453,7 +409,7 @@ static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, if (nsegs < 0) return nsegs; - if (encode_item_present(xdr) < 0) + if (xdr_stream_encode_item_present(xdr) < 0) return -EMSGSIZE; segcount = xdr_reserve_space(xdr, sizeof(*segcount)); if (unlikely(!segcount)) @@ -476,11 +432,25 @@ static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, nsegs -= mr->mr_nents; } while (nsegs); + if (xdr_pad_size(rqst->rq_rcv_buf.page_len)) { + if (encode_rdma_segment(xdr, ep->re_write_pad_mr) < 0) + return -EMSGSIZE; + + trace_xprtrdma_chunk_wp(rqst->rq_task, ep->re_write_pad_mr, + nsegs); + r_xprt->rx_stats.write_chunk_count++; + r_xprt->rx_stats.total_rdma_request += mr->mr_length; + nchunks++; + nsegs -= mr->mr_nents; + } + /* Update count of segments in this Write chunk */ *segcount = cpu_to_be32(nchunks); done: - return encode_item_not_present(xdr); + if (xdr_stream_encode_item_absent(xdr) < 0) + return -EMSGSIZE; + return 0; } /* Register and XDR encode the Reply chunk. Supports encoding an array @@ -506,15 +476,18 @@ static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, int nsegs, nchunks; __be32 *segcount; - if (wtype != rpcrdma_replych) - return encode_item_not_present(xdr); + if (wtype != rpcrdma_replych) { + if (xdr_stream_encode_item_absent(xdr) < 0) + return -EMSGSIZE; + return 0; + } seg = req->rl_segments; nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg); if (nsegs < 0) return nsegs; - if (encode_item_present(xdr) < 0) + if (xdr_stream_encode_item_present(xdr) < 0) return -EMSGSIZE; segcount = xdr_reserve_space(xdr, sizeof(*segcount)); if (unlikely(!segcount)) @@ -894,6 +867,12 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) __be32 *p; int ret; + if (unlikely(rqst->rq_rcv_buf.flags & XDRBUF_SPARSE_PAGES)) { + ret = rpcrdma_alloc_sparse_pages(&rqst->rq_rcv_buf); + if (ret) + return ret; + } + rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0); xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf), rqst); @@ -911,8 +890,8 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) * or privacy, direct data placement of individual data items * is not allowed. */ - ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags & - RPCAUTH_AUTH_DATATOUCH); + ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH, + &rqst->rq_cred->cr_auth->au_flags); /* * Chunks needed for results? @@ -1142,6 +1121,7 @@ static bool rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep) #if defined(CONFIG_SUNRPC_BACKCHANNEL) { + struct rpc_xprt *xprt = &r_xprt->rx_xprt; struct xdr_stream *xdr = &rep->rr_stream; __be32 *p; @@ -1152,11 +1132,11 @@ rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep) p = xdr_inline_decode(xdr, 0); /* Chunk lists */ - if (*p++ != xdr_zero) + if (xdr_item_is_present(p++)) return false; - if (*p++ != xdr_zero) + if (xdr_item_is_present(p++)) return false; - if (*p++ != xdr_zero) + if (xdr_item_is_present(p++)) return false; /* RPC header */ @@ -1165,19 +1145,19 @@ rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep) if (*p != cpu_to_be32(RPC_CALL)) return false; + /* No bc service. */ + if (xprt->bc_serv == NULL) + return false; + /* Now that we are sure this is a backchannel call, * advance to the RPC header. */ p = xdr_inline_decode(xdr, 3 * sizeof(*p)); if (unlikely(!p)) - goto out_short; + return true; rpcrdma_bc_receive_call(r_xprt, rep); return true; - -out_short: - pr_warn("RPC/RDMA short backward direction call\n"); - return true; } #else /* CONFIG_SUNRPC_BACKCHANNEL */ { @@ -1195,10 +1175,7 @@ static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length) if (unlikely(!p)) return -EIO; - handle = be32_to_cpup(p++); - *length = be32_to_cpup(p++); - xdr_decode_hyper(p, &offset); - + xdr_decode_rdma_segment(p, &handle, length, &offset); trace_xprtrdma_decode_seg(handle, *length, offset); return 0; } @@ -1234,7 +1211,7 @@ static int decode_read_list(struct xdr_stream *xdr) p = xdr_inline_decode(xdr, sizeof(*p)); if (unlikely(!p)) return -EIO; - if (unlikely(*p != xdr_zero)) + if (unlikely(xdr_item_is_present(p))) return -EIO; return 0; } @@ -1253,7 +1230,7 @@ static int decode_write_list(struct xdr_stream *xdr, u32 *length) p = xdr_inline_decode(xdr, sizeof(*p)); if (unlikely(!p)) return -EIO; - if (*p == xdr_zero) + if (xdr_item_is_absent(p)) break; if (!first) return -EIO; @@ -1275,7 +1252,7 @@ static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length) return -EIO; *length = 0; - if (*p != xdr_zero) + if (xdr_item_is_present(p)) if (decode_write_chunk(xdr, length)) return -EIO; return 0; @@ -1352,29 +1329,47 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, p = xdr_inline_decode(xdr, 2 * sizeof(*p)); if (!p) break; - dprintk("RPC: %s: server reports " - "version error (%u-%u), xid %08x\n", __func__, - be32_to_cpup(p), be32_to_cpu(*(p + 1)), - be32_to_cpu(rep->rr_xid)); + trace_xprtrdma_err_vers(rqst, p, p + 1); break; case err_chunk: - dprintk("RPC: %s: server reports " - "header decoding error, xid %08x\n", __func__, - be32_to_cpu(rep->rr_xid)); + trace_xprtrdma_err_chunk(rqst); break; default: - dprintk("RPC: %s: server reports " - "unrecognized error %d, xid %08x\n", __func__, - be32_to_cpup(p), be32_to_cpu(rep->rr_xid)); + trace_xprtrdma_err_unrecognized(rqst, p); } - r_xprt->rx_stats.bad_reply_count++; - return -EREMOTEIO; + return -EIO; +} + +/** + * rpcrdma_unpin_rqst - Release rqst without completing it + * @rep: RPC/RDMA Receive context + * + * This is done when a connection is lost so that a Reply + * can be dropped and its matching Call can be subsequently + * retransmitted on a new connection. + */ +void rpcrdma_unpin_rqst(struct rpcrdma_rep *rep) +{ + struct rpc_xprt *xprt = &rep->rr_rxprt->rx_xprt; + struct rpc_rqst *rqst = rep->rr_rqst; + struct rpcrdma_req *req = rpcr_to_rdmar(rqst); + + req->rl_reply = NULL; + rep->rr_rqst = NULL; + + spin_lock(&xprt->queue_lock); + xprt_unpin_rqst(rqst); + spin_unlock(&xprt->queue_lock); } -/* Perform XID lookup, reconstruction of the RPC reply, and - * RPC completion while holding the transport lock to ensure - * the rep, rqst, and rq_task pointers remain stable. +/** + * rpcrdma_complete_rqst - Pass completed rqst back to RPC + * @rep: RPC/RDMA Receive context + * + * Reconstruct the RPC reply and complete the transaction + * while @rqst is still pinned to ensure the rep, rqst, and + * rq_task pointers remain stable. */ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) { @@ -1406,13 +1401,11 @@ out: spin_unlock(&xprt->queue_lock); return; -/* If the incoming reply terminated a pending RPC, the next - * RPC call will post a replacement receive buffer as it is - * being marshaled. - */ out_badheader: - trace_xprtrdma_reply_hdr(rep); + trace_xprtrdma_reply_hdr_err(rep); r_xprt->rx_stats.bad_reply_count++; + rqst->rq_task->tk_status = status; + status = 0; goto out; } @@ -1476,21 +1469,20 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) if (credits == 0) credits = 1; /* don't deadlock */ - else if (credits > r_xprt->rx_ep.rep_max_requests) - credits = r_xprt->rx_ep.rep_max_requests; + else if (credits > r_xprt->rx_ep->re_max_requests) + credits = r_xprt->rx_ep->re_max_requests; + rpcrdma_post_recvs(r_xprt, credits + (buf->rb_bc_srv_max_requests << 1), + false); if (buf->rb_credits != credits) rpcrdma_update_cwnd(r_xprt, credits); - rpcrdma_post_recvs(r_xprt, false); req = rpcr_to_rdmar(rqst); - if (req->rl_reply) { - trace_xprtrdma_leaked_rep(rqst, req->rl_reply); - rpcrdma_recv_buffer_put(req->rl_reply); - } + if (unlikely(req->rl_reply)) + rpcrdma_rep_put(buf, req->rl_reply); req->rl_reply = rep; rep->rr_rqst = rqst; - trace_xprtrdma_reply(rqst->rq_task, rep, req, credits); + trace_xprtrdma_reply(rqst->rq_task, rep, credits); if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) frwr_reminv(rep, &req->rl_registered); @@ -1502,17 +1494,17 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) return; out_badversion: - trace_xprtrdma_reply_vers(rep); + trace_xprtrdma_reply_vers_err(rep); goto out; out_norqst: spin_unlock(&xprt->queue_lock); - trace_xprtrdma_reply_rqst(rep); + trace_xprtrdma_reply_rqst_err(rep); goto out; out_shortreply: - trace_xprtrdma_reply_short(rep); + trace_xprtrdma_reply_short_err(rep); out: - rpcrdma_recv_buffer_put(rep); + rpcrdma_rep_put(buf, rep); } |