summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2020-04-04 11:13:51 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2020-04-04 11:13:51 -0700
commitb3d8e4228268f9cfacc2e88aa61b6d0ce776e207 (patch)
tree2139af090542ab58bf7fa8dbbb4072ece1edb43e /net
parentbdabb68931b9360bf18b498062f1ac90bec46633 (diff)
parent1a33d8a284b1e85e03b8c7b1ea8fb985fccd1d71 (diff)
Merge tag 'nfsd-5.7' of git://git.linux-nfs.org/projects/cel/cel-2.6
Pull nfsd updates from Chuck Lever: - Fix EXCHANGE_ID response when NFSD runs in a container - A battery of new static trace points - Socket transports now use bio_vec to send Replies - NFS/RDMA now supports filesystems with no .splice_read method - Favor memcpy() over DMA mapping for small RPC/RDMA Replies - Add pre-requisites for supporting multiple Write chunks - Numerous minor fixes and clean-ups [ Chuck is filling in for Bruce this time while he and his family settle into a new house ] * tag 'nfsd-5.7' of git://git.linux-nfs.org/projects/cel/cel-2.6: (39 commits) svcrdma: Fix leak of transport addresses SUNRPC: Fix a potential buffer overflow in 'svc_print_xprts()' SUNRPC/cache: don't allow invalid entries to be flushed nfsd: fsnotify on rmdir under nfsd/clients/ nfsd4: kill warnings on testing stateids with mismatched clientids nfsd: remove read permission bit for ctl sysctl NFSD: Fix NFS server build errors sunrpc: Add tracing for cache events SUNRPC/cache: Allow garbage collection of invalid cache entries nfsd: export upcalls must not return ESTALE when mountd is down nfsd: Add tracepoints for update of the expkey and export cache entries nfsd: Add tracepoints for exp_find_key() and exp_get_by_name() nfsd: Add tracing to nfsd_set_fh_dentry() nfsd: Don't add locks to closed or closing open stateids SUNRPC: Teach server to use xprt_sock_sendmsg for socket sends SUNRPC: Refactor xs_sendpages() svcrdma: Avoid DMA mapping small RPC Replies svcrdma: Fix double sync of transport header buffer svcrdma: Refactor chunk list encoders SUNRPC: Add encoders for list item discriminators ...
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/auth_gss/auth_gss.c2
-rw-r--r--net/sunrpc/auth_gss/svcauth_gss.c60
-rw-r--r--net/sunrpc/cache.c128
-rw-r--r--net/sunrpc/clnt.c1
-rw-r--r--net/sunrpc/socklib.c141
-rw-r--r--net/sunrpc/socklib.h15
-rw-r--r--net/sunrpc/sunrpc.h4
-rw-r--r--net/sunrpc/svc.c20
-rw-r--r--net/sunrpc/svc_xprt.c22
-rw-r--r--net/sunrpc/svcauth_unix.c12
-rw-r--r--net/sunrpc/svcsock.c202
-rw-r--r--net/sunrpc/xprt.c3
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c36
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_backchannel.c17
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c244
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_rw.c57
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c512
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c8
-rw-r--r--net/sunrpc/xprtsock.c188
19 files changed, 901 insertions, 771 deletions
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 24ca861815b1..ee060d57d216 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1877,7 +1877,7 @@ static int gss_wrap_req_priv(struct rpc_cred *cred, struct gss_cl_ctx *ctx,
else
iov = snd_buf->head;
p = iov->iov_base + iov->iov_len;
- pad = 3 - ((snd_buf->len - offset - 1) & 3);
+ pad = xdr_pad_size(snd_buf->len - offset);
memset(p, 0, pad);
iov->iov_len += pad;
snd_buf->len += pad;
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index 65b67b257302..54ae5be62f6a 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -55,10 +55,6 @@
#include "gss_rpc_upcall.h"
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-# define RPCDBG_FACILITY RPCDBG_AUTH
-#endif
-
/* The rpcsec_init cache is used for mapping RPCSEC_GSS_{,CONT_}INIT requests
* into replies.
*
@@ -184,6 +180,11 @@ static struct cache_head *rsi_alloc(void)
return NULL;
}
+static int rsi_upcall(struct cache_detail *cd, struct cache_head *h)
+{
+ return sunrpc_cache_pipe_upcall_timeout(cd, h);
+}
+
static void rsi_request(struct cache_detail *cd,
struct cache_head *h,
char **bpp, int *blen)
@@ -282,6 +283,7 @@ static const struct cache_detail rsi_cache_template = {
.hash_size = RSI_HASHMAX,
.name = "auth.rpcsec.init",
.cache_put = rsi_put,
+ .cache_upcall = rsi_upcall,
.cache_request = rsi_request,
.cache_parse = rsi_parse,
.match = rsi_match,
@@ -428,6 +430,11 @@ rsc_alloc(void)
return NULL;
}
+static int rsc_upcall(struct cache_detail *cd, struct cache_head *h)
+{
+ return -EINVAL;
+}
+
static int rsc_parse(struct cache_detail *cd,
char *mesg, int mlen)
{
@@ -554,6 +561,7 @@ static const struct cache_detail rsc_cache_template = {
.hash_size = RSC_HASHMAX,
.name = "auth.rpcsec.context",
.cache_put = rsc_put,
+ .cache_upcall = rsc_upcall,
.cache_parse = rsc_parse,
.match = rsc_match,
.init = rsc_init,
@@ -713,14 +721,12 @@ gss_verify_header(struct svc_rqst *rqstp, struct rsc *rsci,
}
if (gc->gc_seq > MAXSEQ) {
- dprintk("RPC: svcauth_gss: discarding request with "
- "large sequence number %d\n", gc->gc_seq);
+ trace_rpcgss_svc_large_seqno(rqstp->rq_xid, gc->gc_seq);
*authp = rpcsec_gsserr_ctxproblem;
return SVC_DENIED;
}
if (!gss_check_seq_num(rsci, gc->gc_seq)) {
- dprintk("RPC: svcauth_gss: discarding request with "
- "old sequence number %d\n", gc->gc_seq);
+ trace_rpcgss_svc_old_seqno(rqstp->rq_xid, gc->gc_seq);
return SVC_DROP;
}
return SVC_OK;
@@ -961,7 +967,7 @@ unwrap_priv_data(struct svc_rqst *rqstp, struct xdr_buf *buf, u32 seq, struct gs
/* XXX: This is very inefficient. It would be better to either do
* this while we encrypt, or maybe in the receive code, if we can peak
* ahead and work out the service and mechanism there. */
- offset = buf->head[0].iov_len % 4;
+ offset = xdr_pad_size(buf->head[0].iov_len);
if (offset) {
buf->buflen = RPCSVC_MAXPAYLOAD;
xdr_shift_buf(buf, offset);
@@ -1245,7 +1251,6 @@ static int gss_proxy_save_rsc(struct cache_detail *cd,
if (!ud->found_creds) {
/* userspace seem buggy, we should always get at least a
* mapping to nobody */
- dprintk("RPC: No creds found!\n");
goto out;
} else {
struct timespec64 boot;
@@ -1311,8 +1316,8 @@ static int svcauth_gss_proxy_init(struct svc_rqst *rqstp,
if (status)
goto out;
- trace_rpcgss_accept_upcall(rqstp->rq_xid, ud.major_status,
- ud.minor_status);
+ trace_rpcgss_svc_accept_upcall(rqstp->rq_xid, ud.major_status,
+ ud.minor_status);
switch (ud.major_status) {
case GSS_S_CONTINUE_NEEDED:
@@ -1320,31 +1325,23 @@ static int svcauth_gss_proxy_init(struct svc_rqst *rqstp,
break;
case GSS_S_COMPLETE:
status = gss_proxy_save_rsc(sn->rsc_cache, &ud, &handle);
- if (status) {
- pr_info("%s: gss_proxy_save_rsc failed (%d)\n",
- __func__, status);
+ if (status)
goto out;
- }
cli_handle.data = (u8 *)&handle;
cli_handle.len = sizeof(handle);
break;
default:
- ret = SVC_CLOSE;
goto out;
}
/* Got an answer to the upcall; use it: */
if (gss_write_init_verf(sn->rsc_cache, rqstp,
- &cli_handle, &ud.major_status)) {
- pr_info("%s: gss_write_init_verf failed\n", __func__);
+ &cli_handle, &ud.major_status))
goto out;
- }
if (gss_write_resv(resv, PAGE_SIZE,
&cli_handle, &ud.out_token,
- ud.major_status, ud.minor_status)) {
- pr_info("%s: gss_write_resv failed\n", __func__);
+ ud.major_status, ud.minor_status))
goto out;
- }
ret = SVC_COMPLETE;
out:
@@ -1495,8 +1492,7 @@ svcauth_gss_accept(struct svc_rqst *rqstp, __be32 *authp)
int ret;
struct sunrpc_net *sn = net_generic(SVC_NET(rqstp), sunrpc_net_id);
- dprintk("RPC: svcauth_gss: argv->iov_len = %zd\n",
- argv->iov_len);
+ trace_rpcgss_svc_accept(rqstp->rq_xid, argv->iov_len);
*authp = rpc_autherr_badcred;
if (!svcdata)
@@ -1680,7 +1676,8 @@ svcauth_gss_wrap_resp_integ(struct svc_rqst *rqstp)
goto out;
integ_offset = (u8 *)(p + 1) - (u8 *)resbuf->head[0].iov_base;
integ_len = resbuf->len - integ_offset;
- BUG_ON(integ_len % 4);
+ if (integ_len & 3)
+ goto out;
*p++ = htonl(integ_len);
*p++ = htonl(gc->gc_seq);
if (xdr_buf_subsegment(resbuf, &integ_buf, integ_offset, integ_len)) {
@@ -1704,7 +1701,8 @@ svcauth_gss_wrap_resp_integ(struct svc_rqst *rqstp)
resv->iov_len += XDR_QUADLEN(mic.len) << 2;
/* not strictly required: */
resbuf->len += XDR_QUADLEN(mic.len) << 2;
- BUG_ON(resv->iov_len > PAGE_SIZE);
+ if (resv->iov_len > PAGE_SIZE)
+ goto out_err;
out:
stat = 0;
out_err:
@@ -1740,9 +1738,11 @@ svcauth_gss_wrap_resp_priv(struct svc_rqst *rqstp)
* both the head and tail.
*/
if (resbuf->tail[0].iov_base) {
- BUG_ON(resbuf->tail[0].iov_base >= resbuf->head[0].iov_base
- + PAGE_SIZE);
- BUG_ON(resbuf->tail[0].iov_base < resbuf->head[0].iov_base);
+ if (resbuf->tail[0].iov_base >=
+ resbuf->head[0].iov_base + PAGE_SIZE)
+ return -EINVAL;
+ if (resbuf->tail[0].iov_base < resbuf->head[0].iov_base)
+ return -EINVAL;
if (resbuf->tail[0].iov_len + resbuf->head[0].iov_len
+ 2 * RPC_MAX_AUTH_SIZE > PAGE_SIZE)
return -ENOMEM;
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index bd843a81afa0..af0ddd28b081 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -32,13 +32,13 @@
#include <linux/sunrpc/cache.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/rpc_pipe_fs.h>
+#include <trace/events/sunrpc.h>
#include "netns.h"
#define RPCDBG_FACILITY RPCDBG_CACHE
static bool cache_defer_req(struct cache_req *req, struct cache_head *item);
static void cache_revisit_request(struct cache_head *item);
-static bool cache_listeners_exist(struct cache_detail *detail);
static void cache_init(struct cache_head *h, struct cache_detail *detail)
{
@@ -65,13 +65,14 @@ static struct cache_head *sunrpc_cache_find_rcu(struct cache_detail *detail,
rcu_read_lock();
hlist_for_each_entry_rcu(tmp, head, cache_list) {
- if (detail->match(tmp, key)) {
- if (cache_is_expired(detail, tmp))
- continue;
- tmp = cache_get_rcu(tmp);
- rcu_read_unlock();
- return tmp;
- }
+ if (!detail->match(tmp, key))
+ continue;
+ if (test_bit(CACHE_VALID, &tmp->flags) &&
+ cache_is_expired(detail, tmp))
+ continue;
+ tmp = cache_get_rcu(tmp);
+ rcu_read_unlock();
+ return tmp;
}
rcu_read_unlock();
return NULL;
@@ -113,18 +114,21 @@ static struct cache_head *sunrpc_cache_add_entry(struct cache_detail *detail,
spin_lock(&detail->hash_lock);
/* check if entry appeared while we slept */
- hlist_for_each_entry_rcu(tmp, head, cache_list) {
- if (detail->match(tmp, key)) {
- if (cache_is_expired(detail, tmp)) {
- sunrpc_begin_cache_remove_entry(tmp, detail);
- freeme = tmp;
- break;
- }
- cache_get(tmp);
- spin_unlock(&detail->hash_lock);
- cache_put(new, detail);
- return tmp;
+ hlist_for_each_entry_rcu(tmp, head, cache_list,
+ lockdep_is_held(&detail->hash_lock)) {
+ if (!detail->match(tmp, key))
+ continue;
+ if (test_bit(CACHE_VALID, &tmp->flags) &&
+ cache_is_expired(detail, tmp)) {
+ sunrpc_begin_cache_remove_entry(tmp, detail);
+ trace_cache_entry_expired(detail, tmp);
+ freeme = tmp;
+ break;
}
+ cache_get(tmp);
+ spin_unlock(&detail->hash_lock);
+ cache_put(new, detail);
+ return tmp;
}
hlist_add_head_rcu(&new->cache_list, head);
@@ -174,6 +178,25 @@ static void cache_fresh_unlocked(struct cache_head *head,
}
}
+static void cache_make_negative(struct cache_detail *detail,
+ struct cache_head *h)
+{
+ set_bit(CACHE_NEGATIVE, &h->flags);
+ trace_cache_entry_make_negative(detail, h);
+}
+
+static void cache_entry_update(struct cache_detail *detail,
+ struct cache_head *h,
+ struct cache_head *new)
+{
+ if (!test_bit(CACHE_NEGATIVE, &new->flags)) {
+ detail->update(h, new);
+ trace_cache_entry_update(detail, h);
+ } else {
+ cache_make_negative(detail, h);
+ }
+}
+
struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
struct cache_head *new, struct cache_head *old, int hash)
{
@@ -186,10 +209,7 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
if (!test_bit(CACHE_VALID, &old->flags)) {
spin_lock(&detail->hash_lock);
if (!test_bit(CACHE_VALID, &old->flags)) {
- if (test_bit(CACHE_NEGATIVE, &new->flags))
- set_bit(CACHE_NEGATIVE, &old->flags);
- else
- detail->update(old, new);
+ cache_entry_update(detail, old, new);
cache_fresh_locked(old, new->expiry_time, detail);
spin_unlock(&detail->hash_lock);
cache_fresh_unlocked(old, detail);
@@ -207,10 +227,7 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
detail->init(tmp, old);
spin_lock(&detail->hash_lock);
- if (test_bit(CACHE_NEGATIVE, &new->flags))
- set_bit(CACHE_NEGATIVE, &tmp->flags);
- else
- detail->update(tmp, new);
+ cache_entry_update(detail, tmp, new);
hlist_add_head(&tmp->cache_list, &detail->hash_table[hash]);
detail->entries++;
cache_get(tmp);
@@ -224,13 +241,6 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
}
EXPORT_SYMBOL_GPL(sunrpc_cache_update);
-static int cache_make_upcall(struct cache_detail *cd, struct cache_head *h)
-{
- if (cd->cache_upcall)
- return cd->cache_upcall(cd, h);
- return sunrpc_cache_pipe_upcall(cd, h);
-}
-
static inline int cache_is_valid(struct cache_head *h)
{
if (!test_bit(CACHE_VALID, &h->flags))
@@ -259,7 +269,7 @@ static int try_to_negate_entry(struct cache_detail *detail, struct cache_head *h
spin_lock(&detail->hash_lock);
rv = cache_is_valid(h);
if (rv == -EAGAIN) {
- set_bit(CACHE_NEGATIVE, &h->flags);
+ cache_make_negative(detail, h);
cache_fresh_locked(h, seconds_since_boot()+CACHE_NEW_EXPIRY,
detail);
rv = -ENOENT;
@@ -303,17 +313,14 @@ int cache_check(struct cache_detail *detail,
(h->expiry_time != 0 && age > refresh_age/2)) {
dprintk("RPC: Want update, refage=%lld, age=%lld\n",
refresh_age, age);
- if (!test_and_set_bit(CACHE_PENDING, &h->flags)) {
- switch (cache_make_upcall(detail, h)) {
- case -EINVAL:
- rv = try_to_negate_entry(detail, h);
- break;
- case -EAGAIN:
- cache_fresh_unlocked(h, detail);
- break;
- }
- } else if (!cache_listeners_exist(detail))
+ switch (detail->cache_upcall(detail, h)) {
+ case -EINVAL:
rv = try_to_negate_entry(detail, h);
+ break;
+ case -EAGAIN:
+ cache_fresh_unlocked(h, detail);
+ break;
+ }
}
if (rv == -EAGAIN) {
@@ -468,6 +475,7 @@ static int cache_clean(void)
continue;
sunrpc_begin_cache_remove_entry(ch, current_detail);
+ trace_cache_entry_expired(current_detail, ch);
rv = 1;
break;
}
@@ -1195,20 +1203,12 @@ static bool cache_listeners_exist(struct cache_detail *detail)
*
* Each request is at most one page long.
*/
-int sunrpc_cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
+static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
{
-
char *buf;
struct cache_request *crq;
int ret = 0;
- if (!detail->cache_request)
- return -EINVAL;
-
- if (!cache_listeners_exist(detail)) {
- warn_no_listener(detail);
- return -EINVAL;
- }
if (test_bit(CACHE_CLEANED, &h->flags))
/* Too late to make an upcall */
return -EAGAIN;
@@ -1231,6 +1231,7 @@ int sunrpc_cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
if (test_bit(CACHE_PENDING, &h->flags)) {
crq->item = cache_get(h);
list_add_tail(&crq->q.list, &detail->queue);
+ trace_cache_entry_upcall(detail, h);
} else
/* Lost a race, no longer PENDING, so don't enqueue */
ret = -EAGAIN;
@@ -1242,8 +1243,27 @@ int sunrpc_cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
}
return ret;
}
+
+int sunrpc_cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
+{
+ if (test_and_set_bit(CACHE_PENDING, &h->flags))
+ return 0;
+ return cache_pipe_upcall(detail, h);
+}
EXPORT_SYMBOL_GPL(sunrpc_cache_pipe_upcall);
+int sunrpc_cache_pipe_upcall_timeout(struct cache_detail *detail,
+ struct cache_head *h)
+{
+ if (!cache_listeners_exist(detail)) {
+ warn_no_listener(detail);
+ trace_cache_entry_no_listener(detail, h);
+ return -EINVAL;
+ }
+ return sunrpc_cache_pipe_upcall(detail, h);
+}
+EXPORT_SYMBOL_GPL(sunrpc_cache_pipe_upcall_timeout);
+
/*
* parse a message from user-space and pass it
* to an appropriate cache
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 7324b21f923e..07992d3e8703 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -2509,6 +2509,7 @@ call_decode(struct rpc_task *task)
goto out;
req->rq_rcv_buf.len = req->rq_private_buf.len;
+ trace_xprt_recvfrom(&req->rq_rcv_buf);
/* Check that the softirq receive buffer is valid */
WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
diff --git a/net/sunrpc/socklib.c b/net/sunrpc/socklib.c
index 1a864f1ed119..3fc8af8bb961 100644
--- a/net/sunrpc/socklib.c
+++ b/net/sunrpc/socklib.c
@@ -14,9 +14,24 @@
#include <linux/types.h>
#include <linux/pagemap.h>
#include <linux/udp.h>
+#include <linux/sunrpc/msg_prot.h>
#include <linux/sunrpc/xdr.h>
#include <linux/export.h>
+#include "socklib.h"
+
+/*
+ * Helper structure for copying from an sk_buff.
+ */
+struct xdr_skb_reader {
+ struct sk_buff *skb;
+ unsigned int offset;
+ size_t count;
+ __wsum csum;
+};
+
+typedef size_t (*xdr_skb_read_actor)(struct xdr_skb_reader *desc, void *to,
+ size_t len);
/**
* xdr_skb_read_bits - copy some data bits from skb to internal buffer
@@ -186,3 +201,129 @@ no_checksum:
return 0;
}
EXPORT_SYMBOL_GPL(csum_partial_copy_to_xdr);
+
+static inline int xprt_sendmsg(struct socket *sock, struct msghdr *msg,
+ size_t seek)
+{
+ if (seek)
+ iov_iter_advance(&msg->msg_iter, seek);
+ return sock_sendmsg(sock, msg);
+}
+
+static int xprt_send_kvec(struct socket *sock, struct msghdr *msg,
+ struct kvec *vec, size_t seek)
+{
+ iov_iter_kvec(&msg->msg_iter, WRITE, vec, 1, vec->iov_len);
+ return xprt_sendmsg(sock, msg, seek);
+}
+
+static int xprt_send_pagedata(struct socket *sock, struct msghdr *msg,
+ struct xdr_buf *xdr, size_t base)
+{
+ int err;
+
+ err = xdr_alloc_bvec(xdr, GFP_KERNEL);
+ if (err < 0)
+ return err;
+
+ iov_iter_bvec(&msg->msg_iter, WRITE, xdr->bvec, xdr_buf_pagecount(xdr),
+ xdr->page_len + xdr->page_base);
+ return xprt_sendmsg(sock, msg, base + xdr->page_base);
+}
+
+/* Common case:
+ * - stream transport
+ * - sending from byte 0 of the message
+ * - the message is wholly contained in @xdr's head iovec
+ */
+static int xprt_send_rm_and_kvec(struct socket *sock, struct msghdr *msg,
+ rpc_fraghdr marker, struct kvec *vec,
+ size_t base)
+{
+ struct kvec iov[2] = {
+ [0] = {
+ .iov_base = &marker,
+ .iov_len = sizeof(marker)
+ },
+ [1] = *vec,
+ };
+ size_t len = iov[0].iov_len + iov[1].iov_len;
+
+ iov_iter_kvec(&msg->msg_iter, WRITE, iov, 2, len);
+ return xprt_sendmsg(sock, msg, base);
+}
+
+/**
+ * xprt_sock_sendmsg - write an xdr_buf directly to a socket
+ * @sock: open socket to send on
+ * @msg: socket message metadata
+ * @xdr: xdr_buf containing this request
+ * @base: starting position in the buffer
+ * @marker: stream record marker field
+ * @sent_p: return the total number of bytes successfully queued for sending
+ *
+ * Return values:
+ * On success, returns zero and fills in @sent_p.
+ * %-ENOTSOCK if @sock is not a struct socket.
+ */
+int xprt_sock_sendmsg(struct socket *sock, struct msghdr *msg,
+ struct xdr_buf *xdr, unsigned int base,
+ rpc_fraghdr marker, unsigned int *sent_p)
+{
+ unsigned int rmsize = marker ? sizeof(marker) : 0;
+ unsigned int remainder = rmsize + xdr->len - base;
+ unsigned int want;
+ int err = 0;
+
+ *sent_p = 0;
+
+ if (unlikely(!sock))
+ return -ENOTSOCK;
+
+ msg->msg_flags |= MSG_MORE;
+ want = xdr->head[0].iov_len + rmsize;
+ if (base < want) {
+ unsigned int len = want - base;
+
+ remainder -= len;
+ if (remainder == 0)
+ msg->msg_flags &= ~MSG_MORE;
+ if (rmsize)
+ err = xprt_send_rm_and_kvec(sock, msg, marker,
+ &xdr->head[0], base);
+ else
+ err = xprt_send_kvec(sock, msg, &xdr->head[0], base);
+ if (remainder == 0 || err != len)
+ goto out;
+ *sent_p += err;
+ base = 0;
+ } else {
+ base -= want;
+ }
+
+ if (base < xdr->page_len) {
+ unsigned int len = xdr->page_len - base;
+
+ remainder -= len;
+ if (remainder == 0)
+ msg->msg_flags &= ~MSG_MORE;
+ err = xprt_send_pagedata(sock, msg, xdr, base);
+ if (remainder == 0 || err != len)
+ goto out;
+ *sent_p += err;
+ base = 0;
+ } else {
+ base -= xdr->page_len;
+ }
+
+ if (base >= xdr->tail[0].iov_len)
+ return 0;
+ msg->msg_flags &= ~MSG_MORE;
+ err = xprt_send_kvec(sock, msg, &xdr->tail[0], base);
+out:
+ if (err > 0) {
+ *sent_p += err;
+ err = 0;
+ }
+ return err;
+}
diff --git a/net/sunrpc/socklib.h b/net/sunrpc/socklib.h
new file mode 100644
index 000000000000..c48114ad6f00
--- /dev/null
+++ b/net/sunrpc/socklib.h
@@ -0,0 +1,15 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+/*
+ * Copyright (C) 1995-1997 Olaf Kirch <okir@monad.swb.de>
+ * Copyright (C) 2020, Oracle.
+ */
+
+#ifndef _NET_SUNRPC_SOCKLIB_H_
+#define _NET_SUNRPC_SOCKLIB_H_
+
+int csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb);
+int xprt_sock_sendmsg(struct socket *sock, struct msghdr *msg,
+ struct xdr_buf *xdr, unsigned int base,
+ rpc_fraghdr marker, unsigned int *sent_p);
+
+#endif /* _NET_SUNRPC_SOCKLIB_H_ */
diff --git a/net/sunrpc/sunrpc.h b/net/sunrpc/sunrpc.h
index c9bacb3c930f..47a756503d11 100644
--- a/net/sunrpc/sunrpc.h
+++ b/net/sunrpc/sunrpc.h
@@ -50,10 +50,6 @@ static inline int sock_is_loopback(struct sock *sk)
return loopback;
}
-int svc_send_common(struct socket *sock, struct xdr_buf *xdr,
- struct page *headpage, unsigned long headoffset,
- struct page *tailpage, unsigned long tailoffset);
-
int rpc_clients_notifier_register(void);
void rpc_clients_notifier_unregister(void);
#endif /* _NET_SUNRPC_SUNRPC_H */
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 187dd4e73d64..9ed3126600ce 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1529,10 +1529,6 @@ svc_process(struct svc_rqst *rqstp)
goto out_drop;
}
- /* Reserve space for the record marker */
- if (rqstp->rq_prot == IPPROTO_TCP)
- svc_putnl(resv, 0);
-
/* Returns 1 for send, 0 for drop */
if (likely(svc_process_common(rqstp, argv, resv)))
return svc_send(rqstp);
@@ -1637,6 +1633,22 @@ u32 svc_max_payload(const struct svc_rqst *rqstp)
EXPORT_SYMBOL_GPL(svc_max_payload);
/**
+ * svc_encode_read_payload - mark a range of bytes as a READ payload
+ * @rqstp: svc_rqst to operate on
+ * @offset: payload's byte offset in rqstp->rq_res
+ * @length: size of payload, in bytes
+ *
+ * Returns zero on success, or a negative errno if a permanent
+ * error occurred.
+ */
+int svc_encode_read_payload(struct svc_rqst *rqstp, unsigned int offset,
+ unsigned int length)
+{
+ return rqstp->rq_xprt->xpt_ops->xpo_read_payload(rqstp, offset, length);
+}
+EXPORT_SYMBOL_GPL(svc_encode_read_payload);
+
+/**
* svc_fill_write_vector - Construct data argument for VFS write call
* @rqstp: svc_rqst to operate on
* @pages: list of pages containing data payload
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index de3c077733a7..e27e3532ec75 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -104,8 +104,17 @@ void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
}
EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);
-/*
- * Format the transport list for printing
+/**
+ * svc_print_xprts - Format the transport list for printing
+ * @buf: target buffer for formatted address
+ * @maxlen: length of target buffer
+ *
+ * Fills in @buf with a string containing a list of transport names, each name
+ * terminated with '\n'. If the buffer is too small, some entries may be
+ * missing, but it is guaranteed that all lines in the output buffer are
+ * complete.
+ *
+ * Returns positive length of the filled-in string.
*/
int svc_print_xprts(char *buf, int maxlen)
{
@@ -118,9 +127,9 @@ int svc_print_xprts(char *buf, int maxlen)
list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
int slen;
- sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
- slen = strlen(tmpstr);
- if (len + slen > maxlen)
+ slen = snprintf(tmpstr, sizeof(tmpstr), "%s %d\n",
+ xcl->xcl_name, xcl->xcl_max_payload);
+ if (slen >= sizeof(tmpstr) || len + slen >= maxlen)
break;
len += slen;
strcat(buf, tmpstr);
@@ -802,6 +811,8 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
len = svc_deferred_recv(rqstp);
else
len = xprt->xpt_ops->xpo_recvfrom(rqstp);
+ if (len > 0)
+ trace_svc_recvfrom(&rqstp->rq_arg);
rqstp->rq_stime = ktime_get();
rqstp->rq_reserved = serv->sv_max_mesg;
atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
@@ -905,6 +916,7 @@ int svc_send(struct svc_rqst *rqstp)
xb->len = xb->head[0].iov_len +
xb->page_len +
xb->tail[0].iov_len;
+ trace_svc_sendto(xb);
/* Grab mutex to serialize outgoing data. */
mutex_lock(&xprt->xpt_mutex);
diff --git a/net/sunrpc/svcauth_unix.c b/net/sunrpc/svcauth_unix.c
index 04aa80a2d752..6c8f802c4261 100644
--- a/net/sunrpc/svcauth_unix.c
+++ b/net/sunrpc/svcauth_unix.c
@@ -148,6 +148,11 @@ static struct cache_head *ip_map_alloc(void)
return NULL;
}
+static int ip_map_upcall(struct cache_detail *cd, struct cache_head *h)
+{
+ return sunrpc_cache_pipe_upcall(cd, h);
+}
+
static void ip_map_request(struct cache_detail *cd,
struct cache_head *h,
char **bpp, int *blen)
@@ -467,6 +472,11 @@ static struct cache_head *unix_gid_alloc(void)
return NULL;
}
+static int unix_gid_upcall(struct cache_detail *cd, struct cache_head *h)
+{
+ return sunrpc_cache_pipe_upcall_timeout(cd, h);
+}
+
static void unix_gid_request(struct cache_detail *cd,
struct cache_head *h,
char **bpp, int *blen)
@@ -584,6 +594,7 @@ static const struct cache_detail unix_gid_cache_template = {
.hash_size = GID_HASHMAX,
.name = "auth.unix.gid",
.cache_put = unix_gid_put,
+ .cache_upcall = unix_gid_upcall,
.cache_request = unix_gid_request,
.cache_parse = unix_gid_parse,
.cache_show = unix_gid_show,
@@ -881,6 +892,7 @@ static const struct cache_detail ip_map_cache_template = {
.hash_size = IP_HASHMAX,
.name = "auth.unix.ip",
.cache_put = ip_map_put,
+ .cache_upcall = ip_map_upcall,
.cache_request = ip_map_request,
.cache_parse = ip_map_parse,
.cache_show = ip_map_show,
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 2934dd711715..519cf9c4f8fd 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -55,6 +55,7 @@
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/xprt.h>
+#include "socklib.h"
#include "sunrpc.h"
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
@@ -174,109 +175,10 @@ static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh)
}
}
-/*
- * send routine intended to be shared by the fore- and back-channel
- */
-int svc_send_common(struct socket *sock, struct xdr_buf *xdr,
- struct page *headpage, unsigned long headoffset,
- struct page *tailpage, unsigned long tailoffset)
-{
- int result;
- int size;
- struct page **ppage = xdr->pages;
- size_t base = xdr->page_base;
- unsigned int pglen = xdr->page_len;
- unsigned int flags = MSG_MORE | MSG_SENDPAGE_NOTLAST;
- int slen;
- int len = 0;
-
- slen = xdr->len;
-
- /* send head */
- if (slen == xdr->head[0].iov_len)
- flags = 0;
- len = kernel_sendpage(sock, headpage, headoffset,
- xdr->head[0].iov_len, flags);
- if (len != xdr->head[0].iov_len)
- goto out;
- slen -= xdr->head[0].iov_len;
- if (slen == 0)
- goto out;
-
- /* send page data */
- size = PAGE_SIZE - base < pglen ? PAGE_SIZE - base : pglen;
- while (pglen > 0) {
- if (slen == size)
- flags = 0;
- result = kernel_sendpage(sock, *ppage, base, size, flags);
- if (result > 0)
- len += result;
- if (result != size)
- goto out;
- slen -= size;
- pglen -= size;
- size = PAGE_SIZE < pglen ? PAGE_SIZE : pglen;
- base = 0;
- ppage++;
- }
-
- /* send tail */
- if (xdr->tail[0].iov_len) {
- result = kernel_sendpage(sock, tailpage, tailoffset,
- xdr->tail[0].iov_len, 0);
- if (result > 0)
- len += result;
- }
-
-out:
- return len;
-}
-
-
-/*
- * Generic sendto routine
- */
-static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
+static int svc_sock_read_payload(struct svc_rqst *rqstp, unsigned int offset,
+ unsigned int length)
{
- struct svc_sock *svsk =
- container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
- struct socket *sock = svsk->sk_sock;
- union {
- struct cmsghdr hdr;
- long all[SVC_PKTINFO_SPACE / sizeof(long)];
- } buffer;
- struct cmsghdr *cmh = &buffer.hdr;
- int len = 0;
- unsigned long tailoff;
- unsigned long headoff;
- RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]);
-
- if (rqstp->rq_prot == IPPROTO_UDP) {
- struct msghdr msg = {
- .msg_name = &rqstp->rq_addr,
- .msg_namelen = rqstp->rq_addrlen,
- .msg_control = cmh,
- .msg_controllen = sizeof(buffer),
- .msg_flags = MSG_MORE,
- };
-
- svc_set_cmsg_data(rqstp, cmh);
-
- if (sock_sendmsg(sock, &msg) < 0)
- goto out;
- }
-
- tailoff = ((unsigned long)xdr->tail[0].iov_base) & (PAGE_SIZE-1);
- headoff = 0;
- len = svc_send_common(sock, xdr, rqstp->rq_respages[0], headoff,
- rqstp->rq_respages[0], tailoff);
-
-out:
- dprintk("svc: socket %p sendto([%p %zu... ], %d) = %d (addr %s)\n",
- svsk, xdr->head[0].iov_base, xdr->head[0].iov_len,
- xdr->len, len, svc_print_addr(rqstp, buf, sizeof(buf)));
-
- return len;
+ return 0;
}
/*
@@ -600,17 +502,43 @@ out_free:
return 0;
}
-static int
-svc_udp_sendto(struct svc_rqst *rqstp)
+/**
+ * svc_udp_sendto - Send out a reply on a UDP socket
+ * @rqstp: completed svc_rqst
+ *
+ * Returns the number of bytes sent, or a negative errno.
+ */
+static int svc_udp_sendto(struct svc_rqst *rqstp)
{
- int error;
+ struct svc_xprt *xprt = rqstp->rq_xprt;
+ struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
+ struct xdr_buf *xdr = &rqstp->rq_res;
+ union {
+ struct cmsghdr hdr;
+ long all[SVC_PKTINFO_SPACE / sizeof(long)];
+ } buffer;
+ struct cmsghdr *cmh = &buffer.hdr;
+ struct msghdr msg = {
+ .msg_name = &rqstp->rq_addr,
+ .msg_namelen = rqstp->rq_addrlen,
+ .msg_control = cmh,
+ .msg_controllen = sizeof(buffer),
+ };
+ unsigned int uninitialized_var(sent);
+ int err;
- error = svc_sendto(rqstp, &rqstp->rq_res);
- if (error == -ECONNREFUSED)
- /* ICMP error on earlier request. */
- error = svc_sendto(rqstp, &rqstp->rq_res);
+ svc_set_cmsg_data(rqstp, cmh);
- return error;
+ err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, 0, &sent);
+ xdr_free_bvec(xdr);
+ if (err == -ECONNREFUSED) {
+ /* ICMP error on earlier request. */
+ err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, 0, &sent);
+ xdr_free_bvec(xdr);
+ }
+ if (err < 0)
+ return err;
+ return sent;
}
static int svc_udp_has_wspace(struct svc_xprt *xprt)
@@ -653,6 +581,7 @@ static const struct svc_xprt_ops svc_udp_ops = {
.xpo_create = svc_udp_create,
.xpo_recvfrom = svc_udp_recvfrom,
.xpo_sendto = svc_udp_sendto,
+ .xpo_read_payload = svc_sock_read_payload,
.xpo_release_rqst = svc_release_udp_skb,
.xpo_detach = svc_sock_detach,
.xpo_free = svc_sock_free,
@@ -1128,35 +1057,39 @@ err_noclose:
return 0; /* record not complete */
}
-/*
- * Send out data on TCP socket.
+/**
+ * svc_tcp_sendto - Send out a reply on a TCP socket
+ * @rqstp: completed svc_rqst
+ *
+ * Returns the number of bytes sent, or a negative errno.
*/
static int svc_tcp_sendto(struct svc_rqst *rqstp)
{
- struct xdr_buf *xbufp = &rqstp->rq_res;
- int sent;
- __be32 reclen;
+ struct svc_xprt *xprt = rqstp->rq_xprt;
+ struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
+ struct xdr_buf *xdr = &rqstp->rq_res;
+ rpc_fraghdr marker = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT |
+ (u32)xdr->len);
+ struct msghdr msg = {
+ .msg_flags = 0,
+ };
+ unsigned int uninitialized_var(sent);
+ int err;
- /* Set up the first element of the reply kvec.
- * Any other kvecs that may be in use have been taken
- * care of by the server implementation itself.
- */
- reclen = htonl(0x80000000|((xbufp->len ) - 4));
- memcpy(xbufp->head[0].iov_base, &reclen, 4);
-
- sent = svc_sendto(rqstp, &rqstp->rq_res);
- if (sent != xbufp->len) {
- printk(KERN_NOTICE
- "rpc-srv/tcp: %s: %s %d when sending %d bytes "
- "- shutting down socket\n",
- rqstp->rq_xprt->xpt_server->sv_name,
- (sent<0)?"got error":"sent only",
- sent, xbufp->len);
- set_bit(XPT_CLOSE, &rqstp->rq_xprt->xpt_flags);
- svc_xprt_enqueue(rqstp->rq_xprt);
- sent = -EAGAIN;
- }
+ err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, marker, &sent);
+ xdr_free_bvec(xdr);
+ if (err < 0 || sent != (xdr->len + sizeof(marker)))
+ goto out_close;
return sent;
+
+out_close:
+ pr_notice("rpc-srv/tcp: %s: %s %d when sending %d bytes - shutting down socket\n",
+ xprt->xpt_server->sv_name,
+ (err < 0) ? "got error" : "sent",
+ (err < 0) ? err : sent, xdr->len);
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ svc_xprt_enqueue(xprt);
+ return -EAGAIN;
}
static struct svc_xprt *svc_tcp_create(struct svc_serv *serv,
@@ -1171,6 +1104,7 @@ static const struct svc_xprt_ops svc_tcp_ops = {
.xpo_create = svc_tcp_create,
.xpo_recvfrom = svc_tcp_recvfrom,
.xpo_sendto = svc_tcp_sendto,
+ .xpo_read_payload = svc_sock_read_payload,
.xpo_release_rqst = svc_release_skb,
.xpo_detach = svc_tcp_sock_detach,
.xpo_free = svc_sock_free,
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 1aafe8d3f3f4..493a30a296fc 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1117,8 +1117,6 @@ void xprt_complete_rqst(struct rpc_task *task, int copied)
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
- dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
- task->tk_pid, ntohl(req->rq_xid), copied);
trace_xprt_complete_rqst(xprt, req->rq_xid, copied);
xprt->stat.recvs++;
@@ -1462,6 +1460,7 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
*/
req->rq_ntrans++;
+ trace_xprt_sendto(&req->rq_snd_buf);
connect_cookie = xprt->connect_cookie;
status = xprt->ops->send_request(req);
if (status != 0) {
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 28020ec104d4..577513b7642e 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -275,32 +275,6 @@ out:
return n;
}
-static inline int
-encode_item_present(struct xdr_stream *xdr)
-{
- __be32 *p;
-
- p = xdr_reserve_space(xdr, sizeof(*p));
- if (unlikely(!p))
- return -EMSGSIZE;
-
- *p = xdr_one;
- return 0;
-}
-
-static inline int
-encode_item_not_present(struct xdr_stream *xdr)
-{
- __be32 *p;
-
- p = xdr_reserve_space(xdr, sizeof(*p));
- if (unlikely(!p))
- return -EMSGSIZE;
-
- *p = xdr_zero;
- return 0;
-}
-
static void
xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr)
{
@@ -414,7 +388,7 @@ static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
} while (nsegs);
done:
- return encode_item_not_present(xdr);
+ return xdr_stream_encode_item_absent(xdr);
}
/* Register and XDR encode the Write list. Supports encoding a list
@@ -453,7 +427,7 @@ static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
if (nsegs < 0)
return nsegs;
- if (encode_item_present(xdr) < 0)
+ if (xdr_stream_encode_item_present(xdr) < 0)
return -EMSGSIZE;
segcount = xdr_reserve_space(xdr, sizeof(*segcount));
if (unlikely(!segcount))
@@ -480,7 +454,7 @@ static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
*segcount = cpu_to_be32(nchunks);
done:
- return encode_item_not_present(xdr);
+ return xdr_stream_encode_item_absent(xdr);
}
/* Register and XDR encode the Reply chunk. Supports encoding an array
@@ -507,14 +481,14 @@ static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
__be32 *segcount;
if (wtype != rpcrdma_replych)
- return encode_item_not_present(xdr);
+ return xdr_stream_encode_item_absent(xdr);
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
if (nsegs < 0)
return nsegs;
- if (encode_item_present(xdr) < 0)
+ if (xdr_stream_encode_item_present(xdr) < 0)
return -EMSGSIZE;
segcount = xdr_reserve_space(xdr, sizeof(*segcount));
if (unlikely(!segcount))
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 908e78bb87c6..d510a3a15d4b 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -117,7 +117,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
{
int ret;
- ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, NULL);
+ ret = svc_rdma_map_reply_msg(rdma, ctxt, NULL, &rqst->rq_snd_buf);
if (ret < 0)
return -EIO;
@@ -181,7 +181,9 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
if (!ctxt)
goto drop_connection;
- p = ctxt->sc_xprt_buf;
+ p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_MIN);
+ if (!p)
+ goto put_ctxt;
*p++ = rqst->rq_xid;
*p++ = rpcrdma_version;
*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
@@ -189,7 +191,6 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
*p++ = xdr_zero;
*p++ = xdr_zero;
*p = xdr_zero;
- svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_MIN);
#ifdef SVCRDMA_BACKCHANNEL_DEBUG
pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
@@ -197,12 +198,13 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
rqst->rq_xtime = ktime_get();
rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
- if (rc) {
- svc_rdma_send_ctxt_put(rdma, ctxt);
- goto drop_connection;
- }
+ if (rc)
+ goto put_ctxt;
return 0;
+put_ctxt:
+ svc_rdma_send_ctxt_put(rdma, ctxt);
+
drop_connection:
dprintk("svcrdma: failed to send bc call\n");
return -ENOTCONN;
@@ -250,6 +252,7 @@ xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
+ xprt_rdma_free_addresses(xprt);
xprt_free(xprt);
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 96bccd398469..54469b72b25f 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -193,6 +193,7 @@ svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
out:
ctxt->rc_page_count = 0;
+ ctxt->rc_read_payload_length = 0;
return ctxt;
out_empty:
@@ -357,15 +358,14 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
arg->len = ctxt->rc_byte_len;
}
-/* This accommodates the largest possible Write chunk,
- * in one segment.
+/* This accommodates the largest possible Write chunk.
*/
-#define MAX_BYTES_WRITE_SEG ((u32)(RPCSVC_MAXPAGES << PAGE_SHIFT))
+#define MAX_BYTES_WRITE_CHUNK ((u32)(RPCSVC_MAXPAGES << PAGE_SHIFT))
/* This accommodates the largest possible Position-Zero
- * Read chunk or Reply chunk, in one segment.
+ * Read chunk or Reply chunk.
*/
-#define MAX_BYTES_SPECIAL_SEG ((u32)((RPCSVC_MAXPAGES + 2) << PAGE_SHIFT))
+#define MAX_BYTES_SPECIAL_CHUNK ((u32)((RPCSVC_MAXPAGES + 2) << PAGE_SHIFT))
/* Sanity check the Read list.
*
@@ -373,7 +373,7 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
* - This implementation supports only one Read chunk.
*
* Sanity checks:
- * - Read list does not overflow buffer.
+ * - Read list does not overflow Receive buffer.
* - Segment size limited by largest NFS data payload.
*
* The segment count is limited to how many segments can
@@ -381,30 +381,44 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
* buffer. That's about 40 Read segments for a 1KB inline
* threshold.
*
- * Returns pointer to the following Write list.
+ * Return values:
+ * %true: Read list is valid. @rctxt's xdr_stream is updated
+ * to point to the first byte past the Read list.
+ * %false: Read list is corrupt. @rctxt's xdr_stream is left
+ * in an unknown state.
*/
-static __be32 *xdr_check_read_list(__be32 *p, const __be32 *end)
+static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt)
{
- u32 position;
+ u32 position, len;
bool first;
+ __be32 *p;
+
+ p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
+ if (!p)
+ return false;
+ len = 0;
first = true;
- while (*p++ != xdr_zero) {
+ while (*p != xdr_zero) {
+ p = xdr_inline_decode(&rctxt->rc_stream,
+ rpcrdma_readseg_maxsz * sizeof(*p));
+ if (!p)
+ return false;
+
if (first) {
- position = be32_to_cpup(p++);
+ position = be32_to_cpup(p);
first = false;
- } else if (be32_to_cpup(p++) != position) {
- return NULL;
+ } else if (be32_to_cpup(p) != position) {
+ return false;
}
- p++; /* handle */
- if (be32_to_cpup(p++) > MAX_BYTES_SPECIAL_SEG)
- return NULL;
- p += 2; /* offset */
+ p += 2;
+ len += be32_to_cpup(p);
- if (p > end)
- return NULL;
+ p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
+ if (!p)
+ return false;
}
- return p;
+ return len <= MAX_BYTES_SPECIAL_CHUNK;
}
/* The segment count is limited to how many segments can
@@ -412,67 +426,100 @@ static __be32 *xdr_check_read_list(__be32 *p, const __be32 *end)
* buffer. That's about 60 Write segments for a 1KB inline
* threshold.
*/
-static __be32 *xdr_check_write_chunk(__be32 *p, const __be32 *end,
- u32 maxlen)
+static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt, u32 maxlen)
{
- u32 i, segcount;
+ u32 i, segcount, total;
+ __be32 *p;
+
+ p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
+ if (!p)
+ return false;
+ segcount = be32_to_cpup(p);
- segcount = be32_to_cpup(p++);
+ total = 0;
for (i = 0; i < segcount; i++) {
- p++; /* handle */
- if (be32_to_cpup(p++) > maxlen)
- return NULL;
- p += 2; /* offset */
+ u32 handle, length;
+ u64 offset;
- if (p > end)
- return NULL;
- }
+ p = xdr_inline_decode(&rctxt->rc_stream,
+ rpcrdma_segment_maxsz * sizeof(*p));
+ if (!p)
+ return false;
+
+ handle = be32_to_cpup(p++);
+ length = be32_to_cpup(p++);
+ xdr_decode_hyper(p, &offset);
+ trace_svcrdma_decode_wseg(handle, length, offset);
- return p;
+ total += length;
+ }
+ return total <= maxlen;
}
/* Sanity check the Write list.
*
* Implementation limits:
- * - This implementation supports only one Write chunk.
+ * - This implementation currently supports only one Write chunk.
*
* Sanity checks:
- * - Write list does not overflow buffer.
- * - Segment size limited by largest NFS data payload.
- *
- * Returns pointer to the following Reply chunk.
+ * - Write list does not overflow Receive buffer.
+ * - Chunk size limited by largest NFS data payload.
+ *
+ * Return values:
+ * %true: Write list is valid. @rctxt's xdr_stream is updated
+ * to point to the first byte past the Write list.
+ * %false: Write list is corrupt. @rctxt's xdr_stream is left
+ * in an unknown state.
*/
-static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end)
+static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
{
- u32 chcount;
+ u32 chcount = 0;
+ __be32 *p;
- chcount = 0;
- while (*p++ != xdr_zero) {
- p = xdr_check_write_chunk(p, end, MAX_BYTES_WRITE_SEG);
+ p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
+ if (!p)
+ return false;
+ rctxt->rc_write_list = p;
+ while (*p != xdr_zero) {
+ if (!xdr_check_write_chunk(rctxt, MAX_BYTES_WRITE_CHUNK))
+ return false;
+ ++chcount;
+ p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
if (!p)
- return NULL;
- if (chcount++ > 1)
- return NULL;
+ return false;
}
- return p;
+ if (!chcount)
+ rctxt->rc_write_list = NULL;
+ return chcount < 2;
}
/* Sanity check the Reply chunk.
*
* Sanity checks:
- * - Reply chunk does not overflow buffer.
- * - Segment size limited by largest NFS data payload.
- *
- * Returns pointer to the following RPC header.
+ * - Reply chunk does not overflow Receive buffer.
+ * - Chunk size limited by largest NFS data payload.
+ *
+ * Return values:
+ * %true: Reply chunk is valid. @rctxt's xdr_stream is updated
+ * to point to the first byte past the Reply chunk.
+ * %false: Reply chunk is corrupt. @rctxt's xdr_stream is left
+ * in an unknown state.
*/
-static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end)
+static bool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt)
{
- if (*p++ != xdr_zero) {
- p = xdr_check_write_chunk(p, end, MAX_BYTES_SPECIAL_SEG);
- if (!p)
- return NULL;
+ __be32 *p;
+
+ p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
+ if (!p)
+ return false;
+ rctxt->rc_reply_chunk = p;
+ if (*p != xdr_zero) {
+ if (!xdr_check_write_chunk(rctxt, MAX_BYTES_SPECIAL_CHUNK))
+ return false;
+ } else {
+ rctxt->rc_reply_chunk = NULL;
}
- return p;
+ return true;
}
/* RPC-over-RDMA Version One private extension: Remote Invalidation.
@@ -537,60 +584,61 @@ static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
ctxt->rc_inv_rkey = be32_to_cpu(inv_rkey);
}
-/* On entry, xdr->head[0].iov_base points to first byte in the
- * RPC-over-RDMA header.
+/**
+ * svc_rdma_xdr_decode_req - Decode the transport header
+ * @rq_arg: xdr_buf containing ingress RPC/RDMA message
+ * @rctxt: state of decoding
+ *
+ * On entry, xdr->head[0].iov_base points to first byte of the
+ * RPC-over-RDMA transport header.
*
* On successful exit, head[0] points to first byte past the
* RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
+ *
* The length of the RPC-over-RDMA header is returned.
*
* Assumptions:
* - The transport header is entirely contained in the head iovec.
*/
-static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
+static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg,
+ struct svc_rdma_recv_ctxt *rctxt)
{
- __be32 *p, *end, *rdma_argp;
+ __be32 *p, *rdma_argp;
unsigned int hdr_len;
- /* Verify that there's enough bytes for header + something */
- if (rq_arg->len <= RPCRDMA_HDRLEN_ERR)
- goto out_short;
-
rdma_argp = rq_arg->head[0].iov_base;
- if (*(rdma_argp + 1) != rpcrdma_version)
- goto out_version;
+ xdr_init_decode(&rctxt->rc_stream, rq_arg, rdma_argp, NULL);
- switch (*(rdma_argp + 3)) {
+ p = xdr_inline_decode(&rctxt->rc_stream,
+ rpcrdma_fixed_maxsz * sizeof(*p));
+ if (unlikely(!p))
+ goto out_short;
+ p++;
+ if (*p != rpcrdma_version)
+ goto out_version;
+ p += 2;
+ switch (*p) {
case rdma_msg:
break;
case rdma_nomsg:
break;
-
case rdma_done:
goto out_drop;
-
case rdma_error:
goto out_drop;
-
default:
goto out_proc;
}
- end = (__be32 *)((unsigned long)rdma_argp + rq_arg->len);
- p = xdr_check_read_list(rdma_argp + 4, end);
- if (!p)
- goto out_inval;
- p = xdr_check_write_list(p, end);
- if (!p)
+ if (!xdr_check_read_list(rctxt))
goto out_inval;
- p = xdr_check_reply_chunk(p, end);
- if (!p)
+ if (!xdr_check_write_list(rctxt))
goto out_inval;
- if (p > end)
+ if (!xdr_check_reply_chunk(rctxt))
goto out_inval;
- rq_arg->head[0].iov_base = p;
- hdr_len = (unsigned long)p - (unsigned long)rdma_argp;
+ rq_arg->head[0].iov_base = rctxt->rc_stream.p;
+ hdr_len = xdr_stream_pos(&rctxt->rc_stream);
rq_arg->head[0].iov_len -= hdr_len;
rq_arg->len -= hdr_len;
trace_svcrdma_decode_rqst(rdma_argp, hdr_len);
@@ -650,7 +698,6 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
__be32 *rdma_argp, int status)
{
struct svc_rdma_send_ctxt *ctxt;
- unsigned int length;
__be32 *p;
int ret;
@@ -658,29 +705,46 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
if (!ctxt)
return;
- p = ctxt->sc_xprt_buf;
+ p = xdr_reserve_space(&ctxt->sc_stream,
+ rpcrdma_fixed_maxsz * sizeof(*p));
+ if (!p)
+ goto put_ctxt;
+
*p++ = *rdma_argp;
*p++ = *(rdma_argp + 1);
*p++ = xprt->sc_fc_credits;
- *p++ = rdma_error;
+ *p = rdma_error;
+
switch (status) {
case -EPROTONOSUPPORT:
+ p = xdr_reserve_space(&ctxt->sc_stream, 3 * sizeof(*p));
+ if (!p)
+ goto put_ctxt;
+
*p++ = err_vers;
*p++ = rpcrdma_version;
- *p++ = rpcrdma_version;
+ *p = rpcrdma_version;
trace_svcrdma_err_vers(*rdma_argp);
break;
default:
- *p++ = err_chunk;
+ p = xdr_reserve_space(&ctxt->sc_stream, sizeof(*p));
+ if (!p)
+ goto put_ctxt;
+
+ *p = err_chunk;
trace_svcrdma_err_chunk(*rdma_argp);
}
- length = (unsigned long)p - (unsigned long)ctxt->sc_xprt_buf;
- svc_rdma_sync_reply_hdr(xprt, ctxt, length);
+ ctxt->sc_send_wr.num_sge = 1;
ctxt->sc_send_wr.opcode = IB_WR_SEND;
+ ctxt->sc_sges[0].length = ctxt->sc_hdrbuf.len;
ret = svc_rdma_send(xprt, &ctxt->sc_send_wr);
if (ret)
- svc_rdma_send_ctxt_put(xprt, ctxt);
+ goto put_ctxt;
+ return;
+
+put_ctxt:
+ svc_rdma_send_ctxt_put(xprt, ctxt);
}
/* By convention, backchannel calls arrive via rdma_msg type
@@ -785,7 +849,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
rqstp->rq_next_page = rqstp->rq_respages;
p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
- ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg);
+ ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg, ctxt);
if (ret < 0)
goto out_err;
if (ret == 0)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 48fe3b16b0d9..bd7c195d872e 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -41,7 +41,7 @@ struct svc_rdma_rw_ctxt {
struct rdma_rw_ctx rw_ctx;
int rw_nents;
struct sg_table rw_sg_table;
- struct scatterlist rw_first_sgl[0];
+ struct scatterlist rw_first_sgl[];
};
static inline struct svc_rdma_rw_ctxt *
@@ -439,7 +439,8 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
if (ret < 0)
goto out_initerr;
- trace_svcrdma_encode_wseg(seg_handle, write_len, seg_offset);
+ trace_svcrdma_send_wseg(seg_handle, write_len, seg_offset);
+
list_add(&ctxt->rw_list, &cc->cc_rwctxts);
cc->cc_sqecount += ret;
if (write_len == seg_length - info->wi_seg_off) {
@@ -482,18 +483,19 @@ static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
vec->iov_len);
}
-/* Send an xdr_buf's page list by itself. A Write chunk is
- * just the page list. a Reply chunk is the head, page list,
- * and tail. This function is shared between the two types
- * of chunk.
+/* Send an xdr_buf's page list by itself. A Write chunk is just
+ * the page list. A Reply chunk is @xdr's head, page list, and
+ * tail. This function is shared between the two types of chunk.
*/
static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
- struct xdr_buf *xdr)
+ struct xdr_buf *xdr,
+ unsigned int offset,
+ unsigned long length)
{
info->wi_xdr = xdr;
- info->wi_next_off = 0;
+ info->wi_next_off = offset - xdr->head[0].iov_len;
return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
- xdr->page_len);
+ length);
}
/**
@@ -501,6 +503,8 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
* @rdma: controlling RDMA transport
* @wr_ch: Write chunk provided by client
* @xdr: xdr_buf containing the data payload
+ * @offset: payload's byte offset in @xdr
+ * @length: size of payload, in bytes
*
* Returns a non-negative number of bytes the chunk consumed, or
* %-E2BIG if the payload was larger than the Write chunk,
@@ -510,19 +514,20 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
* %-EIO if rdma_rw initialization failed (DMA mapping, etc).
*/
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
- struct xdr_buf *xdr)
+ struct xdr_buf *xdr,
+ unsigned int offset, unsigned long length)
{
struct svc_rdma_write_info *info;
int ret;
- if (!xdr->page_len)
+ if (!length)
return 0;
info = svc_rdma_write_info_alloc(rdma, wr_ch);
if (!info)
return -ENOMEM;
- ret = svc_rdma_send_xdr_pagelist(info, xdr);
+ ret = svc_rdma_send_xdr_pagelist(info, xdr, offset, length);
if (ret < 0)
goto out_err;
@@ -530,8 +535,8 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
if (ret < 0)
goto out_err;
- trace_svcrdma_encode_write(xdr->page_len);
- return xdr->page_len;
+ trace_svcrdma_send_write_chunk(xdr->page_len);
+ return length;
out_err:
svc_rdma_write_info_free(info);
@@ -541,8 +546,7 @@ out_err:
/**
* svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
* @rdma: controlling RDMA transport
- * @rp_ch: Reply chunk provided by client
- * @writelist: true if client provided a Write list
+ * @rctxt: Write and Reply chunks from client
* @xdr: xdr_buf containing an RPC Reply
*
* Returns a non-negative number of bytes the chunk consumed, or
@@ -552,13 +556,14 @@ out_err:
* %-ENOTCONN if posting failed (connection is lost),
* %-EIO if rdma_rw initialization failed (DMA mapping, etc).
*/
-int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
- bool writelist, struct xdr_buf *xdr)
+int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
+ const struct svc_rdma_recv_ctxt *rctxt,
+ struct xdr_buf *xdr)
{
struct svc_rdma_write_info *info;
int consumed, ret;
- info = svc_rdma_write_info_alloc(rdma, rp_ch);
+ info = svc_rdma_write_info_alloc(rdma, rctxt->rc_reply_chunk);
if (!info)
return -ENOMEM;
@@ -570,8 +575,10 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
/* Send the page list in the Reply chunk only if the
* client did not provide Write chunks.
*/
- if (!writelist && xdr->page_len) {
- ret = svc_rdma_send_xdr_pagelist(info, xdr);
+ if (!rctxt->rc_write_list && xdr->page_len) {
+ ret = svc_rdma_send_xdr_pagelist(info, xdr,
+ xdr->head[0].iov_len,
+ xdr->page_len);
if (ret < 0)
goto out_err;
consumed += xdr->page_len;
@@ -588,7 +595,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
if (ret < 0)
goto out_err;
- trace_svcrdma_encode_reply(consumed);
+ trace_svcrdma_send_reply_chunk(consumed);
return consumed;
out_err:
@@ -691,7 +698,7 @@ static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
if (ret < 0)
break;
- trace_svcrdma_encode_rseg(rs_handle, rs_length, rs_offset);
+ trace_svcrdma_send_rseg(rs_handle, rs_length, rs_offset);
info->ri_chunklen += rs_length;
}
@@ -722,7 +729,7 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
if (ret < 0)
goto out;
- trace_svcrdma_encode_read(info->ri_chunklen, info->ri_position);
+ trace_svcrdma_send_read_chunk(info->ri_chunklen, info->ri_position);
head->rc_hdr_count = 0;
@@ -778,7 +785,7 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
if (ret < 0)
goto out;
- trace_svcrdma_encode_pzr(info->ri_chunklen);
+ trace_svcrdma_send_pzr(info->ri_chunklen);
head->rc_arg.len += info->ri_chunklen;
head->rc_arg.buflen += info->ri_chunklen;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index f3f108090aa4..90cba3058f04 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -151,6 +151,8 @@ svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
ctxt->sc_cqe.done = svc_rdma_wc_send;
ctxt->sc_xprt_buf = buffer;
+ xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
+ rdma->sc_max_req_size);
ctxt->sc_sges[0].addr = addr;
for (i = 0; i < rdma->sc_max_send_sges; i++)
@@ -204,6 +206,10 @@ struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
spin_unlock(&rdma->sc_send_lock);
out:
+ rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
+ xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
+ ctxt->sc_xprt_buf, NULL);
+
ctxt->sc_send_wr.num_sge = 0;
ctxt->sc_cur_sge_no = 0;
ctxt->sc_page_count = 0;
@@ -295,6 +301,12 @@ int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
might_sleep();
+ /* Sync the transport header buffer */
+ ib_dma_sync_single_for_device(rdma->sc_pd->device,
+ wr->sg_list[0].addr,
+ wr->sg_list[0].length,
+ DMA_TO_DEVICE);
+
/* If the SQ is full, wait until an SQ entry is available */
while (1) {
if ((atomic_dec_return(&rdma->sc_sq_avail) < 0)) {
@@ -322,166 +334,173 @@ int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
return ret;
}
-static u32 xdr_padsize(u32 len)
+/**
+ * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
+ * @sctxt: Send context for the RPC Reply
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Reply Read list
+ * %-EMSGSIZE on XDR buffer overflow
+ */
+static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
{
- return (len & 3) ? (4 - (len & 3)) : 0;
+ /* RPC-over-RDMA version 1 replies never have a Read list. */
+ return xdr_stream_encode_item_absent(&sctxt->sc_stream);
}
-/* Returns length of transport header, in bytes.
+/**
+ * svc_rdma_encode_write_segment - Encode one Write segment
+ * @src: matching Write chunk in the RPC Call header
+ * @sctxt: Send context for the RPC Reply
+ * @remaining: remaining bytes of the payload left in the Write chunk
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Write segment
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
+static ssize_t svc_rdma_encode_write_segment(__be32 *src,
+ struct svc_rdma_send_ctxt *sctxt,
+ unsigned int *remaining)
{
- unsigned int nsegs;
__be32 *p;
-
- p = rdma_resp;
-
- /* RPC-over-RDMA V1 replies never have a Read list. */
- p += rpcrdma_fixed_maxsz + 1;
-
- /* Skip Write list. */
- while (*p++ != xdr_zero) {
- nsegs = be32_to_cpup(p++);
- p += nsegs * rpcrdma_segment_maxsz;
- }
-
- /* Skip Reply chunk. */
- if (*p++ != xdr_zero) {
- nsegs = be32_to_cpup(p++);
- p += nsegs * rpcrdma_segment_maxsz;
+ const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
+ u32 handle, length;
+ u64 offset;
+
+ p = xdr_reserve_space(&sctxt->sc_stream, len);
+ if (!p)
+ return -EMSGSIZE;
+
+ handle = be32_to_cpup(src++);
+ length = be32_to_cpup(src++);
+ xdr_decode_hyper(src, &offset);
+
+ *p++ = cpu_to_be32(handle);
+ if (*remaining < length) {
+ /* segment only partly filled */
+ length = *remaining;
+ *remaining = 0;
+ } else {
+ /* entire segment was consumed */
+ *remaining -= length;
}
+ *p++ = cpu_to_be32(length);
+ xdr_encode_hyper(p, offset);
- return (unsigned long)p - (unsigned long)rdma_resp;
+ trace_svcrdma_encode_wseg(handle, length, offset);
+ return len;
}
-/* One Write chunk is copied from Call transport header to Reply
- * transport header. Each segment's length field is updated to
- * reflect number of bytes consumed in the segment.
- *
- * Returns number of segments in this chunk.
+/**
+ * svc_rdma_encode_write_chunk - Encode one Write chunk
+ * @src: matching Write chunk in the RPC Call header
+ * @sctxt: Send context for the RPC Reply
+ * @remaining: size in bytes of the payload in the Write chunk
+ *
+ * Copy a Write chunk from the Call transport header to the
+ * Reply transport header. Update each segment's length field
+ * to reflect the number of bytes written in that segment.
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Write chunk
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
+ struct svc_rdma_send_ctxt *sctxt,
unsigned int remaining)
{
unsigned int i, nsegs;
- u32 seg_len;
+ ssize_t len, ret;
- /* Write list discriminator */
- *dst++ = *src++;
+ len = 0;
+ trace_svcrdma_encode_write_chunk(remaining);
- /* number of segments in this chunk */
- nsegs = be32_to_cpup(src);
- *dst++ = *src++;
+ src++;
+ ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
+ if (ret < 0)
+ return -EMSGSIZE;
+ len += ret;
- for (i = nsegs; i; i--) {
- /* segment's RDMA handle */
- *dst++ = *src++;
-
- /* bytes returned in this segment */
- seg_len = be32_to_cpu(*src);
- if (remaining >= seg_len) {
- /* entire segment was consumed */
- *dst = *src;
- remaining -= seg_len;
- } else {
- /* segment only partly filled */
- *dst = cpu_to_be32(remaining);
- remaining = 0;
- }
- dst++; src++;
+ nsegs = be32_to_cpup(src++);
+ ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
+ if (ret < 0)
+ return -EMSGSIZE;
+ len += ret;
- /* segment's RDMA offset */
- *dst++ = *src++;
- *dst++ = *src++;
+ for (i = nsegs; i; i--) {
+ ret = svc_rdma_encode_write_segment(src, sctxt, &remaining);
+ if (ret < 0)
+ return -EMSGSIZE;
+ src += rpcrdma_segment_maxsz;
+ len += ret;
}
- return nsegs;
+ return len;
}
-/* The client provided a Write list in the Call message. Fill in
- * the segments in the first Write chunk in the Reply's transport
+/**
+ * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
+ * @length: size in bytes of the payload in the first Write chunk
+ *
+ * The client provides a Write chunk list in the Call message. Fill
+ * in the segments in the first Write chunk in the Reply's transport
* header with the number of bytes consumed in each segment.
* Remaining chunks are returned unused.
*
* Assumptions:
* - Client has provided only one Write chunk
- */
-static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
- unsigned int consumed)
-{
- unsigned int nsegs;
- __be32 *p, *q;
-
- /* RPC-over-RDMA V1 replies never have a Read list. */
- p = rdma_resp + rpcrdma_fixed_maxsz + 1;
-
- q = wr_ch;
- while (*q != xdr_zero) {
- nsegs = xdr_encode_write_chunk(p, q, consumed);
- q += 2 + nsegs * rpcrdma_segment_maxsz;
- p += 2 + nsegs * rpcrdma_segment_maxsz;
- consumed = 0;
- }
-
- /* Terminate Write list */
- *p++ = xdr_zero;
-
- /* Reply chunk discriminator; may be replaced later */
- *p = xdr_zero;
-}
-
-/* The client provided a Reply chunk in the Call message. Fill in
- * the segments in the Reply chunk in the Reply message with the
- * number of bytes consumed in each segment.
*
- * Assumptions:
- * - Reply can always fit in the provided Reply chunk
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Reply's Write list
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
- unsigned int consumed)
+static ssize_t
+svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
+ unsigned int length)
{
- __be32 *p;
+ ssize_t len, ret;
- /* Find the Reply chunk in the Reply's xprt header.
- * RPC-over-RDMA V1 replies never have a Read list.
- */
- p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+ ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt, length);
+ if (ret < 0)
+ return ret;
+ len = ret;
- /* Skip past Write list */
- while (*p++ != xdr_zero)
- p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
+ /* Terminate the Write list */
+ ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
+ if (ret < 0)
+ return ret;
- xdr_encode_write_chunk(p, rp_ch, consumed);
+ return len + ret;
}
-/* Parse the RPC Call's transport header.
+/**
+ * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
+ * @length: size in bytes of the payload in the Reply chunk
+ *
+ * Assumptions:
+ * - Reply can always fit in the client-provided Reply chunk
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Reply's Reply chunk
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
- __be32 **write, __be32 **reply)
+static ssize_t
+svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
+ unsigned int length)
{
- __be32 *p;
-
- p = rdma_argp + rpcrdma_fixed_maxsz;
-
- /* Read list */
- while (*p++ != xdr_zero)
- p += 5;
-
- /* Write list */
- if (*p != xdr_zero) {
- *write = p;
- while (*p++ != xdr_zero)
- p += 1 + be32_to_cpu(*p) * 4;
- } else {
- *write = NULL;
- p++;
- }
-
- /* Reply chunk */
- if (*p != xdr_zero)
- *reply = p;
- else
- *reply = NULL;
+ return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
+ length);
}
static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -520,38 +539,36 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
}
/**
- * svc_rdma_sync_reply_hdr - DMA sync the transport header buffer
+ * svc_rdma_pull_up_needed - Determine whether to use pull-up
* @rdma: controlling transport
- * @ctxt: send_ctxt for the Send WR
- * @len: length of transport header
+ * @sctxt: send_ctxt for the Send WR
+ * @rctxt: Write and Reply chunks provided by client
+ * @xdr: xdr_buf containing RPC message to transmit
*
- */
-void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
- unsigned int len)
-{
- ctxt->sc_sges[0].length = len;
- ctxt->sc_send_wr.num_sge++;
- ib_dma_sync_single_for_device(rdma->sc_pd->device,
- ctxt->sc_sges[0].addr, len,
- DMA_TO_DEVICE);
-}
-
-/* If the xdr_buf has more elements than the device can
- * transmit in a single RDMA Send, then the reply will
- * have to be copied into a bounce buffer.
+ * Returns:
+ * %true if pull-up must be used
+ * %false otherwise
*/
static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
- struct xdr_buf *xdr,
- __be32 *wr_lst)
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_recv_ctxt *rctxt,
+ struct xdr_buf *xdr)
{
int elements;
+ /* For small messages, copying bytes is cheaper than DMA mapping.
+ */
+ if (sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
+ return true;
+
+ /* Check whether the xdr_buf has more elements than can
+ * fit in a single RDMA Send.
+ */
/* xdr->head */
elements = 1;
/* xdr->pages */
- if (!wr_lst) {
+ if (!rctxt || !rctxt->rc_write_list) {
unsigned int remaining;
unsigned long pageoff;
@@ -573,29 +590,36 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
return elements >= rdma->sc_max_send_sges;
}
-/* The device is not capable of sending the reply directly.
- * Assemble the elements of @xdr into the transport header
- * buffer.
+/**
+ * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer
+ * @rdma: controlling transport
+ * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared
+ * @rctxt: Write and Reply chunks provided by client
+ * @xdr: prepared xdr_buf containing RPC message
+ *
+ * The device is not capable of sending the reply directly.
+ * Assemble the elements of @xdr into the transport header buffer.
+ *
+ * Returns zero on success, or a negative errno on failure.
*/
static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
- struct xdr_buf *xdr, __be32 *wr_lst)
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_recv_ctxt *rctxt,
+ const struct xdr_buf *xdr)
{
unsigned char *dst, *tailbase;
unsigned int taillen;
- dst = ctxt->sc_xprt_buf;
- dst += ctxt->sc_sges[0].length;
-
+ dst = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len;
memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len);
dst += xdr->head[0].iov_len;
tailbase = xdr->tail[0].iov_base;
taillen = xdr->tail[0].iov_len;
- if (wr_lst) {
+ if (rctxt && rctxt->rc_write_list) {
u32 xdrpad;
- xdrpad = xdr_padsize(xdr->page_len);
+ xdrpad = xdr_pad_size(xdr->page_len);
if (taillen && xdrpad) {
tailbase += xdrpad;
taillen -= xdrpad;
@@ -621,29 +645,26 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
if (taillen)
memcpy(dst, tailbase, taillen);
- ctxt->sc_sges[0].length += xdr->len;
- ib_dma_sync_single_for_device(rdma->sc_pd->device,
- ctxt->sc_sges[0].addr,
- ctxt->sc_sges[0].length,
- DMA_TO_DEVICE);
-
+ sctxt->sc_sges[0].length += xdr->len;
+ trace_svcrdma_send_pullup(sctxt->sc_sges[0].length);
return 0;
}
-/* svc_rdma_map_reply_msg - Map the buffer holding RPC message
+/* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message
* @rdma: controlling transport
- * @ctxt: send_ctxt for the Send WR
+ * @sctxt: send_ctxt for the Send WR
+ * @rctxt: Write and Reply chunks provided by client
* @xdr: prepared xdr_buf containing RPC message
- * @wr_lst: pointer to Call header's Write list, or NULL
*
* Load the xdr_buf into the ctxt's sge array, and DMA map each
- * element as it is added.
+ * element as it is added. The Send WR's num_sge field is set.
*
* Returns zero on success, or a negative errno on failure.
*/
int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
- struct xdr_buf *xdr, __be32 *wr_lst)
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_recv_ctxt *rctxt,
+ struct xdr_buf *xdr)
{
unsigned int len, remaining;
unsigned long page_off;
@@ -652,11 +673,24 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
u32 xdr_pad;
int ret;
- if (svc_rdma_pull_up_needed(rdma, xdr, wr_lst))
- return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst);
+ /* Set up the (persistently-mapped) transport header SGE. */
+ sctxt->sc_send_wr.num_sge = 1;
+ sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
- ++ctxt->sc_cur_sge_no;
- ret = svc_rdma_dma_map_buf(rdma, ctxt,
+ /* If there is a Reply chunk, nothing follows the transport
+ * header, and we're done here.
+ */
+ if (rctxt && rctxt->rc_reply_chunk)
+ return 0;
+
+ /* For pull-up, svc_rdma_send() will sync the transport header.
+ * No additional DMA mapping is necessary.
+ */
+ if (svc_rdma_pull_up_needed(rdma, sctxt, rctxt, xdr))
+ return svc_rdma_pull_up_reply_msg(rdma, sctxt, rctxt, xdr);
+
+ ++sctxt->sc_cur_sge_no;
+ ret = svc_rdma_dma_map_buf(rdma, sctxt,
xdr->head[0].iov_base,
xdr->head[0].iov_len);
if (ret < 0)
@@ -667,10 +701,10 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
* have added XDR padding in the tail buffer, and that
* should not be included inline.
*/
- if (wr_lst) {
+ if (rctxt && rctxt->rc_write_list) {
base = xdr->tail[0].iov_base;
len = xdr->tail[0].iov_len;
- xdr_pad = xdr_padsize(xdr->page_len);
+ xdr_pad = xdr_pad_size(xdr->page_len);
if (len && xdr_pad) {
base += xdr_pad;
@@ -686,8 +720,8 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
while (remaining) {
len = min_t(u32, PAGE_SIZE - page_off, remaining);
- ++ctxt->sc_cur_sge_no;
- ret = svc_rdma_dma_map_page(rdma, ctxt, *ppages++,
+ ++sctxt->sc_cur_sge_no;
+ ret = svc_rdma_dma_map_page(rdma, sctxt, *ppages++,
page_off, len);
if (ret < 0)
return ret;
@@ -700,8 +734,8 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
len = xdr->tail[0].iov_len;
tail:
if (len) {
- ++ctxt->sc_cur_sge_no;
- ret = svc_rdma_dma_map_buf(rdma, ctxt, base, len);
+ ++sctxt->sc_cur_sge_no;
+ ret = svc_rdma_dma_map_buf(rdma, sctxt, base, len);
if (ret < 0)
return ret;
}
@@ -748,18 +782,14 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
*/
static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *sctxt,
- struct svc_rdma_recv_ctxt *rctxt,
- struct svc_rqst *rqstp,
- __be32 *wr_lst, __be32 *rp_ch)
+ const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rqst *rqstp)
{
int ret;
- if (!rp_ch) {
- ret = svc_rdma_map_reply_msg(rdma, sctxt,
- &rqstp->rq_res, wr_lst);
- if (ret < 0)
- return ret;
- }
+ ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqstp->rq_res);
+ if (ret < 0)
+ return ret;
svc_rdma_save_io_pages(rqstp, sctxt);
@@ -769,8 +799,6 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
} else {
sctxt->sc_send_wr.opcode = IB_WR_SEND;
}
- dprintk("svcrdma: posting Send WR with %u sge(s)\n",
- sctxt->sc_send_wr.num_sge);
return svc_rdma_send(rdma, &sctxt->sc_send_wr);
}
@@ -785,26 +813,31 @@ static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt,
struct svc_rqst *rqstp)
{
+ struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
+ __be32 *rdma_argp = rctxt->rc_recv_buf;
__be32 *p;
- int ret;
- p = ctxt->sc_xprt_buf;
- trace_svcrdma_err_chunk(*p);
- p += 3;
+ rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
+ xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
+ NULL);
+
+ p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_ERR);
+ if (!p)
+ return -ENOMSG;
+
+ *p++ = *rdma_argp;
+ *p++ = *(rdma_argp + 1);
+ *p++ = rdma->sc_fc_credits;
*p++ = rdma_error;
*p = err_chunk;
- svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR);
+ trace_svcrdma_err_chunk(*rdma_argp);
svc_rdma_save_io_pages(rqstp, ctxt);
+ ctxt->sc_send_wr.num_sge = 1;
ctxt->sc_send_wr.opcode = IB_WR_SEND;
- ret = svc_rdma_send(rdma, &ctxt->sc_send_wr);
- if (ret) {
- svc_rdma_send_ctxt_put(rdma, ctxt);
- return ret;
- }
-
- return 0;
+ ctxt->sc_sges[0].length = ctxt->sc_hdrbuf.len;
+ return svc_rdma_send(rdma, &ctxt->sc_send_wr);
}
/**
@@ -825,14 +858,14 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
struct svcxprt_rdma *rdma =
container_of(xprt, struct svcxprt_rdma, sc_xprt);
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
- __be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
+ __be32 *rdma_argp = rctxt->rc_recv_buf;
+ __be32 *wr_lst = rctxt->rc_write_list;
+ __be32 *rp_ch = rctxt->rc_reply_chunk;
struct xdr_buf *xdr = &rqstp->rq_res;
struct svc_rdma_send_ctxt *sctxt;
+ __be32 *p;
int ret;
- rdma_argp = rctxt->rc_recv_buf;
- svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);
-
/* Create the RDMA response header. xprt->xpt_mutex,
* acquired in svc_send(), serializes RPC replies. The
* code path below that inserts the credit grant value
@@ -843,36 +876,52 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
sctxt = svc_rdma_send_ctxt_get(rdma);
if (!sctxt)
goto err0;
- rdma_resp = sctxt->sc_xprt_buf;
- p = rdma_resp;
+ p = xdr_reserve_space(&sctxt->sc_stream,
+ rpcrdma_fixed_maxsz * sizeof(*p));
+ if (!p)
+ goto err0;
*p++ = *rdma_argp;
*p++ = *(rdma_argp + 1);
*p++ = rdma->sc_fc_credits;
- *p++ = rp_ch ? rdma_nomsg : rdma_msg;
-
- /* Start with empty chunks */
- *p++ = xdr_zero;
- *p++ = xdr_zero;
- *p = xdr_zero;
+ *p = rp_ch ? rdma_nomsg : rdma_msg;
+ if (svc_rdma_encode_read_list(sctxt) < 0)
+ goto err0;
if (wr_lst) {
/* XXX: Presume the client sent only one Write chunk */
- ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
+ unsigned long offset;
+ unsigned int length;
+
+ if (rctxt->rc_read_payload_length) {
+ offset = rctxt->rc_read_payload_offset;
+ length = rctxt->rc_read_payload_length;
+ } else {
+ offset = xdr->head[0].iov_len;
+ length = xdr->page_len;
+ }
+ ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset,
+ length);
if (ret < 0)
goto err2;
- svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
+ if (svc_rdma_encode_write_list(rctxt, sctxt, length) < 0)
+ goto err0;
+ } else {
+ if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
+ goto err0;
}
if (rp_ch) {
- ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
+ ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
if (ret < 0)
goto err2;
- svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
+ if (svc_rdma_encode_reply_chunk(rctxt, sctxt, ret) < 0)
+ goto err0;
+ } else {
+ if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
+ goto err0;
}
- svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
- ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp,
- wr_lst, rp_ch);
+ ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
if (ret < 0)
goto err1;
ret = 0;
@@ -900,3 +949,30 @@ out:
ret = -ENOTCONN;
goto out;
}
+
+/**
+ * svc_rdma_read_payload - special processing for a READ payload
+ * @rqstp: svc_rqst to operate on
+ * @offset: payload's byte offset in @xdr
+ * @length: size of payload, in bytes
+ *
+ * Returns zero on success.
+ *
+ * For the moment, just record the xdr_buf location of the READ
+ * payload. svc_rdma_sendto will use that location later when
+ * we actually send the payload.
+ */
+int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
+ unsigned int length)
+{
+ struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
+
+ /* XXX: Just one READ payload slot for now, since our
+ * transport implementation currently supports only one
+ * Write chunk.
+ */
+ rctxt->rc_read_payload_offset = offset;
+ rctxt->rc_read_payload_length = length;
+
+ return 0;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 145a3615c319..8bb99980ae85 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -82,6 +82,7 @@ static const struct svc_xprt_ops svc_rdma_ops = {
.xpo_create = svc_rdma_create,
.xpo_recvfrom = svc_rdma_recvfrom,
.xpo_sendto = svc_rdma_sendto,
+ .xpo_read_payload = svc_rdma_read_payload,
.xpo_release_rqst = svc_rdma_release_rqst,
.xpo_detach = svc_rdma_detach,
.xpo_free = svc_rdma_free,
@@ -240,10 +241,6 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id,
static int rdma_listen_handler(struct rdma_cm_id *cma_id,
struct rdma_cm_event *event)
{
- struct sockaddr *sap = (struct sockaddr *)&cma_id->route.addr.src_addr;
-
- trace_svcrdma_cm_event(event, sap);
-
switch (event->event) {
case RDMA_CM_EVENT_CONNECT_REQUEST:
dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
@@ -265,12 +262,9 @@ static int rdma_listen_handler(struct rdma_cm_id *cma_id,
static int rdma_cma_handler(struct rdma_cm_id *cma_id,
struct rdma_cm_event *event)
{
- struct sockaddr *sap = (struct sockaddr *)&cma_id->route.addr.dst_addr;
struct svcxprt_rdma *rdma = cma_id->context;
struct svc_xprt *xprt = &rdma->sc_xprt;
- trace_svcrdma_cm_event(event, sap);
-
switch (event->event) {
case RDMA_CM_EVENT_ESTABLISHED:
/* Accept complete */
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index d86c664ea6af..17cb902e5153 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -54,6 +54,7 @@
#include <trace/events/sunrpc.h>
+#include "socklib.h"
#include "sunrpc.h"
static void xs_close(struct rpc_xprt *xprt);
@@ -749,125 +750,6 @@ xs_stream_start_connect(struct sock_xprt *transport)
#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL)
-static int xs_sendmsg(struct socket *sock, struct msghdr *msg, size_t seek)
-{
- if (seek)
- iov_iter_advance(&msg->msg_iter, seek);
- return sock_sendmsg(sock, msg);
-}
-
-static int xs_send_kvec(struct socket *sock, struct msghdr *msg, struct kvec *vec, size_t seek)
-{
- iov_iter_kvec(&msg->msg_iter, WRITE, vec, 1, vec->iov_len);
- return xs_sendmsg(sock, msg, seek);
-}
-
-static int xs_send_pagedata(struct socket *sock, struct msghdr *msg, struct xdr_buf *xdr, size_t base)
-{
- int err;
-
- err = xdr_alloc_bvec(xdr, GFP_KERNEL);
- if (err < 0)
- return err;
-
- iov_iter_bvec(&msg->msg_iter, WRITE, xdr->bvec,
- xdr_buf_pagecount(xdr),
- xdr->page_len + xdr->page_base);
- return xs_sendmsg(sock, msg, base + xdr->page_base);
-}
-
-#define xs_record_marker_len() sizeof(rpc_fraghdr)
-
-/* Common case:
- * - stream transport
- * - sending from byte 0 of the message
- * - the message is wholly contained in @xdr's head iovec
- */
-static int xs_send_rm_and_kvec(struct socket *sock, struct msghdr *msg,
- rpc_fraghdr marker, struct kvec *vec, size_t base)
-{
- struct kvec iov[2] = {
- [0] = {
- .iov_base = &marker,
- .iov_len = sizeof(marker)
- },
- [1] = *vec,
- };
- size_t len = iov[0].iov_len + iov[1].iov_len;
-
- iov_iter_kvec(&msg->msg_iter, WRITE, iov, 2, len);
- return xs_sendmsg(sock, msg, base);
-}
-
-/**
- * xs_sendpages - write pages directly to a socket
- * @sock: socket to send on
- * @addr: UDP only -- address of destination
- * @addrlen: UDP only -- length of destination address
- * @xdr: buffer containing this request
- * @base: starting position in the buffer
- * @rm: stream record marker field
- * @sent_p: return the total number of bytes successfully queued for sending
- *
- */
-static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, rpc_fraghdr rm, int *sent_p)
-{
- struct msghdr msg = {
- .msg_name = addr,
- .msg_namelen = addrlen,
- .msg_flags = XS_SENDMSG_FLAGS | MSG_MORE,
- };
- unsigned int rmsize = rm ? sizeof(rm) : 0;
- unsigned int remainder = rmsize + xdr->len - base;
- unsigned int want;
- int err = 0;
-
- if (unlikely(!sock))
- return -ENOTSOCK;
-
- want = xdr->head[0].iov_len + rmsize;
- if (base < want) {
- unsigned int len = want - base;
- remainder -= len;
- if (remainder == 0)
- msg.msg_flags &= ~MSG_MORE;
- if (rmsize)
- err = xs_send_rm_and_kvec(sock, &msg, rm,
- &xdr->head[0], base);
- else
- err = xs_send_kvec(sock, &msg, &xdr->head[0], base);
- if (remainder == 0 || err != len)
- goto out;
- *sent_p += err;
- base = 0;
- } else
- base -= want;
-
- if (base < xdr->page_len) {
- unsigned int len = xdr->page_len - base;
- remainder -= len;
- if (remainder == 0)
- msg.msg_flags &= ~MSG_MORE;
- err = xs_send_pagedata(sock, &msg, xdr, base);
- if (remainder == 0 || err != len)
- goto out;
- *sent_p += err;
- base = 0;
- } else
- base -= xdr->page_len;
-
- if (base >= xdr->tail[0].iov_len)
- return 0;
- msg.msg_flags &= ~MSG_MORE;
- err = xs_send_kvec(sock, &msg, &xdr->tail[0], base);
-out:
- if (err > 0) {
- *sent_p += err;
- err = 0;
- }
- return err;
-}
-
/**
* xs_nospace - handle transmit was incomplete
* @req: pointer to RPC request
@@ -959,8 +841,11 @@ static int xs_local_send_request(struct rpc_rqst *req)
struct xdr_buf *xdr = &req->rq_snd_buf;
rpc_fraghdr rm = xs_stream_record_marker(xdr);
unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen;
+ struct msghdr msg = {
+ .msg_flags = XS_SENDMSG_FLAGS,
+ };
+ unsigned int uninitialized_var(sent);
int status;
- int sent = 0;
/* Close the stream if the previous transmission was incomplete */
if (xs_send_request_was_aborted(transport, req)) {
@@ -972,8 +857,8 @@ static int xs_local_send_request(struct rpc_rqst *req)
req->rq_svec->iov_base, req->rq_svec->iov_len);
req->rq_xtime = ktime_get();
- status = xs_sendpages(transport->sock, NULL, 0, xdr,
- transport->xmit.offset, rm, &sent);
+ status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
+ transport->xmit.offset, rm, &sent);
dprintk("RPC: %s(%u) = %d\n",
__func__, xdr->len - transport->xmit.offset, status);
@@ -1025,7 +910,12 @@ static int xs_udp_send_request(struct rpc_rqst *req)
struct rpc_xprt *xprt = req->rq_xprt;
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
struct xdr_buf *xdr = &req->rq_snd_buf;
- int sent = 0;
+ struct msghdr msg = {
+ .msg_name = xs_addr(xprt),
+ .msg_namelen = xprt->addrlen,
+ .msg_flags = XS_SENDMSG_FLAGS,
+ };
+ unsigned int uninitialized_var(sent);
int status;
xs_pktdump("packet data:",
@@ -1039,8 +929,7 @@ static int xs_udp_send_request(struct rpc_rqst *req)
return -EBADSLT;
req->rq_xtime = ktime_get();
- status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
- xdr, 0, 0, &sent);
+ status = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, 0, &sent);
dprintk("RPC: xs_udp_send_request(%u) = %d\n",
xdr->len, status);
@@ -1106,9 +995,12 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
struct xdr_buf *xdr = &req->rq_snd_buf;
rpc_fraghdr rm = xs_stream_record_marker(xdr);
unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen;
+ struct msghdr msg = {
+ .msg_flags = XS_SENDMSG_FLAGS,
+ };
bool vm_wait = false;
+ unsigned int uninitialized_var(sent);
int status;
- int sent;
/* Close the stream if the previous transmission was incomplete */
if (xs_send_request_was_aborted(transport, req)) {
@@ -1129,9 +1021,8 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
* called sendmsg(). */
req->rq_xtime = ktime_get();
while (1) {
- sent = 0;
- status = xs_sendpages(transport->sock, NULL, 0, xdr,
- transport->xmit.offset, rm, &sent);
+ status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
+ transport->xmit.offset, rm, &sent);
dprintk("RPC: xs_tcp_send_request(%u) = %d\n",
xdr->len - transport->xmit.offset, status);
@@ -2636,46 +2527,25 @@ static void bc_free(struct rpc_task *task)
free_page((unsigned long)buf);
}
-/*
- * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex
- * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
- */
static int bc_sendto(struct rpc_rqst *req)
{
- int len;
- struct xdr_buf *xbufp = &req->rq_snd_buf;
+ struct xdr_buf *xdr = &req->rq_snd_buf;
struct sock_xprt *transport =
container_of(req->rq_xprt, struct sock_xprt, xprt);
- unsigned long headoff;
- unsigned long tailoff;
- struct page *tailpage;
struct msghdr msg = {
- .msg_flags = MSG_MORE
+ .msg_flags = 0,
};
rpc_fraghdr marker = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT |
- (u32)xbufp->len);
- struct kvec iov = {
- .iov_base = &marker,
- .iov_len = sizeof(marker),
- };
+ (u32)xdr->len);
+ unsigned int sent = 0;
+ int err;
req->rq_xtime = ktime_get();
-
- len = kernel_sendmsg(transport->sock, &msg, &iov, 1, iov.iov_len);
- if (len != iov.iov_len)
+ err = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, marker, &sent);
+ xdr_free_bvec(xdr);
+ if (err < 0 || sent != (xdr->len + sizeof(marker)))
return -EAGAIN;
-
- tailpage = NULL;
- if (xbufp->tail[0].iov_len)
- tailpage = virt_to_page(xbufp->tail[0].iov_base);
- tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
- headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
- len = svc_send_common(transport->sock, xbufp,
- virt_to_page(xbufp->head[0].iov_base), headoff,
- tailpage, tailoff);
- if (len != xbufp->len)
- return -EAGAIN;
- return len;
+ return sent;
}
/*