diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2020-12-15 18:52:30 -0800 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2020-12-15 18:52:30 -0800 |
commit | 1a50ede2b3c846761a71c409f53e9121311a13c2 (patch) | |
tree | 40f228239b077c2db87580dfd66556ad0ca7046f /net | |
parent | 9867cb1fd510187d8f828540bdb48f78fceb70b3 (diff) | |
parent | 716a8bc7f706eeef80ab42c99d9f210eda845c81 (diff) |
Merge tag 'nfsd-5.11' of git://git.linux-nfs.org/projects/cel/cel-2.6
Pull nfsd updates from Chuck Lever:
"Several substantial changes this time around:
- Previously, exporting an NFS mount via NFSD was considered to be an
unsupported feature. With v5.11, the community has attempted to
make re-exporting a first-class feature of NFSD.
This would enable the Linux in-kernel NFS server to be used as an
intermediate cache for a remotely-located primary NFS server, for
example, even with other NFS server implementations, like a NetApp
filer, as the primary.
- A short series of patches brings support for multiple RPC/RDMA data
chunks per RPC transaction to the Linux NFS server's RPC/RDMA
transport implementation.
This is a part of the RPC/RDMA spec that the other premiere
NFS/RDMA implementation (Solaris) has had for a very long time, and
completes the implementation of RPC/RDMA version 1 in the Linux
kernel's NFS server.
- Long ago, NFSv4 support was introduced to NFSD using a series of C
macros that hid dprintk's and goto's. Over time, the kernel's XDR
implementation has been greatly improved, but these C macros have
remained and become fallow. A series of patches in this pull
request completely replaces those macros with the use of current
kernel XDR infrastructure. Benefits include:
- More robust input sanitization in NFSD's NFSv4 XDR decoders.
- Make it easier to use common kernel library functions that use
XDR stream APIs (for example, GSS-API).
- Align the structure of the source code with the RFCs so it is
easier to learn, verify, and maintain our XDR implementation.
- Removal of more than a hundred hidden dprintk() call sites.
- Removal of some explicit manipulation of pages to help make the
eventual transition to xdr->bvec smoother.
- On top of several related fixes in 5.10-rc, there are a few more
fixes to get the Linux NFSD implementation of NFSv4.2 inter-server
copy up to speed.
And as usual, there is a pinch of seasoning in the form of a
collection of unrelated minor bug fixes and clean-ups.
Many thanks to all who contributed this time around!"
* tag 'nfsd-5.11' of git://git.linux-nfs.org/projects/cel/cel-2.6: (131 commits)
nfsd: Record NFSv4 pre/post-op attributes as non-atomic
nfsd: Set PF_LOCAL_THROTTLE on local filesystems only
nfsd: Fix up nfsd to ensure that timeout errors don't result in ESTALE
exportfs: Add a function to return the raw output from fh_to_dentry()
nfsd: close cached files prior to a REMOVE or RENAME that would replace target
nfsd: allow filesystems to opt out of subtree checking
nfsd: add a new EXPORT_OP_NOWCC flag to struct export_operations
Revert "nfsd4: support change_attr_type attribute"
nfsd4: don't query change attribute in v2/v3 case
nfsd: minor nfsd4_change_attribute cleanup
nfsd: simplify nfsd4_change_info
nfsd: only call inode_query_iversion in the I_VERSION case
nfs_common: need lock during iterate through the list
NFSD: Fix 5 seconds delay when doing inter server copy
NFSD: Fix sparse warning in nfs4proc.c
SUNRPC: Remove XDRBUF_SPARSE_PAGES flag in gss_proxy upcall
sunrpc: clean-up cache downcall
nfsd: Fix message level for normal termination
NFSD: Remove macros that are no longer used
NFSD: Replace READ* macros in nfsd4_decode_compound()
...
Diffstat (limited to 'net')
-rw-r--r-- | net/sunrpc/auth_gss/gss_rpc_upcall.c | 15 | ||||
-rw-r--r-- | net/sunrpc/auth_gss/gss_rpc_xdr.c | 3 | ||||
-rw-r--r-- | net/sunrpc/cache.c | 41 | ||||
-rw-r--r-- | net/sunrpc/svc.c | 16 | ||||
-rw-r--r-- | net/sunrpc/svc_xprt.c | 4 | ||||
-rw-r--r-- | net/sunrpc/svcsock.c | 8 | ||||
-rw-r--r-- | net/sunrpc/xdr.c | 78 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/Makefile | 2 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 14 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_pcl.c | 306 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 314 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_rw.c | 600 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_sendto.c | 562 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_transport.c | 2 |
14 files changed, 1315 insertions, 650 deletions
diff --git a/net/sunrpc/auth_gss/gss_rpc_upcall.c b/net/sunrpc/auth_gss/gss_rpc_upcall.c index af9c7f43859c..d1c003a25b0f 100644 --- a/net/sunrpc/auth_gss/gss_rpc_upcall.c +++ b/net/sunrpc/auth_gss/gss_rpc_upcall.c @@ -200,7 +200,7 @@ static int gssp_call(struct net *net, struct rpc_message *msg) static void gssp_free_receive_pages(struct gssx_arg_accept_sec_context *arg) { - int i; + unsigned int i; for (i = 0; i < arg->npages && arg->pages[i]; i++) __free_page(arg->pages[i]); @@ -210,14 +210,19 @@ static void gssp_free_receive_pages(struct gssx_arg_accept_sec_context *arg) static int gssp_alloc_receive_pages(struct gssx_arg_accept_sec_context *arg) { + unsigned int i; + arg->npages = DIV_ROUND_UP(NGROUPS_MAX * 4, PAGE_SIZE); arg->pages = kcalloc(arg->npages, sizeof(struct page *), GFP_KERNEL); - /* - * XXX: actual pages are allocated by xdr layer in - * xdr_partial_copy_from_skb. - */ if (!arg->pages) return -ENOMEM; + for (i = 0; i < arg->npages; i++) { + arg->pages[i] = alloc_page(GFP_KERNEL); + if (!arg->pages[i]) { + gssp_free_receive_pages(arg); + return -ENOMEM; + } + } return 0; } diff --git a/net/sunrpc/auth_gss/gss_rpc_xdr.c b/net/sunrpc/auth_gss/gss_rpc_xdr.c index 2ff7b7083eba..d79f12c2550a 100644 --- a/net/sunrpc/auth_gss/gss_rpc_xdr.c +++ b/net/sunrpc/auth_gss/gss_rpc_xdr.c @@ -771,7 +771,6 @@ void gssx_enc_accept_sec_context(struct rpc_rqst *req, xdr_inline_pages(&req->rq_rcv_buf, PAGE_SIZE/2 /* pretty arbitrary */, arg->pages, 0 /* page base */, arg->npages * PAGE_SIZE); - req->rq_rcv_buf.flags |= XDRBUF_SPARSE_PAGES; done: if (err) dprintk("RPC: gssx_enc_accept_sec_context: %d\n", err); @@ -789,7 +788,7 @@ int gssx_dec_accept_sec_context(struct rpc_rqst *rqstp, scratch = alloc_page(GFP_KERNEL); if (!scratch) return -ENOMEM; - xdr_set_scratch_buffer(xdr, page_address(scratch), PAGE_SIZE); + xdr_set_scratch_page(xdr, scratch); /* res->status */ err = gssx_dec_status(xdr, &res->status); diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c index 20c93b68505e..1a2c1c44bb00 100644 --- a/net/sunrpc/cache.c +++ b/net/sunrpc/cache.c @@ -778,7 +778,6 @@ void cache_clean_deferred(void *owner) */ static DEFINE_SPINLOCK(queue_lock); -static DEFINE_MUTEX(queue_io_mutex); struct cache_queue { struct list_head list; @@ -906,44 +905,26 @@ static ssize_t cache_do_downcall(char *kaddr, const char __user *buf, return ret; } -static ssize_t cache_slow_downcall(const char __user *buf, - size_t count, struct cache_detail *cd) -{ - static char write_buf[32768]; /* protected by queue_io_mutex */ - ssize_t ret = -EINVAL; - - if (count >= sizeof(write_buf)) - goto out; - mutex_lock(&queue_io_mutex); - ret = cache_do_downcall(write_buf, buf, count, cd); - mutex_unlock(&queue_io_mutex); -out: - return ret; -} - static ssize_t cache_downcall(struct address_space *mapping, const char __user *buf, size_t count, struct cache_detail *cd) { - struct page *page; - char *kaddr; + char *write_buf; ssize_t ret = -ENOMEM; - if (count >= PAGE_SIZE) - goto out_slow; + if (count >= 32768) { /* 32k is max userland buffer, lets check anyway */ + ret = -EINVAL; + goto out; + } - page = find_or_create_page(mapping, 0, GFP_KERNEL); - if (!page) - goto out_slow; + write_buf = kvmalloc(count + 1, GFP_KERNEL); + if (!write_buf) + goto out; - kaddr = kmap(page); - ret = cache_do_downcall(kaddr, buf, count, cd); - kunmap(page); - unlock_page(page); - put_page(page); + ret = cache_do_downcall(write_buf, buf, count, cd); + kvfree(write_buf); +out: return ret; -out_slow: - return cache_slow_downcall(buf, count, cd); } static ssize_t cache_write(struct file *filp, const char __user *buf, diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index c211b607239e..4187745887f0 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -614,6 +614,10 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node) rqstp->rq_server = serv; rqstp->rq_pool = pool; + rqstp->rq_scratch_page = alloc_pages_node(node, GFP_KERNEL, 0); + if (!rqstp->rq_scratch_page) + goto out_enomem; + rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node); if (!rqstp->rq_argp) goto out_enomem; @@ -842,6 +846,7 @@ void svc_rqst_free(struct svc_rqst *rqstp) { svc_release_buffer(rqstp); + put_page(rqstp->rq_scratch_page); kfree(rqstp->rq_resp); kfree(rqstp->rq_argp); kfree(rqstp->rq_auth_data); @@ -1622,7 +1627,7 @@ u32 svc_max_payload(const struct svc_rqst *rqstp) EXPORT_SYMBOL_GPL(svc_max_payload); /** - * svc_encode_read_payload - mark a range of bytes as a READ payload + * svc_encode_result_payload - mark a range of bytes as a result payload * @rqstp: svc_rqst to operate on * @offset: payload's byte offset in rqstp->rq_res * @length: size of payload, in bytes @@ -1630,12 +1635,13 @@ EXPORT_SYMBOL_GPL(svc_max_payload); * Returns zero on success, or a negative errno if a permanent * error occurred. */ -int svc_encode_read_payload(struct svc_rqst *rqstp, unsigned int offset, - unsigned int length) +int svc_encode_result_payload(struct svc_rqst *rqstp, unsigned int offset, + unsigned int length) { - return rqstp->rq_xprt->xpt_ops->xpo_read_payload(rqstp, offset, length); + return rqstp->rq_xprt->xpt_ops->xpo_result_payload(rqstp, offset, + length); } -EXPORT_SYMBOL_GPL(svc_encode_read_payload); +EXPORT_SYMBOL_GPL(svc_encode_result_payload); /** * svc_fill_write_vector - Construct data argument for VFS write call diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c index 43cf8dbde898..5fb9164aa690 100644 --- a/net/sunrpc/svc_xprt.c +++ b/net/sunrpc/svc_xprt.c @@ -813,8 +813,6 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt) len = svc_deferred_recv(rqstp); else len = xprt->xpt_ops->xpo_recvfrom(rqstp); - if (len > 0) - trace_svc_xdr_recvfrom(rqstp, &rqstp->rq_arg); rqstp->rq_stime = ktime_get(); rqstp->rq_reserved = serv->sv_max_mesg; atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved); @@ -868,7 +866,7 @@ int svc_recv(struct svc_rqst *rqstp, long timeout) if (serv->sv_stats) serv->sv_stats->netcnt++; - trace_svc_recv(rqstp, len); + trace_svc_xdr_recvfrom(rqstp, &rqstp->rq_arg); return len; out_release: rqstp->rq_res.len = 0; diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index c2752e2b9ce3..b248f2349437 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -181,8 +181,8 @@ static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh) } } -static int svc_sock_read_payload(struct svc_rqst *rqstp, unsigned int offset, - unsigned int length) +static int svc_sock_result_payload(struct svc_rqst *rqstp, unsigned int offset, + unsigned int length) { return 0; } @@ -635,7 +635,7 @@ static const struct svc_xprt_ops svc_udp_ops = { .xpo_create = svc_udp_create, .xpo_recvfrom = svc_udp_recvfrom, .xpo_sendto = svc_udp_sendto, - .xpo_read_payload = svc_sock_read_payload, + .xpo_result_payload = svc_sock_result_payload, .xpo_release_rqst = svc_udp_release_rqst, .xpo_detach = svc_sock_detach, .xpo_free = svc_sock_free, @@ -1123,7 +1123,7 @@ static const struct svc_xprt_ops svc_tcp_ops = { .xpo_create = svc_tcp_create, .xpo_recvfrom = svc_tcp_recvfrom, .xpo_sendto = svc_tcp_sendto, - .xpo_read_payload = svc_sock_read_payload, + .xpo_result_payload = svc_sock_result_payload, .xpo_release_rqst = svc_tcp_release_rqst, .xpo_detach = svc_tcp_sock_detach, .xpo_free = svc_sock_free, diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c index 71e03b930b70..757560a3b06b 100644 --- a/net/sunrpc/xdr.c +++ b/net/sunrpc/xdr.c @@ -669,7 +669,7 @@ void xdr_init_encode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p, struct kvec *iov = buf->head; int scratch_len = buf->buflen - buf->page_len - buf->tail[0].iov_len; - xdr_set_scratch_buffer(xdr, NULL, 0); + xdr_reset_scratch_buffer(xdr); BUG_ON(scratch_len < 0); xdr->buf = buf; xdr->iov = iov; @@ -713,7 +713,7 @@ inline void xdr_commit_encode(struct xdr_stream *xdr) page = page_address(*xdr->page_ptr); memcpy(xdr->scratch.iov_base, page, shift); memmove(page, page + shift, (void *)xdr->p - page); - xdr->scratch.iov_len = 0; + xdr_reset_scratch_buffer(xdr); } EXPORT_SYMBOL_GPL(xdr_commit_encode); @@ -743,8 +743,7 @@ static __be32 *xdr_get_next_encode_buffer(struct xdr_stream *xdr, * the "scratch" iov to track any temporarily unused fragment of * space at the end of the previous buffer: */ - xdr->scratch.iov_base = xdr->p; - xdr->scratch.iov_len = frag1bytes; + xdr_set_scratch_buffer(xdr, xdr->p, frag1bytes); p = page_address(*xdr->page_ptr); /* * Note this is where the next encode will start after we've @@ -1052,8 +1051,7 @@ void xdr_init_decode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p, struct rpc_rqst *rqst) { xdr->buf = buf; - xdr->scratch.iov_base = NULL; - xdr->scratch.iov_len = 0; + xdr_reset_scratch_buffer(xdr); xdr->nwords = XDR_QUADLEN(buf->len); if (buf->head[0].iov_len != 0) xdr_set_iov(xdr, buf->head, buf->len); @@ -1101,24 +1099,6 @@ static __be32 * __xdr_inline_decode(struct xdr_stream *xdr, size_t nbytes) return p; } -/** - * xdr_set_scratch_buffer - Attach a scratch buffer for decoding data. - * @xdr: pointer to xdr_stream struct - * @buf: pointer to an empty buffer - * @buflen: size of 'buf' - * - * The scratch buffer is used when decoding from an array of pages. - * If an xdr_inline_decode() call spans across page boundaries, then - * we copy the data into the scratch buffer in order to allow linear - * access. - */ -void xdr_set_scratch_buffer(struct xdr_stream *xdr, void *buf, size_t buflen) -{ - xdr->scratch.iov_base = buf; - xdr->scratch.iov_len = buflen; -} -EXPORT_SYMBOL_GPL(xdr_set_scratch_buffer); - static __be32 *xdr_copy_to_scratch(struct xdr_stream *xdr, size_t nbytes) { __be32 *p; @@ -1379,9 +1359,8 @@ EXPORT_SYMBOL_GPL(xdr_buf_from_iov); * * Returns -1 if base of length are out of bounds. */ -int -xdr_buf_subsegment(struct xdr_buf *buf, struct xdr_buf *subbuf, - unsigned int base, unsigned int len) +int xdr_buf_subsegment(const struct xdr_buf *buf, struct xdr_buf *subbuf, + unsigned int base, unsigned int len) { subbuf->buflen = subbuf->len = len; if (base < buf->head[0].iov_len) { @@ -1429,6 +1408,51 @@ xdr_buf_subsegment(struct xdr_buf *buf, struct xdr_buf *subbuf, EXPORT_SYMBOL_GPL(xdr_buf_subsegment); /** + * xdr_stream_subsegment - set @subbuf to a portion of @xdr + * @xdr: an xdr_stream set up for decoding + * @subbuf: the result buffer + * @nbytes: length of @xdr to extract, in bytes + * + * Sets up @subbuf to represent a portion of @xdr. The portion + * starts at the current offset in @xdr, and extends for a length + * of @nbytes. If this is successful, @xdr is advanced to the next + * position following that portion. + * + * Return values: + * %true: @subbuf has been initialized, and @xdr has been advanced. + * %false: a bounds error has occurred + */ +bool xdr_stream_subsegment(struct xdr_stream *xdr, struct xdr_buf *subbuf, + unsigned int nbytes) +{ + unsigned int remaining, offset, len; + + if (xdr_buf_subsegment(xdr->buf, subbuf, xdr_stream_pos(xdr), nbytes)) + return false; + + if (subbuf->head[0].iov_len) + if (!__xdr_inline_decode(xdr, subbuf->head[0].iov_len)) + return false; + + remaining = subbuf->page_len; + offset = subbuf->page_base; + while (remaining) { + len = min_t(unsigned int, remaining, PAGE_SIZE) - offset; + + if (xdr->p == xdr->end && !xdr_set_next_buffer(xdr)) + return false; + if (!__xdr_inline_decode(xdr, len)) + return false; + + remaining -= len; + offset = 0; + } + + return true; +} +EXPORT_SYMBOL_GPL(xdr_stream_subsegment); + +/** * xdr_buf_trim - lop at most "len" bytes off the end of "buf" * @buf: buf to be trimmed * @len: number of bytes to reduce "buf" by diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile index 8ed0377d7a18..55b21bae866d 100644 --- a/net/sunrpc/xprtrdma/Makefile +++ b/net/sunrpc/xprtrdma/Makefile @@ -4,5 +4,5 @@ obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o rpcrdma-y := transport.o rpc_rdma.o verbs.o frwr_ops.o \ svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \ svc_rdma_sendto.o svc_rdma_recvfrom.o svc_rdma_rw.o \ - module.o + svc_rdma_pcl.o module.o rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index 5e7c4ba9e147..63f8be974df2 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -74,11 +74,17 @@ out_unlock: */ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst, - struct svc_rdma_send_ctxt *ctxt) + struct svc_rdma_send_ctxt *sctxt) { + struct svc_rdma_recv_ctxt *rctxt; int ret; - ret = svc_rdma_map_reply_msg(rdma, ctxt, NULL, &rqst->rq_snd_buf); + rctxt = svc_rdma_recv_ctxt_get(rdma); + if (!rctxt) + return -EIO; + + ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqst->rq_snd_buf); + svc_rdma_recv_ctxt_put(rdma, rctxt); if (ret < 0) return -EIO; @@ -86,8 +92,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, * the rq_buffer before all retransmits are complete. */ get_page(virt_to_page(rqst->rq_buffer)); - ctxt->sc_send_wr.opcode = IB_WR_SEND; - return svc_rdma_send(rdma, ctxt); + sctxt->sc_send_wr.opcode = IB_WR_SEND; + return svc_rdma_send(rdma, sctxt); } /* Server-side transport endpoint wants a whole page for its send diff --git a/net/sunrpc/xprtrdma/svc_rdma_pcl.c b/net/sunrpc/xprtrdma/svc_rdma_pcl.c new file mode 100644 index 000000000000..b63cfeaa2923 --- /dev/null +++ b/net/sunrpc/xprtrdma/svc_rdma_pcl.c @@ -0,0 +1,306 @@ +// SPDX-License-Identifier: GPL-2.0 +/* + * Copyright (c) 2020 Oracle. All rights reserved. + */ + +#include <linux/sunrpc/svc_rdma.h> +#include <linux/sunrpc/rpc_rdma.h> + +#include "xprt_rdma.h" +#include <trace/events/rpcrdma.h> + +/** + * pcl_free - Release all memory associated with a parsed chunk list + * @pcl: parsed chunk list + * + */ +void pcl_free(struct svc_rdma_pcl *pcl) +{ + while (!list_empty(&pcl->cl_chunks)) { + struct svc_rdma_chunk *chunk; + + chunk = pcl_first_chunk(pcl); + list_del(&chunk->ch_list); + kfree(chunk); + } +} + +static struct svc_rdma_chunk *pcl_alloc_chunk(u32 segcount, u32 position) +{ + struct svc_rdma_chunk *chunk; + + chunk = kmalloc(struct_size(chunk, ch_segments, segcount), GFP_KERNEL); + if (!chunk) + return NULL; + + chunk->ch_position = position; + chunk->ch_length = 0; + chunk->ch_payload_length = 0; + chunk->ch_segcount = 0; + return chunk; +} + +static struct svc_rdma_chunk * +pcl_lookup_position(struct svc_rdma_pcl *pcl, u32 position) +{ + struct svc_rdma_chunk *pos; + + pcl_for_each_chunk(pos, pcl) { + if (pos->ch_position == position) + return pos; + } + return NULL; +} + +static void pcl_insert_position(struct svc_rdma_pcl *pcl, + struct svc_rdma_chunk *chunk) +{ + struct svc_rdma_chunk *pos; + + pcl_for_each_chunk(pos, pcl) { + if (pos->ch_position > chunk->ch_position) + break; + } + __list_add(&chunk->ch_list, pos->ch_list.prev, &pos->ch_list); + pcl->cl_count++; +} + +static void pcl_set_read_segment(const struct svc_rdma_recv_ctxt *rctxt, + struct svc_rdma_chunk *chunk, + u32 handle, u32 length, u64 offset) +{ + struct svc_rdma_segment *segment; + + segment = &chunk->ch_segments[chunk->ch_segcount]; + segment->rs_handle = handle; + segment->rs_length = length; + segment->rs_offset = offset; + + trace_svcrdma_decode_rseg(&rctxt->rc_cid, chunk, segment); + + chunk->ch_length += length; + chunk->ch_segcount++; +} + +/** + * pcl_alloc_call - Construct a parsed chunk list for the Call body + * @rctxt: Ingress receive context + * @p: Start of an un-decoded Read list + * + * Assumptions: + * - The incoming Read list has already been sanity checked. + * - cl_count is already set to the number of segments in + * the un-decoded list. + * - The list might not be in order by position. + * + * Return values: + * %true: Parsed chunk list was successfully constructed, and + * cl_count is updated to be the number of chunks (ie. + * unique positions) in the Read list. + * %false: Memory allocation failed. + */ +bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p) +{ + struct svc_rdma_pcl *pcl = &rctxt->rc_call_pcl; + unsigned int i, segcount = pcl->cl_count; + + pcl->cl_count = 0; + for (i = 0; i < segcount; i++) { + struct svc_rdma_chunk *chunk; + u32 position, handle, length; + u64 offset; + + p++; /* skip the list discriminator */ + p = xdr_decode_read_segment(p, &position, &handle, + &length, &offset); + if (position != 0) + continue; + + if (pcl_is_empty(pcl)) { + chunk = pcl_alloc_chunk(segcount, position); + if (!chunk) + return false; + pcl_insert_position(pcl, chunk); + } else { + chunk = list_first_entry(&pcl->cl_chunks, + struct svc_rdma_chunk, + ch_list); + } + + pcl_set_read_segment(rctxt, chunk, handle, length, offset); + } + + return true; +} + +/** + * pcl_alloc_read - Construct a parsed chunk list for normal Read chunks + * @rctxt: Ingress receive context + * @p: Start of an un-decoded Read list + * + * Assumptions: + * - The incoming Read list has already been sanity checked. + * - cl_count is already set to the number of segments in + * the un-decoded list. + * - The list might not be in order by position. + * + * Return values: + * %true: Parsed chunk list was successfully constructed, and + * cl_count is updated to be the number of chunks (ie. + * unique position values) in the Read list. + * %false: Memory allocation failed. + * + * TODO: + * - Check for chunk range overlaps + */ +bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p) +{ + struct svc_rdma_pcl *pcl = &rctxt->rc_read_pcl; + unsigned int i, segcount = pcl->cl_count; + + pcl->cl_count = 0; + for (i = 0; i < segcount; i++) { + struct svc_rdma_chunk *chunk; + u32 position, handle, length; + u64 offset; + + p++; /* skip the list discriminator */ + p = xdr_decode_read_segment(p, &position, &handle, + &length, &offset); + if (position == 0) + continue; + + chunk = pcl_lookup_position(pcl, position); + if (!chunk) { + chunk = pcl_alloc_chunk(segcount, position); + if (!chunk) + return false; + pcl_insert_position(pcl, chunk); + } + + pcl_set_read_segment(rctxt, chunk, handle, length, offset); + } + + return true; +} + +/** + * pcl_alloc_write - Construct a parsed chunk list from a Write list + * @rctxt: Ingress receive context + * @pcl: Parsed chunk list to populate + * @p: Start of an un-decoded Write list + * + * Assumptions: + * - The incoming Write list has already been sanity checked, and + * - cl_count is set to the number of chunks in the un-decoded list. + * + * Return values: + * %true: Parsed chunk list was successfully constructed. + * %false: Memory allocation failed. + */ +bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt, + struct svc_rdma_pcl *pcl, __be32 *p) +{ + struct svc_rdma_segment *segment; + struct svc_rdma_chunk *chunk; + unsigned int i, j; + u32 segcount; + + for (i = 0; i < pcl->cl_count; i++) { + p++; /* skip the list discriminator */ + segcount = be32_to_cpup(p++); + + chunk = pcl_alloc_chunk(segcount, 0); + if (!chunk) + return false; + list_add_tail(&chunk->ch_list, &pcl->cl_chunks); + + for (j = 0; j < segcount; j++) { + segment = &chunk->ch_segments[j]; + p = xdr_decode_rdma_segment(p, &segment->rs_handle, + &segment->rs_length, + &segment->rs_offset); + trace_svcrdma_decode_wseg(&rctxt->rc_cid, chunk, j); + + chunk->ch_length += segment->rs_length; + chunk->ch_segcount++; + } + } + return true; +} + +static int pcl_process_region(const struct xdr_buf *xdr, + unsigned int offset, unsigned int length, + int (*actor)(const struct xdr_buf *, void *), + void *data) +{ + struct xdr_buf subbuf; + + if (!length) + return 0; + if (xdr_buf_subsegment(xdr, &subbuf, offset, length)) + return -EMSGSIZE; + return actor(&subbuf, data); +} + +/** + * pcl_process_nonpayloads - Process non-payload regions inside @xdr + * @pcl: Chunk list to process + * @xdr: xdr_buf to process + * @actor: Function to invoke on each non-payload region + * @data: Arguments for @actor + * + * This mechanism must ignore not only result payloads that were already + * sent via RDMA Write, but also XDR padding for those payloads that + * the upper layer has added. + * + * Assumptions: + * The xdr->len and ch_position fields are aligned to 4-byte multiples. + * + * Returns: + * On success, zero, + * %-EMSGSIZE on XDR buffer overflow, or + * The return value of @actor + */ +int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl, + const struct xdr_buf *xdr, + int (*actor)(const struct xdr_buf *, void *), + void *data) +{ + struct svc_rdma_chunk *chunk, *next; + unsigned int start; + int ret; + + chunk = pcl_first_chunk(pcl); + + /* No result payloads were generated */ + if (!chunk || !chunk->ch_payload_length) + return actor(xdr, data); + + /* Process the region before the first result payload */ + ret = pcl_process_region(xdr, 0, chunk->ch_position, actor, data); + if (ret < 0) + return ret; + + /* Process the regions between each middle result payload */ + while ((next = pcl_next_chunk(pcl, chunk))) { + if (!next->ch_payload_length) + break; + + start = pcl_chunk_end_offset(chunk); + ret = pcl_process_region(xdr, start, next->ch_position - start, + actor, data); + if (ret < 0) + return ret; + + chunk = next; + } + + /* Process the region after the last result payload */ + start = pcl_chunk_end_offset(chunk); + ret = pcl_process_region(xdr, start, xdr->len - start, actor, data); + if (ret < 0) + return ret; + + return 0; +} diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index c6ea2903c21a..cbdb71247755 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -93,6 +93,7 @@ * (see rdma_read_complete() below). */ +#include <linux/slab.h> #include <linux/spinlock.h> #include <asm/unaligned.h> #include <rdma/ib_verbs.h> @@ -143,6 +144,10 @@ svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma) goto fail2; svc_rdma_recv_cid_init(rdma, &ctxt->rc_cid); + pcl_init(&ctxt->rc_call_pcl); + pcl_init(&ctxt->rc_read_pcl); + pcl_init(&ctxt->rc_write_pcl); + pcl_init(&ctxt->rc_reply_pcl); ctxt->rc_recv_wr.next = NULL; ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe; @@ -189,8 +194,13 @@ void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma) } } -static struct svc_rdma_recv_ctxt * -svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma) +/** + * svc_rdma_recv_ctxt_get - Allocate a recv_ctxt + * @rdma: controlling svcxprt_rdma + * + * Returns a recv_ctxt or (rarely) NULL if none are available. + */ +struct svc_rdma_recv_ctxt *svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma) { struct svc_rdma_recv_ctxt *ctxt; struct llist_node *node; @@ -202,7 +212,6 @@ svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma) out: ctxt->rc_page_count = 0; - ctxt->rc_read_payload_length = 0; return ctxt; out_empty: @@ -226,6 +235,11 @@ void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma, for (i = 0; i < ctxt->rc_page_count; i++) put_page(ctxt->rc_pages[i]); + pcl_free(&ctxt->rc_call_pcl); + pcl_free(&ctxt->rc_read_pcl); + pcl_free(&ctxt->rc_write_pcl); + pcl_free(&ctxt->rc_reply_pcl); + if (!ctxt->rc_temp) llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts); else @@ -385,100 +399,123 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp, arg->len = ctxt->rc_byte_len; } -/* This accommodates the largest possible Write chunk. - */ -#define MAX_BYTES_WRITE_CHUNK ((u32)(RPCSVC_MAXPAGES << PAGE_SHIFT)) - -/* This accommodates the largest possible Position-Zero - * Read chunk or Reply chunk. - */ -#define MAX_BYTES_SPECIAL_CHUNK ((u32)((RPCSVC_MAXPAGES + 2) << PAGE_SHIFT)) - -/* Sanity check the Read list. - * - * Implementation limits: - * - This implementation supports only one Read chunk. +/** + * xdr_count_read_segments - Count number of Read segments in Read list + * @rctxt: Ingress receive context + * @p: Start of an un-decoded Read list * - * Sanity checks: - * - Read list does not overflow Receive buffer. - * - Segment size limited by largest NFS data payload. + * Before allocating anything, ensure the ingress Read list is safe + * to use. * - * The segment count is limited to how many segments can - * fit in the transport header without overflowing the - * buffer. That's about 40 Read segments for a 1KB inline - * threshold. + * The segment count is limited to how many segments can fit in the + * transport header without overflowing the buffer. That's about 40 + * Read segments for a 1KB inline threshold. * * Return values: - * %true: Read list is valid. @rctxt's xdr_stream is updated - * to point to the first byte past the Read list. - * %false: Read list is corrupt. @rctxt's xdr_stream is left - * in an unknown state. + * %true: Read list is valid. @rctxt's xdr_stream is updated to point + * to the first byte past the Read list. rc_read_pcl and + * rc_call_pcl cl_count fields are set to the number of + * Read segments in the list. + * %false: Read list is corrupt. @rctxt's xdr_stream is left in an + * unknown state. */ -static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt) +static bool xdr_count_read_segments(struct svc_rdma_recv_ctxt *rctxt, __be32 *p) { - u32 position, len; - bool first; - __be32 *p; - - p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); - if (!p) - return false; - - len = 0; - first = true; + rctxt->rc_call_pcl.cl_count = 0; + rctxt->rc_read_pcl.cl_count = 0; while (xdr_item_is_present(p)) { + u32 position, handle, length; + u64 offset; + p = xdr_inline_decode(&rctxt->rc_stream, rpcrdma_readseg_maxsz * sizeof(*p)); if (!p) return false; - if (first) { - position = be32_to_cpup(p); - first = false; - } else if (be32_to_cpup(p) != position) { - return false; + xdr_decode_read_segment(p, &position, &handle, + &length, &offset); + if (position) { + if (position & 3) + return false; + ++rctxt->rc_read_pcl.cl_count; + } else { + ++rctxt->rc_call_pcl.cl_count; } - p += 2; - len += be32_to_cpup(p); p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); if (!p) return false; } - return len <= MAX_BYTES_SPECIAL_CHUNK; + return true; } -/* The segment count is limited to how many segments can - * fit in the transport header without overflowing the - * buffer. That's about 60 Write segments for a 1KB inline - * threshold. +/* Sanity check the Read list. + * + * Sanity checks: + * - Read list does not overflow Receive buffer. + * - Chunk size limited by largest NFS data payload. + * + * Return values: + * %true: Read list is valid. @rctxt's xdr_stream is updated + * to point to the first byte past the Read list. + * %false: Read list is corrupt. @rctxt's xdr_stream is left + * in an unknown state. */ -static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt, u32 maxlen) +static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt) { - u32 i, segcount, total; __be32 *p; p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); if (!p) return false; - segcount = be32_to_cpup(p); + if (!xdr_count_read_segments(rctxt, p)) + return false; + if (!pcl_alloc_call(rctxt, p)) + return false; + return pcl_alloc_read(rctxt, p); +} - total = 0; - for (i = 0; i < segcount; i++) { - u32 handle, length; - u64 offset; +static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt) +{ + u32 segcount; + __be32 *p; - p = xdr_inline_decode(&rctxt->rc_stream, - rpcrdma_segment_maxsz * sizeof(*p)); - if (!p) - return false; + if (xdr_stream_decode_u32(&rctxt->rc_stream, &segcount)) + return false; - xdr_decode_rdma_segment(p, &handle, &length, &offset); - trace_svcrdma_decode_wseg(handle, length, offset); + /* A bogus segcount causes this buffer overflow check to fail. */ + p = xdr_inline_decode(&rctxt->rc_stream, + segcount * rpcrdma_segment_maxsz * sizeof(*p)); + return p != NULL; +} - total += length; +/** + * xdr_count_write_chunks - Count number of Write chunks in Write list + * @rctxt: Received header and decoding state + * @p: start of an un-decoded Write list + * + * Before allocating anything, ensure the ingress Write list is + * safe to use. + * + * Return values: + * %true: Write list is valid. @rctxt's xdr_stream is updated + * to point to the first byte past the Write list, and + * the number of Write chunks is in rc_write_pcl.cl_count. + * %false: Write list is corrupt. @rctxt's xdr_stream is left + * in an indeterminate state. + */ +static bool xdr_count_write_chunks(struct svc_rdma_recv_ctxt *rctxt, __be32 *p) +{ + rctxt->rc_write_pcl.cl_count = 0; + while (xdr_item_is_present(p)) { + if (!xdr_check_write_chunk(rctxt)) + return false; + ++rctxt->rc_write_pcl.cl_count; + p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); + if (!p) + return false; } - return total <= maxlen; + return true; } /* Sanity check the Write list. @@ -498,24 +535,18 @@ static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt, u32 maxlen) */ static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt) { - u32 chcount = 0; __be32 *p; p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); if (!p) return false; - rctxt->rc_write_list = p; - while (xdr_item_is_present(p)) { - if (!xdr_check_write_chunk(rctxt, MAX_BYTES_WRITE_CHUNK)) - return false; - ++chcount; - p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); - if (!p) - return false; - } - if (!chcount) - rctxt->rc_write_list = NULL; - return chcount < 2; + if (!xdr_count_write_chunks(rctxt, p)) + return false; + if (!pcl_alloc_write(rctxt, &rctxt->rc_write_pcl, p)) + return false; + + rctxt->rc_cur_result_payload = pcl_first_chunk(&rctxt->rc_write_pcl); + return true; } /* Sanity check the Reply chunk. @@ -537,13 +568,14 @@ static bool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt) p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); if (!p) return false; - rctxt->rc_reply_chunk = NULL; - if (xdr_item_is_present(p)) { - if (!xdr_check_write_chunk(rctxt, MAX_BYTES_SPECIAL_CHUNK)) - return false; - rctxt->rc_reply_chunk = p; - } - return true; + + if (!xdr_item_is_present(p)) + return true; + if (!xdr_check_write_chunk(rctxt)) + return false; + + rctxt->rc_reply_pcl.cl_count = 1; + return pcl_alloc_write(rctxt, &rctxt->rc_reply_pcl, p); } /* RPC-over-RDMA Version One private extension: Remote Invalidation. @@ -552,60 +584,53 @@ static bool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt) * * If there is exactly one distinct R_key in the received transport * header, set rc_inv_rkey to that R_key. Otherwise, set it to zero. - * - * Perform this operation while the received transport header is - * still in the CPU cache. */ static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma, struct svc_rdma_recv_ctxt *ctxt) { - __be32 inv_rkey, *p; - u32 i, segcount; + struct svc_rdma_segment *segment; + struct svc_rdma_chunk *chunk; + u32 inv_rkey; ctxt->rc_inv_rkey = 0; if (!rdma->sc_snd_w_inv) return; - inv_rkey = xdr_zero; - p = ctxt->rc_recv_buf; - p += rpcrdma_fixed_maxsz; - - /* Read list */ - while (xdr_item_is_present(p++)) { - p++; /* position */ - if (inv_rkey == xdr_zero) - inv_rkey = *p; - else if (inv_rkey != *p) - return; - p += 4; + inv_rkey = 0; + pcl_for_each_chunk(chunk, &ctxt->rc_call_pcl) { + pcl_for_each_segment(segment, chunk) { + if (inv_rkey == 0) + inv_rkey = segment->rs_handle; + else if (inv_rkey != segment->rs_handle) + return; + } } - - /* Write list */ - while (xdr_item_is_present(p++)) { - segcount = be32_to_cpup(p++); - for (i = 0; i < segcount; i++) { - if (inv_rkey == xdr_zero) - inv_rkey = *p; - else if (inv_rkey != *p) + pcl_for_each_chunk(chunk, &ctxt->rc_read_pcl) { + pcl_for_each_segment(segment, chunk) { + if (inv_rkey == 0) + inv_rkey = segment->rs_handle; + else if (inv_rkey != segment->rs_handle) return; - p += 4; } } - - /* Reply chunk */ - if (xdr_item_is_present(p++)) { - segcount = be32_to_cpup(p++); - for (i = 0; i < segcount; i++) { - if (inv_rkey == xdr_zero) - inv_rkey = *p; - else if (inv_rkey != *p) + pcl_for_each_chunk(chunk, &ctxt->rc_write_pcl) { + pcl_for_each_segment(segment, chunk) { + if (inv_rkey == 0) + inv_rkey = segment->rs_handle; + else if (inv_rkey != segment->rs_handle) return; - p += 4; } } - - ctxt->rc_inv_rkey = be32_to_cpu(inv_rkey); + pcl_for_each_chunk(chunk, &ctxt->rc_reply_pcl) { + pcl_for_each_segment(segment, chunk) { + if (inv_rkey == 0) + inv_rkey = segment->rs_handle; + else if (inv_rkey != segment->rs_handle) + return; + } + } + ctxt->rc_inv_rkey = inv_rkey; } /** @@ -641,7 +666,8 @@ static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg, if (*p != rpcrdma_version) goto out_version; p += 2; - switch (*p) { + rctxt->rc_msgtype = *p; + switch (rctxt->rc_msgtype) { case rdma_msg: break; case rdma_nomsg: @@ -735,30 +761,28 @@ static void svc_rdma_send_error(struct svcxprt_rdma *rdma, * the RPC/RDMA header small and fixed in size, so it is * straightforward to check the RPC header's direction field. */ -static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, - __be32 *rdma_resp) +static bool svc_rdma_is_reverse_direction_reply(struct svc_xprt *xprt, + struct svc_rdma_recv_ctxt *rctxt) { - __be32 *p; + __be32 *p = rctxt->rc_recv_buf; if (!xprt->xpt_bc_xprt) return false; - p = rdma_resp + 3; - if (*p++ != rdma_msg) + if (rctxt->rc_msgtype != rdma_msg) return false; - if (*p++ != xdr_zero) + if (!pcl_is_empty(&rctxt->rc_call_pcl)) return false; - if (*p++ != xdr_zero) + if (!pcl_is_empty(&rctxt->rc_read_pcl)) return false; - if (*p++ != xdr_zero) + if (!pcl_is_empty(&rctxt->rc_write_pcl)) return false; - - /* XID sanity */ - if (*p++ != *rdma_resp) + if (!pcl_is_empty(&rctxt->rc_reply_pcl)) return false; - /* call direction */ - if (*p == cpu_to_be32(RPC_CALL)) + + /* RPC call direction */ + if (*(p + 8) == cpu_to_be32(RPC_CALL)) return false; return true; @@ -800,7 +824,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) struct svcxprt_rdma *rdma_xprt = container_of(xprt, struct svcxprt_rdma, sc_xprt); struct svc_rdma_recv_ctxt *ctxt; - __be32 *p; int ret; rqstp->rq_xprt_ctxt = NULL; @@ -833,7 +856,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) rqstp->rq_respages = rqstp->rq_pages; rqstp->rq_next_page = rqstp->rq_respages; - p = (__be32 *)rqstp->rq_arg.head[0].iov_base; ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg, ctxt); if (ret < 0) goto out_err; @@ -841,14 +863,14 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) goto out_drop; rqstp->rq_xprt_hlen = ret; - if (svc_rdma_is_backchannel_reply(xprt, p)) + if (svc_rdma_is_reverse_direction_reply(xprt, ctxt)) goto out_backchannel; svc_rdma_get_inv_rkey(rdma_xprt, ctxt); - p += rpcrdma_fixed_maxsz; - if (*p != xdr_zero) - goto out_readchunk; + if (!pcl_is_empty(&ctxt->rc_read_pcl) || + !pcl_is_empty(&ctxt->rc_call_pcl)) + goto out_readlist; complete: rqstp->rq_xprt_ctxt = ctxt; @@ -856,10 +878,10 @@ complete: svc_xprt_copy_addrs(rqstp, xprt); return rqstp->rq_arg.len; -out_readchunk: - ret = svc_rdma_recv_read_chunk(rdma_xprt, rqstp, ctxt, p); +out_readlist: + ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt); if (ret < 0) - goto out_postfail; + goto out_readfail; return 0; out_err: @@ -867,7 +889,7 @@ out_err: svc_rdma_recv_ctxt_put(rdma_xprt, ctxt); return 0; -out_postfail: +out_readfail: if (ret == -EINVAL) svc_rdma_send_error(rdma_xprt, ctxt, ret); svc_rdma_recv_ctxt_put(rdma_xprt, ctxt); diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c index 80a0c0e87590..0b63e1321d74 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_rw.c +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c @@ -190,14 +190,14 @@ static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc, * - Stores arguments for the SGL constructor functions */ struct svc_rdma_write_info { + const struct svc_rdma_chunk *wi_chunk; + /* write state of this chunk */ unsigned int wi_seg_off; unsigned int wi_seg_no; - unsigned int wi_nsegs; - __be32 *wi_segs; /* SGL constructor arguments */ - struct xdr_buf *wi_xdr; + const struct xdr_buf *wi_xdr; unsigned char *wi_base; unsigned int wi_next_off; @@ -205,7 +205,8 @@ struct svc_rdma_write_info { }; static struct svc_rdma_write_info * -svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk) +svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, + const struct svc_rdma_chunk *chunk) { struct svc_rdma_write_info *info; @@ -213,10 +214,9 @@ svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk) if (!info) return info; + info->wi_chunk = chunk; info->wi_seg_off = 0; info->wi_seg_no = 0; - info->wi_nsegs = be32_to_cpup(++chunk); - info->wi_segs = ++chunk; svc_rdma_cc_init(rdma, &info->wi_cc); info->wi_cc.cc_cqe.done = svc_rdma_write_done; return info; @@ -258,11 +258,11 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) /* State for pulling a Read chunk. */ struct svc_rdma_read_info { + struct svc_rqst *ri_rqst; struct svc_rdma_recv_ctxt *ri_readctxt; - unsigned int ri_position; unsigned int ri_pageno; unsigned int ri_pageoff; - unsigned int ri_chunklen; + unsigned int ri_totalbytes; struct svc_rdma_chunk_ctxt ri_cc; }; @@ -358,7 +358,6 @@ static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc) do { if (atomic_sub_return(cc->cc_sqecount, &rdma->sc_sq_avail) > 0) { - trace_svcrdma_post_chunk(&cc->cc_cid, cc->cc_sqecount); ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr); if (ret) break; @@ -405,7 +404,7 @@ static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info, struct svc_rdma_rw_ctxt *ctxt) { unsigned int sge_no, sge_bytes, page_off, page_no; - struct xdr_buf *xdr = info->wi_xdr; + const struct xdr_buf *xdr = info->wi_xdr; struct scatterlist *sg; struct page **page; @@ -443,40 +442,36 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info, { struct svc_rdma_chunk_ctxt *cc = &info->wi_cc; struct svcxprt_rdma *rdma = cc->cc_rdma; + const struct svc_rdma_segment *seg; struct svc_rdma_rw_ctxt *ctxt; - __be32 *seg; int ret; - seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz; do { unsigned int write_len; - u32 handle, length; u64 offset; - if (info->wi_seg_no >= info->wi_nsegs) + seg = &info->wi_chunk->ch_segments[info->wi_seg_no]; + if (!seg) goto out_overflow; - xdr_decode_rdma_segment(seg, &handle, &length, &offset); - offset += info->wi_seg_off; - - write_len = min(remaining, length - info->wi_seg_off); + write_len = min(remaining, seg->rs_length - info->wi_seg_off); + if (!write_len) + goto out_overflow; ctxt = svc_rdma_get_rw_ctxt(rdma, (write_len >> PAGE_SHIFT) + 2); if (!ctxt) return -ENOMEM; constructor(info, write_len, ctxt); - ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle, + offset = seg->rs_offset + info->wi_seg_off; + ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle, DMA_TO_DEVICE); if (ret < 0) return -EIO; - trace_svcrdma_send_wseg(handle, write_len, offset); - list_add(&ctxt->rw_list, &cc->cc_rwctxts); cc->cc_sqecount += ret; - if (write_len == length - info->wi_seg_off) { - seg += 4; + if (write_len == seg->rs_length - info->wi_seg_off) { info->wi_seg_no++; info->wi_seg_off = 0; } else { @@ -489,31 +484,46 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info, out_overflow: trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no, - info->wi_nsegs); + info->wi_chunk->ch_segcount); return -E2BIG; } -/* Send one of an xdr_buf's kvecs by itself. To send a Reply - * chunk, the whole RPC Reply is written back to the client. - * This function writes either the head or tail of the xdr_buf - * containing the Reply. +/** + * svc_rdma_iov_write - Construct RDMA Writes from an iov + * @info: pointer to write arguments + * @iov: kvec to write + * + * Returns: + * On succes, returns zero + * %-E2BIG if the client-provided Write chunk is too small + * %-ENOMEM if a resource has been exhausted + * %-EIO if an rdma-rw error occurred */ -static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info, - struct kvec *vec) +static int svc_rdma_iov_write(struct svc_rdma_write_info *info, + const struct kvec *iov) { - info->wi_base = vec->iov_base; + info->wi_base = iov->iov_base; return svc_rdma_build_writes(info, svc_rdma_vec_to_sg, - vec->iov_len); + iov->iov_len); } -/* Send an xdr_buf's page list by itself. A Write chunk is just - * the page list. A Reply chunk is @xdr's head, page list, and - * tail. This function is shared between the two types of chunk. +/** + * svc_rdma_pages_write - Construct RDMA Writes from pages + * @info: pointer to write arguments + * @xdr: xdr_buf with pages to write + * @offset: offset into the content of @xdr + * @length: number of bytes to write + * + * Returns: + * On succes, returns zero + * %-E2BIG if the client-provided Write chunk is too small + * %-ENOMEM if a resource has been exhausted + * %-EIO if an rdma-rw error occurred */ -static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info, - struct xdr_buf *xdr, - unsigned int offset, - unsigned long length) +static int svc_rdma_pages_write(struct svc_rdma_write_info *info, + const struct xdr_buf *xdr, + unsigned int offset, + unsigned long length) { info->wi_xdr = xdr; info->wi_next_off = offset - xdr->head[0].iov_len; @@ -522,12 +532,48 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info, } /** + * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf + * @xdr: xdr_buf to write + * @data: pointer to write arguments + * + * Returns: + * On succes, returns zero + * %-E2BIG if the client-provided Write chunk is too small + * %-ENOMEM if a resource has been exhausted + * %-EIO if an rdma-rw error occurred + */ +static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data) +{ + struct svc_rdma_write_info *info = data; + int ret; + + if (xdr->head[0].iov_len) { + ret = svc_rdma_iov_write(info, &xdr->head[0]); + if (ret < 0) + return ret; + } + + if (xdr->page_len) { + ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len, + xdr->page_len); + if (ret < 0) + return ret; + } + + if (xdr->tail[0].iov_len) { + ret = svc_rdma_iov_write(info, &xdr->tail[0]); + if (ret < 0) + return ret; + } + + return xdr->len; +} + +/** * svc_rdma_send_write_chunk - Write all segments in a Write chunk * @rdma: controlling RDMA transport - * @wr_ch: Write chunk provided by client + * @chunk: Write chunk provided by the client * @xdr: xdr_buf containing the data payload - * @offset: payload's byte offset in @xdr - * @length: size of payload, in bytes * * Returns a non-negative number of bytes the chunk consumed, or * %-E2BIG if the payload was larger than the Write chunk, @@ -536,30 +582,28 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info, * %-ENOTCONN if posting failed (connection is lost), * %-EIO if rdma_rw initialization failed (DMA mapping, etc). */ -int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch, - struct xdr_buf *xdr, - unsigned int offset, unsigned long length) +int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, + const struct svc_rdma_chunk *chunk, + const struct xdr_buf *xdr) { struct svc_rdma_write_info *info; + struct svc_rdma_chunk_ctxt *cc; int ret; - if (!length) - return 0; - - info = svc_rdma_write_info_alloc(rdma, wr_ch); + info = svc_rdma_write_info_alloc(rdma, chunk); if (!info) return -ENOMEM; + cc = &info->wi_cc; - ret = svc_rdma_send_xdr_pagelist(info, xdr, offset, length); - if (ret < 0) + ret = svc_rdma_xb_write(xdr, info); + if (ret != xdr->len) goto out_err; - ret = svc_rdma_post_chunk_ctxt(&info->wi_cc); + trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount); + ret = svc_rdma_post_chunk_ctxt(cc); if (ret < 0) goto out_err; - - trace_svcrdma_send_write_chunk(xdr->page_len); - return length; + return xdr->len; out_err: svc_rdma_write_info_free(info); @@ -581,62 +625,62 @@ out_err: */ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, const struct svc_rdma_recv_ctxt *rctxt, - struct xdr_buf *xdr) + const struct xdr_buf *xdr) { struct svc_rdma_write_info *info; - int consumed, ret; + struct svc_rdma_chunk_ctxt *cc; + struct svc_rdma_chunk *chunk; + int ret; - info = svc_rdma_write_info_alloc(rdma, rctxt->rc_reply_chunk); + if (pcl_is_empty(&rctxt->rc_reply_pcl)) + return 0; + + chunk = pcl_first_chunk(&rctxt->rc_reply_pcl); + info = svc_rdma_write_info_alloc(rdma, chunk); if (!info) return -ENOMEM; + cc = &info->wi_cc; - ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]); + ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr, + svc_rdma_xb_write, info); if (ret < 0) goto out_err; - consumed = xdr->head[0].iov_len; - - /* Send the page list in the Reply chunk only if the - * client did not provide Write chunks. - */ - if (!rctxt->rc_write_list && xdr->page_len) { - ret = svc_rdma_send_xdr_pagelist(info, xdr, - xdr->head[0].iov_len, - xdr->page_len); - if (ret < 0) - goto out_err; - consumed += xdr->page_len; - } - - if (xdr->tail[0].iov_len) { - ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]); - if (ret < 0) - goto out_err; - consumed += xdr->tail[0].iov_len; - } - ret = svc_rdma_post_chunk_ctxt(&info->wi_cc); + trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount); + ret = svc_rdma_post_chunk_ctxt(cc); if (ret < 0) goto out_err; - trace_svcrdma_send_reply_chunk(consumed); - return consumed; + return xdr->len; out_err: svc_rdma_write_info_free(info); return ret; } +/** + * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment + * @info: context for ongoing I/O + * @segment: co-ordinates of remote memory to be read + * + * Returns: + * %0: the Read WR chain was constructed successfully + * %-EINVAL: there were not enough rq_pages to finish + * %-ENOMEM: allocating a local resources failed + * %-EIO: a DMA mapping error occurred + */ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info, - struct svc_rqst *rqstp, - u32 rkey, u32 len, u64 offset) + const struct svc_rdma_segment *segment) { struct svc_rdma_recv_ctxt *head = info->ri_readctxt; struct svc_rdma_chunk_ctxt *cc = &info->ri_cc; + struct svc_rqst *rqstp = info->ri_rqst; struct svc_rdma_rw_ctxt *ctxt; - unsigned int sge_no, seg_len; + unsigned int sge_no, seg_len, len; struct scatterlist *sg; int ret; + len = segment->rs_length; sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT; ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no); if (!ctxt) @@ -670,8 +714,8 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info, goto out_overrun; } - ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, offset, rkey, - DMA_FROM_DEVICE); + ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset, + segment->rs_handle, DMA_FROM_DEVICE); if (ret < 0) return -EIO; @@ -684,54 +728,177 @@ out_overrun: return -EINVAL; } -/* Walk the segments in the Read chunk starting at @p and construct - * RDMA Read operations to pull the chunk to the server. +/** + * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk + * @info: context for ongoing I/O + * @chunk: Read chunk to pull + * + * Return values: + * %0: the Read WR chain was constructed successfully + * %-EINVAL: there were not enough resources to finish + * %-ENOMEM: allocating a local resources failed + * %-EIO: a DMA mapping error occurred */ -static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp, - struct svc_rdma_read_info *info, - __be32 *p) +static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info, + const struct svc_rdma_chunk *chunk) { + const struct svc_rdma_segment *segment; int ret; ret = -EINVAL; - info->ri_chunklen = 0; - while (*p++ != xdr_zero && be32_to_cpup(p++) == info->ri_position) { - u32 handle, length; - u64 offset; + pcl_for_each_segment(segment, chunk) { + ret = svc_rdma_build_read_segment(info, segment); + if (ret < 0) + break; + info->ri_totalbytes += segment->rs_length; + } + return ret; +} + +/** + * svc_rdma_copy_inline_range - Copy part of the inline content into pages + * @info: context for RDMA Reads + * @offset: offset into the Receive buffer of region to copy + * @remaining: length of region to copy + * + * Take a page at a time from rqstp->rq_pages and copy the inline + * content from the Receive buffer into that page. Update + * info->ri_pageno and info->ri_pageoff so that the next RDMA Read + * result will land contiguously with the copied content. + * + * Return values: + * %0: Inline content was successfully copied + * %-EINVAL: offset or length was incorrect + */ +static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info, + unsigned int offset, + unsigned int remaining) +{ + struct svc_rdma_recv_ctxt *head = info->ri_readctxt; + unsigned char *dst, *src = head->rc_recv_buf; + struct svc_rqst *rqstp = info->ri_rqst; + unsigned int page_no, numpages; + + numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT; + for (page_no = 0; page_no < numpages; page_no++) { + unsigned int page_len; + + page_len = min_t(unsigned int, remaining, + PAGE_SIZE - info->ri_pageoff); + + head->rc_arg.pages[info->ri_pageno] = + rqstp->rq_pages[info->ri_pageno]; + if (!info->ri_pageoff) + head->rc_page_count++; + + dst = page_address(head->rc_arg.pages[info->ri_pageno]); + memcpy(dst + info->ri_pageno, src + offset, page_len); + + info->ri_totalbytes += page_len; + info->ri_pageoff += page_len; + if (info->ri_pageoff == PAGE_SIZE) { + info->ri_pageno++; + info->ri_pageoff = 0; + } + remaining -= page_len; + offset += page_len; + } + + return -EINVAL; +} + +/** + * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks + * @info: context for RDMA Reads + * + * The chunk data lands in head->rc_arg as a series of contiguous pages, + * like an incoming TCP call. + * + * Return values: + * %0: RDMA Read WQEs were successfully built + * %-EINVAL: client provided too many chunks or segments, + * %-ENOMEM: rdma_rw context pool was exhausted, + * %-ENOTCONN: posting failed (connection is lost), + * %-EIO: rdma_rw initialization failed (DMA mapping, etc). + */ +static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info) +{ + struct svc_rdma_recv_ctxt *head = info->ri_readctxt; + const struct svc_rdma_pcl *pcl = &head->rc_read_pcl; + struct svc_rdma_chunk *chunk, *next; + struct xdr_buf *buf = &head->rc_arg; + unsigned int start, length; + int ret; - p = xdr_decode_rdma_segment(p, &handle, &length, &offset); - ret = svc_rdma_build_read_segment(info, rqstp, handle, length, - offset); + start = 0; + chunk = pcl_first_chunk(pcl); + length = chunk->ch_position; + ret = svc_rdma_copy_inline_range(info, start, length); + if (ret < 0) + return ret; + + pcl_for_each_chunk(chunk, pcl) { + ret = svc_rdma_build_read_chunk(info, chunk); if (ret < 0) + return ret; + + next = pcl_next_chunk(pcl, chunk); + if (!next) break; - trace_svcrdma_send_rseg(handle, length, offset); - info->ri_chunklen += length; + start += length; + length = next->ch_position - info->ri_totalbytes; + ret = svc_rdma_copy_inline_range(info, start, length); + if (ret < 0) + return ret; } - return ret; + start += length; + length = head->rc_byte_len - start; + ret = svc_rdma_copy_inline_range(info, start, length); + if (ret < 0) + return ret; + + buf->len += info->ri_totalbytes; + buf->buflen += info->ri_totalbytes; + + head->rc_hdr_count = 1; + buf->head[0].iov_base = page_address(head->rc_pages[0]); + buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes); + buf->page_len = info->ri_totalbytes - buf->head[0].iov_len; + return 0; } -/* Construct RDMA Reads to pull over a normal Read chunk. The chunk - * data lands in the page list of head->rc_arg.pages. +/** + * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks + * @info: context for RDMA Reads + * + * The chunk data lands in the page list of head->rc_arg.pages. * * Currently NFSD does not look at the head->rc_arg.tail[0] iovec. * Therefore, XDR round-up of the Read chunk and trailing * inline content must both be added at the end of the pagelist. + * + * Return values: + * %0: RDMA Read WQEs were successfully built + * %-EINVAL: client provided too many chunks or segments, + * %-ENOMEM: rdma_rw context pool was exhausted, + * %-ENOTCONN: posting failed (connection is lost), + * %-EIO: rdma_rw initialization failed (DMA mapping, etc). */ -static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, - struct svc_rdma_read_info *info, - __be32 *p) +static int svc_rdma_read_data_item(struct svc_rdma_read_info *info) { struct svc_rdma_recv_ctxt *head = info->ri_readctxt; + struct xdr_buf *buf = &head->rc_arg; + struct svc_rdma_chunk *chunk; + unsigned int length; int ret; - ret = svc_rdma_build_read_chunk(rqstp, info, p); + chunk = pcl_first_chunk(&head->rc_read_pcl); + ret = svc_rdma_build_read_chunk(info, chunk); if (ret < 0) goto out; - trace_svcrdma_send_read_chunk(info->ri_chunklen, info->ri_position); - head->rc_hdr_count = 0; /* Split the Receive buffer between the head and tail @@ -739,11 +906,9 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, * chunk is not included in either the pagelist or in * the tail. */ - head->rc_arg.tail[0].iov_base = - head->rc_arg.head[0].iov_base + info->ri_position; - head->rc_arg.tail[0].iov_len = - head->rc_arg.head[0].iov_len - info->ri_position; - head->rc_arg.head[0].iov_len = info->ri_position; + buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position; + buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position; + buf->head[0].iov_len = chunk->ch_position; /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2). * @@ -754,50 +919,149 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, * Currently these chunks always start at page offset 0, * thus the rounded-up length never crosses a page boundary. */ - info->ri_chunklen = XDR_QUADLEN(info->ri_chunklen) << 2; - - head->rc_arg.page_len = info->ri_chunklen; - head->rc_arg.len += info->ri_chunklen; - head->rc_arg.buflen += info->ri_chunklen; + length = XDR_QUADLEN(info->ri_totalbytes) << 2; + buf->page_len = length; + buf->len += length; + buf->buflen += length; out: return ret; } -/* Construct RDMA Reads to pull over a Position Zero Read chunk. - * The start of the data lands in the first page just after - * the Transport header, and the rest lands in the page list of +/** + * svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk + * @info: context for RDMA Reads + * @chunk: parsed Call chunk to pull + * @offset: offset of region to pull + * @length: length of region to pull + * + * Return values: + * %0: RDMA Read WQEs were successfully built + * %-EINVAL: there were not enough resources to finish + * %-ENOMEM: rdma_rw context pool was exhausted, + * %-ENOTCONN: posting failed (connection is lost), + * %-EIO: rdma_rw initialization failed (DMA mapping, etc). + */ +static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info, + const struct svc_rdma_chunk *chunk, + unsigned int offset, unsigned int length) +{ + const struct svc_rdma_segment *segment; + int ret; + + ret = -EINVAL; + pcl_for_each_segment(segment, chunk) { + struct svc_rdma_segment dummy; + + if (offset > segment->rs_length) { + offset -= segment->rs_length; + continue; + } + + dummy.rs_handle = segment->rs_handle; + dummy.rs_length = min_t(u32, length, segment->rs_length) - offset; + dummy.rs_offset = segment->rs_offset + offset; + + ret = svc_rdma_build_read_segment(info, &dummy); + if (ret < 0) + break; + + info->ri_totalbytes += dummy.rs_length; + length -= dummy.rs_length; + offset = 0; + } + return ret; +} + +/** + * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message + * @info: context for RDMA Reads + * + * Return values: + * %0: RDMA Read WQEs were successfully built + * %-EINVAL: there were not enough resources to finish + * %-ENOMEM: rdma_rw context pool was exhausted, + * %-ENOTCONN: posting failed (connection is lost), + * %-EIO: rdma_rw initialization failed (DMA mapping, etc). + */ +static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info) +{ + struct svc_rdma_recv_ctxt *head = info->ri_readctxt; + const struct svc_rdma_chunk *call_chunk = + pcl_first_chunk(&head->rc_call_pcl); + const struct svc_rdma_pcl *pcl = &head->rc_read_pcl; + struct svc_rdma_chunk *chunk, *next; + unsigned int start, length; + int ret; + + if (pcl_is_empty(pcl)) + return svc_rdma_build_read_chunk(info, call_chunk); + + start = 0; + chunk = pcl_first_chunk(pcl); + length = chunk->ch_position; + ret = svc_rdma_read_chunk_range(info, call_chunk, start, length); + if (ret < 0) + return ret; + + pcl_for_each_chunk(chunk, pcl) { + ret = svc_rdma_build_read_chunk(info, chunk); + if (ret < 0) + return ret; + + next = pcl_next_chunk(pcl, chunk); + if (!next) + break; + + start += length; + length = next->ch_position - info->ri_totalbytes; + ret = svc_rdma_read_chunk_range(info, call_chunk, + start, length); + if (ret < 0) + return ret; + } + + start += length; + length = call_chunk->ch_length - start; + return svc_rdma_read_chunk_range(info, call_chunk, start, length); +} + +/** + * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message + * @info: context for RDMA Reads + * + * The start of the data lands in the first page just after the + * Transport header, and the rest lands in the page list of * head->rc_arg.pages. * * Assumptions: - * - A PZRC has an XDR-aligned length (no implicit round-up). - * - There can be no trailing inline content (IOW, we assume - * a PZRC is never sent in an RDMA_MSG message, though it's - * allowed by spec). + * - A PZRC is never sent in an RDMA_MSG message, though it's + * allowed by spec. + * + * Return values: + * %0: RDMA Read WQEs were successfully built + * %-EINVAL: client provided too many chunks or segments, + * %-ENOMEM: rdma_rw context pool was exhausted, + * %-ENOTCONN: posting failed (connection is lost), + * %-EIO: rdma_rw initialization failed (DMA mapping, etc). */ -static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp, - struct svc_rdma_read_info *info, - __be32 *p) +static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info) { struct svc_rdma_recv_ctxt *head = info->ri_readctxt; + struct xdr_buf *buf = &head->rc_arg; int ret; - ret = svc_rdma_build_read_chunk(rqstp, info, p); + ret = svc_rdma_read_call_chunk(info); if (ret < 0) goto out; - trace_svcrdma_send_pzr(info->ri_chunklen); - - head->rc_arg.len += info->ri_chunklen; - head->rc_arg.buflen += info->ri_chunklen; + buf->len += info->ri_totalbytes; + buf->buflen += info->ri_totalbytes; head->rc_hdr_count = 1; - head->rc_arg.head[0].iov_base = page_address(head->rc_pages[0]); - head->rc_arg.head[0].iov_len = min_t(size_t, PAGE_SIZE, - info->ri_chunklen); - - head->rc_arg.page_len = info->ri_chunklen - - head->rc_arg.head[0].iov_len; + buf->head[0].iov_base = page_address(head->rc_pages[0]); + buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes); + buf->page_len = info->ri_totalbytes - buf->head[0].iov_len; out: return ret; @@ -824,26 +1088,34 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, } /** - * svc_rdma_recv_read_chunk - Pull a Read chunk from the client + * svc_rdma_process_read_list - Pull list of Read chunks from the client * @rdma: controlling RDMA transport * @rqstp: set of pages to use as Read sink buffers * @head: pages under I/O collect here - * @p: pointer to start of Read chunk * - * Returns: - * %0 if all needed RDMA Reads were posted successfully, - * %-EINVAL if client provided too many segments, - * %-ENOMEM if rdma_rw context pool was exhausted, - * %-ENOTCONN if posting failed (connection is lost), - * %-EIO if rdma_rw initialization failed (DMA mapping, etc). + * The RPC/RDMA protocol assumes that the upper layer's XDR decoders + * pull each Read chunk as they decode an incoming RPC message. * - * Assumptions: - * - All Read segments in @p have the same Position value. + * On Linux, however, the server needs to have a fully-constructed RPC + * message in rqstp->rq_arg when there is a positive return code from + * ->xpo_recvfrom. So the Read list is safety-checked immediately when + * it is received, then here the whole Read list is pulled all at once. + * The ingress RPC message is fully reconstructed once all associated + * RDMA Reads have completed. + * + * Return values: + * %1: all needed RDMA Reads were posted successfully, + * %-EINVAL: client provided too many chunks or segments, + * %-ENOMEM: rdma_rw context pool was exhausted, + * %-ENOTCONN: posting failed (connection is lost), + * %-EIO: rdma_rw initialization failed (DMA mapping, etc). */ -int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, - struct svc_rdma_recv_ctxt *head, __be32 *p) +int svc_rdma_process_read_list(struct svcxprt_rdma *rdma, + struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *head) { struct svc_rdma_read_info *info; + struct svc_rdma_chunk_ctxt *cc; int ret; /* The request (with page list) is constructed in @@ -861,23 +1133,29 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, info = svc_rdma_read_info_alloc(rdma); if (!info) return -ENOMEM; + cc = &info->ri_cc; + info->ri_rqst = rqstp; info->ri_readctxt = head; info->ri_pageno = 0; info->ri_pageoff = 0; - - info->ri_position = be32_to_cpup(p + 1); - if (info->ri_position) - ret = svc_rdma_build_normal_read_chunk(rqstp, info, p); - else - ret = svc_rdma_build_pz_read_chunk(rqstp, info, p); + info->ri_totalbytes = 0; + + if (pcl_is_empty(&head->rc_call_pcl)) { + if (head->rc_read_pcl.cl_count == 1) + ret = svc_rdma_read_data_item(info); + else + ret = svc_rdma_read_multiple_chunks(info); + } else + ret = svc_rdma_read_special(info); if (ret < 0) goto out_err; - ret = svc_rdma_post_chunk_ctxt(&info->ri_cc); + trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount); + ret = svc_rdma_post_chunk_ctxt(cc); if (ret < 0) goto out_err; svc_rdma_save_io_pages(rqstp, 0, head->rc_page_count); - return 0; + return 1; out_err: svc_rdma_read_info_free(info); diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index c3d588b149aa..68af79d4f04f 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -358,49 +358,42 @@ static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt) /** * svc_rdma_encode_write_segment - Encode one Write segment - * @src: matching Write chunk in the RPC Call header * @sctxt: Send context for the RPC Reply + * @chunk: Write chunk to push * @remaining: remaining bytes of the payload left in the Write chunk + * @segno: which segment in the chunk * * Return values: * On success, returns length in bytes of the Reply XDR buffer - * that was consumed by the Write segment + * that was consumed by the Write segment, and updates @remaining * %-EMSGSIZE on XDR buffer overflow */ -static ssize_t svc_rdma_encode_write_segment(__be32 *src, - struct svc_rdma_send_ctxt *sctxt, - unsigned int *remaining) +static ssize_t svc_rdma_encode_write_segment(struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_chunk *chunk, + u32 *remaining, unsigned int segno) { + const struct svc_rdma_segment *segment = &chunk->ch_segments[segno]; + const size_t len = rpcrdma_segment_maxsz * sizeof(__be32); + u32 length; __be32 *p; - const size_t len = rpcrdma_segment_maxsz * sizeof(*p); - u32 handle, length; - u64 offset; p = xdr_reserve_space(&sctxt->sc_stream, len); if (!p) return -EMSGSIZE; - xdr_decode_rdma_segment(src, &handle, &length, &offset); - - if (*remaining < length) { - /* segment only partly filled */ - length = *remaining; - *remaining = 0; - } else { - /* entire segment was consumed */ - *remaining -= length; - } - xdr_encode_rdma_segment(p, handle, length, offset); - - trace_svcrdma_encode_wseg(handle, length, offset); + length = min_t(u32, *remaining, segment->rs_length); + *remaining -= length; + xdr_encode_rdma_segment(p, segment->rs_handle, length, + segment->rs_offset); + trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length, + segment->rs_offset); return len; } /** * svc_rdma_encode_write_chunk - Encode one Write chunk - * @src: matching Write chunk in the RPC Call header * @sctxt: Send context for the RPC Reply - * @remaining: size in bytes of the payload in the Write chunk + * @chunk: Write chunk to push * * Copy a Write chunk from the Call transport header to the * Reply transport header. Update each segment's length field @@ -411,33 +404,28 @@ static ssize_t svc_rdma_encode_write_segment(__be32 *src, * that was consumed by the Write chunk * %-EMSGSIZE on XDR buffer overflow */ -static ssize_t svc_rdma_encode_write_chunk(__be32 *src, - struct svc_rdma_send_ctxt *sctxt, - unsigned int remaining) +static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_chunk *chunk) { - unsigned int i, nsegs; + u32 remaining = chunk->ch_payload_length; + unsigned int segno; ssize_t len, ret; len = 0; - trace_svcrdma_encode_write_chunk(remaining); - - src++; ret = xdr_stream_encode_item_present(&sctxt->sc_stream); if (ret < 0) - return -EMSGSIZE; + return ret; len += ret; - nsegs = be32_to_cpup(src++); - ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs); + ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount); if (ret < 0) - return -EMSGSIZE; + return ret; len += ret; - for (i = nsegs; i; i--) { - ret = svc_rdma_encode_write_segment(src, sctxt, &remaining); + for (segno = 0; segno < chunk->ch_segcount; segno++) { + ret = svc_rdma_encode_write_segment(sctxt, chunk, &remaining, segno); if (ret < 0) - return -EMSGSIZE; - src += rpcrdma_segment_maxsz; + return ret; len += ret; } @@ -448,32 +436,25 @@ static ssize_t svc_rdma_encode_write_chunk(__be32 *src, * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list * @rctxt: Reply context with information about the RPC Call * @sctxt: Send context for the RPC Reply - * @length: size in bytes of the payload in the first Write chunk - * - * The client provides a Write chunk list in the Call message. Fill - * in the segments in the first Write chunk in the Reply's transport - * header with the number of bytes consumed in each segment. - * Remaining chunks are returned unused. - * - * Assumptions: - * - Client has provided only one Write chunk * * Return values: * On success, returns length in bytes of the Reply XDR buffer * that was consumed by the Reply's Write list * %-EMSGSIZE on XDR buffer overflow */ -static ssize_t -svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt, - struct svc_rdma_send_ctxt *sctxt, - unsigned int length) +static ssize_t svc_rdma_encode_write_list(struct svc_rdma_recv_ctxt *rctxt, + struct svc_rdma_send_ctxt *sctxt) { + struct svc_rdma_chunk *chunk; ssize_t len, ret; - ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt, length); - if (ret < 0) - return ret; - len = ret; + len = 0; + pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) { + ret = svc_rdma_encode_write_chunk(sctxt, chunk); + if (ret < 0) + return ret; + len += ret; + } /* Terminate the Write list */ ret = xdr_stream_encode_item_absent(&sctxt->sc_stream); @@ -489,56 +470,174 @@ svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt, * @sctxt: Send context for the RPC Reply * @length: size in bytes of the payload in the Reply chunk * - * Assumptions: - * - Reply can always fit in the client-provided Reply chunk - * * Return values: * On success, returns length in bytes of the Reply XDR buffer * that was consumed by the Reply's Reply chunk * %-EMSGSIZE on XDR buffer overflow + * %-E2BIG if the RPC message is larger than the Reply chunk */ static ssize_t -svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt, +svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt, struct svc_rdma_send_ctxt *sctxt, unsigned int length) { - return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt, - length); + struct svc_rdma_chunk *chunk; + + if (pcl_is_empty(&rctxt->rc_reply_pcl)) + return xdr_stream_encode_item_absent(&sctxt->sc_stream); + + chunk = pcl_first_chunk(&rctxt->rc_reply_pcl); + if (length > chunk->ch_length) + return -E2BIG; + + chunk->ch_payload_length = length; + return svc_rdma_encode_write_chunk(sctxt, chunk); } -static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, - struct svc_rdma_send_ctxt *ctxt, - struct page *page, - unsigned long offset, - unsigned int len) +struct svc_rdma_map_data { + struct svcxprt_rdma *md_rdma; + struct svc_rdma_send_ctxt *md_ctxt; +}; + +/** + * svc_rdma_page_dma_map - DMA map one page + * @data: pointer to arguments + * @page: struct page to DMA map + * @offset: offset into the page + * @len: number of bytes to map + * + * Returns: + * %0 if DMA mapping was successful + * %-EIO if the page cannot be DMA mapped + */ +static int svc_rdma_page_dma_map(void *data, struct page *page, + unsigned long offset, unsigned int len) { + struct svc_rdma_map_data *args = data; + struct svcxprt_rdma *rdma = args->md_rdma; + struct svc_rdma_send_ctxt *ctxt = args->md_ctxt; struct ib_device *dev = rdma->sc_cm_id->device; dma_addr_t dma_addr; + ++ctxt->sc_cur_sge_no; + dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE); - trace_svcrdma_dma_map_page(rdma, dma_addr, len); if (ib_dma_mapping_error(dev, dma_addr)) goto out_maperr; + trace_svcrdma_dma_map_page(rdma, dma_addr, len); ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr; ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len; ctxt->sc_send_wr.num_sge++; return 0; out_maperr: + trace_svcrdma_dma_map_err(rdma, dma_addr, len); return -EIO; } -/* ib_dma_map_page() is used here because svc_rdma_dma_unmap() +/** + * svc_rdma_iov_dma_map - DMA map an iovec + * @data: pointer to arguments + * @iov: kvec to DMA map + * + * ib_dma_map_page() is used here because svc_rdma_dma_unmap() * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively. + * + * Returns: + * %0 if DMA mapping was successful + * %-EIO if the iovec cannot be DMA mapped */ -static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, - struct svc_rdma_send_ctxt *ctxt, - unsigned char *base, - unsigned int len) +static int svc_rdma_iov_dma_map(void *data, const struct kvec *iov) { - return svc_rdma_dma_map_page(rdma, ctxt, virt_to_page(base), - offset_in_page(base), len); + if (!iov->iov_len) + return 0; + return svc_rdma_page_dma_map(data, virt_to_page(iov->iov_base), + offset_in_page(iov->iov_base), + iov->iov_len); +} + +/** + * svc_rdma_xb_dma_map - DMA map all segments of an xdr_buf + * @xdr: xdr_buf containing portion of an RPC message to transmit + * @data: pointer to arguments + * + * Returns: + * %0 if DMA mapping was successful + * %-EIO if DMA mapping failed + * + * On failure, any DMA mappings that have been already done must be + * unmapped by the caller. + */ +static int svc_rdma_xb_dma_map(const struct xdr_buf *xdr, void *data) +{ + unsigned int len, remaining; + unsigned long pageoff; + struct page **ppages; + int ret; + + ret = svc_rdma_iov_dma_map(data, &xdr->head[0]); + if (ret < 0) + return ret; + + ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); + pageoff = offset_in_page(xdr->page_base); + remaining = xdr->page_len; + while (remaining) { + len = min_t(u32, PAGE_SIZE - pageoff, remaining); + + ret = svc_rdma_page_dma_map(data, *ppages++, pageoff, len); + if (ret < 0) + return ret; + + remaining -= len; + pageoff = 0; + } + + ret = svc_rdma_iov_dma_map(data, &xdr->tail[0]); + if (ret < 0) + return ret; + + return xdr->len; +} + +struct svc_rdma_pullup_data { + u8 *pd_dest; + unsigned int pd_length; + unsigned int pd_num_sges; +}; + +/** + * svc_rdma_xb_count_sges - Count how many SGEs will be needed + * @xdr: xdr_buf containing portion of an RPC message to transmit + * @data: pointer to arguments + * + * Returns: + * Number of SGEs needed to Send the contents of @xdr inline + */ +static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr, + void *data) +{ + struct svc_rdma_pullup_data *args = data; + unsigned int remaining; + unsigned long offset; + + if (xdr->head[0].iov_len) + ++args->pd_num_sges; + + offset = offset_in_page(xdr->page_base); + remaining = xdr->page_len; + while (remaining) { + ++args->pd_num_sges; + remaining -= min_t(u32, PAGE_SIZE - offset, remaining); + offset = 0; + } + + if (xdr->tail[0].iov_len) + ++args->pd_num_sges; + + args->pd_length += xdr->len; + return 0; } /** @@ -549,48 +648,71 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, * @xdr: xdr_buf containing RPC message to transmit * * Returns: - * %true if pull-up must be used - * %false otherwise + * %true if pull-up must be used + * %false otherwise */ -static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma, - struct svc_rdma_send_ctxt *sctxt, +static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma, + const struct svc_rdma_send_ctxt *sctxt, const struct svc_rdma_recv_ctxt *rctxt, - struct xdr_buf *xdr) + const struct xdr_buf *xdr) { - int elements; + /* Resources needed for the transport header */ + struct svc_rdma_pullup_data args = { + .pd_length = sctxt->sc_hdrbuf.len, + .pd_num_sges = 1, + }; + int ret; - /* For small messages, copying bytes is cheaper than DMA mapping. - */ - if (sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH) + ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr, + svc_rdma_xb_count_sges, &args); + if (ret < 0) + return false; + + if (args.pd_length < RPCRDMA_PULLUP_THRESH) return true; + return args.pd_num_sges >= rdma->sc_max_send_sges; +} - /* Check whether the xdr_buf has more elements than can - * fit in a single RDMA Send. - */ - /* xdr->head */ - elements = 1; - - /* xdr->pages */ - if (!rctxt || !rctxt->rc_write_list) { - unsigned int remaining; - unsigned long pageoff; - - pageoff = xdr->page_base & ~PAGE_MASK; - remaining = xdr->page_len; - while (remaining) { - ++elements; - remaining -= min_t(u32, PAGE_SIZE - pageoff, - remaining); - pageoff = 0; - } +/** + * svc_rdma_xb_linearize - Copy region of xdr_buf to flat buffer + * @xdr: xdr_buf containing portion of an RPC message to copy + * @data: pointer to arguments + * + * Returns: + * Always zero. + */ +static int svc_rdma_xb_linearize(const struct xdr_buf *xdr, + void *data) +{ + struct svc_rdma_pullup_data *args = data; + unsigned int len, remaining; + unsigned long pageoff; + struct page **ppages; + + if (xdr->head[0].iov_len) { + memcpy(args->pd_dest, xdr->head[0].iov_base, xdr->head[0].iov_len); + args->pd_dest += xdr->head[0].iov_len; } - /* xdr->tail */ - if (xdr->tail[0].iov_len) - ++elements; + ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); + pageoff = offset_in_page(xdr->page_base); + remaining = xdr->page_len; + while (remaining) { + len = min_t(u32, PAGE_SIZE - pageoff, remaining); + memcpy(args->pd_dest, page_address(*ppages) + pageoff, len); + remaining -= len; + args->pd_dest += len; + pageoff = 0; + ppages++; + } - /* assume 1 SGE is needed for the transport header */ - return elements >= rdma->sc_max_send_sges; + if (xdr->tail[0].iov_len) { + memcpy(args->pd_dest, xdr->tail[0].iov_base, xdr->tail[0].iov_len); + args->pd_dest += xdr->tail[0].iov_len; + } + + args->pd_length += xdr->len; + return 0; } /** @@ -603,54 +725,30 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma, * The device is not capable of sending the reply directly. * Assemble the elements of @xdr into the transport header buffer. * - * Returns zero on success, or a negative errno on failure. + * Assumptions: + * pull_up_needed has determined that @xdr will fit in the buffer. + * + * Returns: + * %0 if pull-up was successful + * %-EMSGSIZE if a buffer manipulation problem occurred */ -static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma, +static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *sctxt, const struct svc_rdma_recv_ctxt *rctxt, const struct xdr_buf *xdr) { - unsigned char *dst, *tailbase; - unsigned int taillen; - - dst = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len; - memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len); - dst += xdr->head[0].iov_len; - - tailbase = xdr->tail[0].iov_base; - taillen = xdr->tail[0].iov_len; - if (rctxt && rctxt->rc_write_list) { - u32 xdrpad; - - xdrpad = xdr_pad_size(xdr->page_len); - if (taillen && xdrpad) { - tailbase += xdrpad; - taillen -= xdrpad; - } - } else { - unsigned int len, remaining; - unsigned long pageoff; - struct page **ppages; - - ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); - pageoff = xdr->page_base & ~PAGE_MASK; - remaining = xdr->page_len; - while (remaining) { - len = min_t(u32, PAGE_SIZE - pageoff, remaining); - - memcpy(dst, page_address(*ppages) + pageoff, len); - remaining -= len; - dst += len; - pageoff = 0; - ppages++; - } - } + struct svc_rdma_pullup_data args = { + .pd_dest = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len, + }; + int ret; - if (taillen) - memcpy(dst, tailbase, taillen); + ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr, + svc_rdma_xb_linearize, &args); + if (ret < 0) + return ret; - sctxt->sc_sges[0].length += xdr->len; - trace_svcrdma_send_pullup(sctxt->sc_sges[0].length); + sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len + args.pd_length; + trace_svcrdma_send_pullup(sctxt, args.pd_length); return 0; } @@ -660,22 +758,22 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma, * @rctxt: Write and Reply chunks provided by client * @xdr: prepared xdr_buf containing RPC message * - * Load the xdr_buf into the ctxt's sge array, and DMA map each - * element as it is added. The Send WR's num_sge field is set. + * Returns: + * %0 if DMA mapping was successful. + * %-EMSGSIZE if a buffer manipulation problem occurred + * %-EIO if DMA mapping failed * - * Returns zero on success, or a negative errno on failure. + * The Send WR's num_sge field is set in all cases. */ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *sctxt, const struct svc_rdma_recv_ctxt *rctxt, - struct xdr_buf *xdr) + const struct xdr_buf *xdr) { - unsigned int len, remaining; - unsigned long page_off; - struct page **ppages; - unsigned char *base; - u32 xdr_pad; - int ret; + struct svc_rdma_map_data args = { + .md_rdma = rdma, + .md_ctxt = sctxt, + }; /* Set up the (persistently-mapped) transport header SGE. */ sctxt->sc_send_wr.num_sge = 1; @@ -684,7 +782,7 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, /* If there is a Reply chunk, nothing follows the transport * header, and we're done here. */ - if (rctxt && rctxt->rc_reply_chunk) + if (!pcl_is_empty(&rctxt->rc_reply_pcl)) return 0; /* For pull-up, svc_rdma_send() will sync the transport header. @@ -693,58 +791,8 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, if (svc_rdma_pull_up_needed(rdma, sctxt, rctxt, xdr)) return svc_rdma_pull_up_reply_msg(rdma, sctxt, rctxt, xdr); - ++sctxt->sc_cur_sge_no; - ret = svc_rdma_dma_map_buf(rdma, sctxt, - xdr->head[0].iov_base, - xdr->head[0].iov_len); - if (ret < 0) - return ret; - - /* If a Write chunk is present, the xdr_buf's page list - * is not included inline. However the Upper Layer may - * have added XDR padding in the tail buffer, and that - * should not be included inline. - */ - if (rctxt && rctxt->rc_write_list) { - base = xdr->tail[0].iov_base; - len = xdr->tail[0].iov_len; - xdr_pad = xdr_pad_size(xdr->page_len); - - if (len && xdr_pad) { - base += xdr_pad; - len -= xdr_pad; - } - - goto tail; - } - - ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); - page_off = xdr->page_base & ~PAGE_MASK; - remaining = xdr->page_len; - while (remaining) { - len = min_t(u32, PAGE_SIZE - page_off, remaining); - - ++sctxt->sc_cur_sge_no; - ret = svc_rdma_dma_map_page(rdma, sctxt, *ppages++, - page_off, len); - if (ret < 0) - return ret; - - remaining -= len; - page_off = 0; - } - - base = xdr->tail[0].iov_base; - len = xdr->tail[0].iov_len; -tail: - if (len) { - ++sctxt->sc_cur_sge_no; - ret = svc_rdma_dma_map_buf(rdma, sctxt, base, len); - if (ret < 0) - return ret; - } - - return 0; + return pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr, + svc_rdma_xb_dma_map, &args); } /* The svc_rqst and all resources it owns are released as soon as @@ -894,9 +942,6 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) container_of(xprt, struct svcxprt_rdma, sc_xprt); struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt; __be32 *rdma_argp = rctxt->rc_recv_buf; - __be32 *wr_lst = rctxt->rc_write_list; - __be32 *rp_ch = rctxt->rc_reply_chunk; - struct xdr_buf *xdr = &rqstp->rq_res; struct svc_rdma_send_ctxt *sctxt; __be32 *p; int ret; @@ -914,45 +959,22 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) rpcrdma_fixed_maxsz * sizeof(*p)); if (!p) goto err0; + + ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res); + if (ret < 0) + goto err2; + *p++ = *rdma_argp; *p++ = *(rdma_argp + 1); *p++ = rdma->sc_fc_credits; - *p = rp_ch ? rdma_nomsg : rdma_msg; + *p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg; if (svc_rdma_encode_read_list(sctxt) < 0) goto err0; - if (wr_lst) { - /* XXX: Presume the client sent only one Write chunk */ - unsigned long offset; - unsigned int length; - - if (rctxt->rc_read_payload_length) { - offset = rctxt->rc_read_payload_offset; - length = rctxt->rc_read_payload_length; - } else { - offset = xdr->head[0].iov_len; - length = xdr->page_len; - } - ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset, - length); - if (ret < 0) - goto err2; - if (svc_rdma_encode_write_list(rctxt, sctxt, length) < 0) - goto err0; - } else { - if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0) - goto err0; - } - if (rp_ch) { - ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res); - if (ret < 0) - goto err2; - if (svc_rdma_encode_reply_chunk(rctxt, sctxt, ret) < 0) - goto err0; - } else { - if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0) - goto err0; - } + if (svc_rdma_encode_write_list(rctxt, sctxt) < 0) + goto err0; + if (svc_rdma_encode_reply_chunk(rctxt, sctxt, ret) < 0) + goto err0; ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp); if (ret < 0) @@ -979,28 +1001,46 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) } /** - * svc_rdma_read_payload - special processing for a READ payload + * svc_rdma_result_payload - special processing for a result payload * @rqstp: svc_rqst to operate on * @offset: payload's byte offset in @xdr * @length: size of payload, in bytes * - * Returns zero on success. - * - * For the moment, just record the xdr_buf location of the READ - * payload. svc_rdma_sendto will use that location later when - * we actually send the payload. + * Return values: + * %0 if successful or nothing needed to be done + * %-EMSGSIZE on XDR buffer overflow + * %-E2BIG if the payload was larger than the Write chunk + * %-EINVAL if client provided too many segments + * %-ENOMEM if rdma_rw context pool was exhausted + * %-ENOTCONN if posting failed (connection is lost) + * %-EIO if rdma_rw initialization failed (DMA mapping, etc) */ -int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset, - unsigned int length) +int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset, + unsigned int length) { struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt; + struct svc_rdma_chunk *chunk; + struct svcxprt_rdma *rdma; + struct xdr_buf subbuf; + int ret; - /* XXX: Just one READ payload slot for now, since our - * transport implementation currently supports only one - * Write chunk. - */ - rctxt->rc_read_payload_offset = offset; - rctxt->rc_read_payload_length = length; + chunk = rctxt->rc_cur_result_payload; + if (!length || !chunk) + return 0; + rctxt->rc_cur_result_payload = + pcl_next_chunk(&rctxt->rc_write_pcl, chunk); + if (length > chunk->ch_length) + return -E2BIG; + chunk->ch_position = offset; + chunk->ch_payload_length = length; + + if (xdr_buf_subsegment(&rqstp->rq_res, &subbuf, offset, length)) + return -EMSGSIZE; + + rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt); + ret = svc_rdma_send_write_chunk(rdma, chunk, &subbuf); + if (ret < 0) + return ret; return 0; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index fb044792b571..afba4e9d5425 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -80,7 +80,7 @@ static const struct svc_xprt_ops svc_rdma_ops = { .xpo_create = svc_rdma_create, .xpo_recvfrom = svc_rdma_recvfrom, .xpo_sendto = svc_rdma_sendto, - .xpo_read_payload = svc_rdma_read_payload, + .xpo_result_payload = svc_rdma_result_payload, .xpo_release_rqst = svc_rdma_release_rqst, .xpo_detach = svc_rdma_detach, .xpo_free = svc_rdma_free, |