diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2018-01-30 19:03:48 -0800 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2018-01-30 19:03:48 -0800 |
commit | efd52b5d363e3e3b6224ad39949219c0df117c91 (patch) | |
tree | 2d885d2f431a324af58d8f267755240bff3e32da /net | |
parent | 1ed2d76e0213751c82e3a242b61b0883daf330df (diff) | |
parent | e231c6879cfd44e4fffd384bb6dd7d313249a523 (diff) |
Merge tag 'nfs-for-4.16-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Pull NFS client updates from Trond Myklebust:
"Highlights include:
Stable bugfixes:
- Fix breakages in the nfsstat utility due to the inclusion of the
NFSv4 LOOKUPP operation
- Fix a NULL pointer dereference in nfs_idmap_prepare_pipe_upcall()
due to nfs_idmap_legacy_upcall() being called without an 'aux'
parameter
- Fix a refcount leak in the standard O_DIRECT error path
- Fix a refcount leak in the pNFS O_DIRECT fallback to MDS path
- Fix CPU latency issues with nfs_commit_release_pages()
- Fix the LAYOUTUNAVAILABLE error case in the file layout type
- NFS: Fix a race between mmap() and O_DIRECT
Features:
- Support the statx() mask and query flags to enable optimisations
when the user is requesting only attributes that are already up to
date in the inode cache, or is specifying the AT_STATX_DONT_SYNC
flag
- Add a module alias for the SCSI pNFS layout type
Bugfixes:
- Automounting when resolving a NFSv4 referral should preserve the
RDMA transport protocol settings
- Various other RDMA bugfixes from Chuck
- pNFS block layout fixes
- Always set NFS_LOCK_LOST when a lock is lost"
* tag 'nfs-for-4.16-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (69 commits)
NFS: Fix a race between mmap() and O_DIRECT
NFS: Remove a redundant call to unmap_mapping_range()
pnfs/blocklayout: Ensure disk address in block device map
pnfs/blocklayout: pnfs_block_dev_map uses bytes, not sectors
lockd: Fix server refcounting
SUNRPC: Fix null rpc_clnt dereference in rpc_task_queued tracepoint
SUNRPC: Micro-optimize __rpc_execute
SUNRPC: task_run_action should display tk_callback
sunrpc: Format RPC events consistently for display
SUNRPC: Trace xprt_timer events
xprtrdma: Correct some documenting comments
xprtrdma: Fix "bytes registered" accounting
xprtrdma: Instrument allocation/release of rpcrdma_req/rep objects
xprtrdma: Add trace points to instrument QP and CQ access upcalls
xprtrdma: Add trace points in the client-side backchannel code paths
xprtrdma: Add trace points for connect events
xprtrdma: Add trace points to instrument MR allocation and recovery
xprtrdma: Add trace points to instrument memory invalidation
xprtrdma: Add trace points in reply decoder path
xprtrdma: Add trace points to instrument memory registration
..
Diffstat (limited to 'net')
-rw-r--r-- | net/sunrpc/clnt.c | 16 | ||||
-rw-r--r-- | net/sunrpc/sched.c | 26 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 2 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 78 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/fmr_ops.c | 157 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 329 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/module.c | 12 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 162 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 128 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 280 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 116 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 36 |
12 files changed, 666 insertions, 676 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index e2a4184f3c5d..6e432ecd7f99 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1376,22 +1376,6 @@ rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize EXPORT_SYMBOL_GPL(rpc_setbufsize); /** - * rpc_protocol - Get transport protocol number for an RPC client - * @clnt: RPC client to query - * - */ -int rpc_protocol(struct rpc_clnt *clnt) -{ - int protocol; - - rcu_read_lock(); - protocol = rcu_dereference(clnt->cl_xprt)->prot; - rcu_read_unlock(); - return protocol; -} -EXPORT_SYMBOL_GPL(rpc_protocol); - -/** * rpc_net_ns - Get the network namespace for this RPC client * @clnt: RPC client to query * diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index b1b49edd7c4d..896691afbb1a 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -755,22 +755,20 @@ static void __rpc_execute(struct rpc_task *task) void (*do_action)(struct rpc_task *); /* - * Execute any pending callback first. + * Perform the next FSM step or a pending callback. + * + * tk_action may be NULL if the task has been killed. + * In particular, note that rpc_killall_tasks may + * do this at any time, so beware when dereferencing. */ - do_action = task->tk_callback; - task->tk_callback = NULL; - if (do_action == NULL) { - /* - * Perform the next FSM step. - * tk_action may be NULL if the task has been killed. - * In particular, note that rpc_killall_tasks may - * do this at any time, so beware when dereferencing. - */ - do_action = task->tk_action; - if (do_action == NULL) - break; + do_action = task->tk_action; + if (task->tk_callback) { + do_action = task->tk_callback; + task->tk_callback = NULL; } - trace_rpc_task_run_action(task->tk_client, task, task->tk_action); + if (!do_action) + break; + trace_rpc_task_run_action(task->tk_client, task, do_action); do_action(task); /* diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 33b74fd84051..2436fd1125fc 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -940,8 +940,8 @@ static void xprt_timer(struct rpc_task *task) if (task->tk_status != -ETIMEDOUT) return; - dprintk("RPC: %5u xprt_timer\n", task->tk_pid); + trace_xprt_timer(xprt, req->rq_xid, task->tk_status); if (!req->rq_reply_bytes_recvd) { if (xprt->ops->timer) xprt->ops->timer(xprt, task); diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 8b818bb3518a..ed1a4a3065ee 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -43,7 +43,6 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt, req = rpcrdma_create_req(r_xprt); if (IS_ERR(req)) return PTR_ERR(req); - __set_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags); rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE, DMA_TO_DEVICE, GFP_KERNEL); @@ -74,21 +73,13 @@ out_fail: static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt, unsigned int count) { - struct rpcrdma_rep *rep; int rc = 0; while (count--) { - rep = rpcrdma_create_rep(r_xprt); - if (IS_ERR(rep)) { - pr_err("RPC: %s: reply buffer alloc failed\n", - __func__); - rc = PTR_ERR(rep); + rc = rpcrdma_create_rep(r_xprt); + if (rc) break; - } - - rpcrdma_recv_buffer_put(rep); } - return rc; } @@ -129,6 +120,7 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs) rqst->rq_xprt = &r_xprt->rx_xprt; INIT_LIST_HEAD(&rqst->rq_list); INIT_LIST_HEAD(&rqst->rq_bc_list); + __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); if (rpcrdma_bc_setup_rqst(r_xprt, rqst)) goto out_free; @@ -148,7 +140,7 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs) buffer->rb_bc_srv_max_requests = reqs; request_module("svcrdma"); - + trace_xprtrdma_cb_setup(r_xprt, reqs); return 0; out_free: @@ -196,13 +188,7 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt) return maxmsg - RPCRDMA_HDRLEN_MIN; } -/** - * rpcrdma_bc_marshal_reply - Send backwards direction reply - * @rqst: buffer containing RPC reply data - * - * Returns zero on success. - */ -int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) +static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) { struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); @@ -226,7 +212,46 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN, &rqst->rq_snd_buf, rpcrdma_noch)) return -EIO; + + trace_xprtrdma_cb_reply(rqst); + return 0; +} + +/** + * xprt_rdma_bc_send_reply - marshal and send a backchannel reply + * @rqst: RPC rqst with a backchannel RPC reply in rq_snd_buf + * + * Caller holds the transport's write lock. + * + * Returns: + * %0 if the RPC message has been sent + * %-ENOTCONN if the caller should reconnect and call again + * %-EIO if a permanent error occurred and the request was not + * sent. Do not try to send this message again. + */ +int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst) +{ + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); + struct rpcrdma_req *req = rpcr_to_rdmar(rqst); + int rc; + + if (!xprt_connected(rqst->rq_xprt)) + goto drop_connection; + + rc = rpcrdma_bc_marshal_reply(rqst); + if (rc < 0) + goto failed_marshal; + + if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) + goto drop_connection; return 0; + +failed_marshal: + if (rc != -ENOTCONN) + return rc; +drop_connection: + xprt_disconnect_done(rqst->rq_xprt); + return -ENOTCONN; } /** @@ -262,11 +287,6 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) dprintk("RPC: %s: freeing rqst %p (req %p)\n", __func__, rqst, rpcr_to_rdmar(rqst)); - smp_mb__before_atomic(); - WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)); - clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); - smp_mb__after_atomic(); - spin_lock_bh(&xprt->bc_pa_lock); list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); spin_unlock_bh(&xprt->bc_pa_lock); @@ -274,7 +294,7 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) /** * rpcrdma_bc_receive_call - Handle a backward direction call - * @xprt: transport receiving the call + * @r_xprt: transport receiving the call * @rep: receive buffer containing the call * * Operational assumptions: @@ -313,7 +333,6 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, struct rpc_rqst, rq_bc_pa_list); list_del(&rqst->rq_bc_pa_list); spin_unlock(&xprt->bc_pa_lock); - dprintk("RPC: %s: using rqst %p\n", __func__, rqst); /* Prepare rqst */ rqst->rq_reply_bytes_recvd = 0; @@ -321,7 +340,6 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, rqst->rq_xid = *p; rqst->rq_private_buf.len = size; - set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); buf = &rqst->rq_rcv_buf; memset(buf, 0, sizeof(*buf)); @@ -335,12 +353,8 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, * the Upper Layer is done decoding it. */ req = rpcr_to_rdmar(rqst); - dprintk("RPC: %s: attaching rep %p to req %p\n", - __func__, rep, req); req->rl_reply = rep; - - /* Defeat the retransmit detection logic in send_request */ - req->rl_connect_cookie = 0; + trace_xprtrdma_cb_call(rqst); /* Queue rqst for ULP's callback service */ bc_serv = xprt->bc_serv; diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index 29fc84c7ff98..d5f95bb39300 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 /* - * Copyright (c) 2015 Oracle. All rights reserved. + * Copyright (c) 2015, 2017 Oracle. All rights reserved. * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. */ @@ -47,7 +47,7 @@ fmr_is_supported(struct rpcrdma_ia *ia) } static int -fmr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *mw) +fmr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr) { static struct ib_fmr_attr fmr_attr = { .max_pages = RPCRDMA_MAX_FMR_SGES, @@ -55,106 +55,108 @@ fmr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *mw) .page_shift = PAGE_SHIFT }; - mw->fmr.fm_physaddrs = kcalloc(RPCRDMA_MAX_FMR_SGES, + mr->fmr.fm_physaddrs = kcalloc(RPCRDMA_MAX_FMR_SGES, sizeof(u64), GFP_KERNEL); - if (!mw->fmr.fm_physaddrs) + if (!mr->fmr.fm_physaddrs) goto out_free; - mw->mw_sg = kcalloc(RPCRDMA_MAX_FMR_SGES, - sizeof(*mw->mw_sg), GFP_KERNEL); - if (!mw->mw_sg) + mr->mr_sg = kcalloc(RPCRDMA_MAX_FMR_SGES, + sizeof(*mr->mr_sg), GFP_KERNEL); + if (!mr->mr_sg) goto out_free; - sg_init_table(mw->mw_sg, RPCRDMA_MAX_FMR_SGES); + sg_init_table(mr->mr_sg, RPCRDMA_MAX_FMR_SGES); - mw->fmr.fm_mr = ib_alloc_fmr(ia->ri_pd, RPCRDMA_FMR_ACCESS_FLAGS, + mr->fmr.fm_mr = ib_alloc_fmr(ia->ri_pd, RPCRDMA_FMR_ACCESS_FLAGS, &fmr_attr); - if (IS_ERR(mw->fmr.fm_mr)) + if (IS_ERR(mr->fmr.fm_mr)) goto out_fmr_err; return 0; out_fmr_err: dprintk("RPC: %s: ib_alloc_fmr returned %ld\n", __func__, - PTR_ERR(mw->fmr.fm_mr)); + PTR_ERR(mr->fmr.fm_mr)); out_free: - kfree(mw->mw_sg); - kfree(mw->fmr.fm_physaddrs); + kfree(mr->mr_sg); + kfree(mr->fmr.fm_physaddrs); return -ENOMEM; } static int -__fmr_unmap(struct rpcrdma_mw *mw) +__fmr_unmap(struct rpcrdma_mr *mr) { LIST_HEAD(l); int rc; - list_add(&mw->fmr.fm_mr->list, &l); + list_add(&mr->fmr.fm_mr->list, &l); rc = ib_unmap_fmr(&l); - list_del(&mw->fmr.fm_mr->list); + list_del(&mr->fmr.fm_mr->list); return rc; } static void -fmr_op_release_mr(struct rpcrdma_mw *r) +fmr_op_release_mr(struct rpcrdma_mr *mr) { LIST_HEAD(unmap_list); int rc; /* Ensure MW is not on any rl_registered list */ - if (!list_empty(&r->mw_list)) - list_del(&r->mw_list); + if (!list_empty(&mr->mr_list)) + list_del(&mr->mr_list); - kfree(r->fmr.fm_physaddrs); - kfree(r->mw_sg); + kfree(mr->fmr.fm_physaddrs); + kfree(mr->mr_sg); /* In case this one was left mapped, try to unmap it * to prevent dealloc_fmr from failing with EBUSY */ - rc = __fmr_unmap(r); + rc = __fmr_unmap(mr); if (rc) pr_err("rpcrdma: final ib_unmap_fmr for %p failed %i\n", - r, rc); + mr, rc); - rc = ib_dealloc_fmr(r->fmr.fm_mr); + rc = ib_dealloc_fmr(mr->fmr.fm_mr); if (rc) pr_err("rpcrdma: final ib_dealloc_fmr for %p returned %i\n", - r, rc); + mr, rc); - kfree(r); + kfree(mr); } /* Reset of a single FMR. */ static void -fmr_op_recover_mr(struct rpcrdma_mw *mw) +fmr_op_recover_mr(struct rpcrdma_mr *mr) { - struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + struct rpcrdma_xprt *r_xprt = mr->mr_xprt; int rc; /* ORDER: invalidate first */ - rc = __fmr_unmap(mw); - - /* ORDER: then DMA unmap */ - ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, - mw->mw_sg, mw->mw_nents, mw->mw_dir); + rc = __fmr_unmap(mr); if (rc) goto out_release; - rpcrdma_put_mw(r_xprt, mw); + /* ORDER: then DMA unmap */ + rpcrdma_mr_unmap_and_put(mr); + r_xprt->rx_stats.mrs_recovered++; return; out_release: - pr_err("rpcrdma: FMR reset failed (%d), %p released\n", rc, mw); + pr_err("rpcrdma: FMR reset failed (%d), %p released\n", rc, mr); r_xprt->rx_stats.mrs_orphaned++; - spin_lock(&r_xprt->rx_buf.rb_mwlock); - list_del(&mw->mw_all); - spin_unlock(&r_xprt->rx_buf.rb_mwlock); + trace_xprtrdma_dma_unmap(mr); + ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, + mr->mr_sg, mr->mr_nents, mr->mr_dir); + + spin_lock(&r_xprt->rx_buf.rb_mrlock); + list_del(&mr->mr_all); + spin_unlock(&r_xprt->rx_buf.rb_mrlock); - fmr_op_release_mr(mw); + fmr_op_release_mr(mr); } static int @@ -180,15 +182,15 @@ fmr_op_maxpages(struct rpcrdma_xprt *r_xprt) */ static struct rpcrdma_mr_seg * fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, - int nsegs, bool writing, struct rpcrdma_mw **out) + int nsegs, bool writing, struct rpcrdma_mr **out) { struct rpcrdma_mr_seg *seg1 = seg; int len, pageoff, i, rc; - struct rpcrdma_mw *mw; + struct rpcrdma_mr *mr; u64 *dma_pages; - mw = rpcrdma_get_mw(r_xprt); - if (!mw) + mr = rpcrdma_mr_get(r_xprt); + if (!mr) return ERR_PTR(-ENOBUFS); pageoff = offset_in_page(seg1->mr_offset); @@ -199,12 +201,12 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, nsegs = RPCRDMA_MAX_FMR_SGES; for (i = 0; i < nsegs;) { if (seg->mr_page) - sg_set_page(&mw->mw_sg[i], + sg_set_page(&mr->mr_sg[i], seg->mr_page, seg->mr_len, offset_in_page(seg->mr_offset)); else - sg_set_buf(&mw->mw_sg[i], seg->mr_offset, + sg_set_buf(&mr->mr_sg[i], seg->mr_offset, seg->mr_len); len += seg->mr_len; ++seg; @@ -214,40 +216,38 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) break; } - mw->mw_dir = rpcrdma_data_dir(writing); + mr->mr_dir = rpcrdma_data_dir(writing); - mw->mw_nents = ib_dma_map_sg(r_xprt->rx_ia.ri_device, - mw->mw_sg, i, mw->mw_dir); - if (!mw->mw_nents) + mr->mr_nents = ib_dma_map_sg(r_xprt->rx_ia.ri_device, + mr->mr_sg, i, mr->mr_dir); + if (!mr->mr_nents) goto out_dmamap_err; - for (i = 0, dma_pages = mw->fmr.fm_physaddrs; i < mw->mw_nents; i++) - dma_pages[i] = sg_dma_address(&mw->mw_sg[i]); - rc = ib_map_phys_fmr(mw->fmr.fm_mr, dma_pages, mw->mw_nents, + for (i = 0, dma_pages = mr->fmr.fm_physaddrs; i < mr->mr_nents; i++) + dma_pages[i] = sg_dma_address(&mr->mr_sg[i]); + rc = ib_map_phys_fmr(mr->fmr.fm_mr, dma_pages, mr->mr_nents, dma_pages[0]); if (rc) goto out_maperr; - mw->mw_handle = mw->fmr.fm_mr->rkey; - mw->mw_length = len; - mw->mw_offset = dma_pages[0] + pageoff; + mr->mr_handle = mr->fmr.fm_mr->rkey; + mr->mr_length = len; + mr->mr_offset = dma_pages[0] + pageoff; - *out = mw; + *out = mr; return seg; out_dmamap_err: pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n", - mw->mw_sg, i); - rpcrdma_put_mw(r_xprt, mw); + mr->mr_sg, i); + rpcrdma_mr_put(mr); return ERR_PTR(-EIO); out_maperr: pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n", len, (unsigned long long)dma_pages[0], - pageoff, mw->mw_nents, rc); - ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, - mw->mw_sg, mw->mw_nents, mw->mw_dir); - rpcrdma_put_mw(r_xprt, mw); + pageoff, mr->mr_nents, rc); + rpcrdma_mr_unmap_and_put(mr); return ERR_PTR(-EIO); } @@ -256,13 +256,13 @@ out_maperr: * Sleeps until it is safe for the host CPU to access the * previously mapped memory regions. * - * Caller ensures that @mws is not empty before the call. This + * Caller ensures that @mrs is not empty before the call. This * function empties the list. */ static void -fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws) +fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs) { - struct rpcrdma_mw *mw; + struct rpcrdma_mr *mr; LIST_HEAD(unmap_list); int rc; @@ -271,10 +271,11 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws) * ib_unmap_fmr() is slow, so use a single call instead * of one call per mapped FMR. */ - list_for_each_entry(mw, mws, mw_list) { + list_for_each_entry(mr, mrs, mr_list) { dprintk("RPC: %s: unmapping fmr %p\n", - __func__, &mw->fmr); - list_add_tail(&mw->fmr.fm_mr->list, &unmap_list); + __func__, &mr->fmr); + trace_xprtrdma_localinv(mr); + list_add_tail(&mr->fmr.fm_mr->list, &unmap_list); } r_xprt->rx_stats.local_inv_needed++; rc = ib_unmap_fmr(&unmap_list); @@ -284,14 +285,10 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws) /* ORDER: Now DMA unmap all of the req's MRs, and return * them to the free MW list. */ - while (!list_empty(mws)) { - mw = rpcrdma_pop_mw(mws); - dprintk("RPC: %s: DMA unmapping fmr %p\n", - __func__, &mw->fmr); - list_del(&mw->fmr.fm_mr->list); - ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, - mw->mw_sg, mw->mw_nents, mw->mw_dir); - rpcrdma_put_mw(r_xprt, mw); + while (!list_empty(mrs)) { + mr = rpcrdma_mr_pop(mrs); + list_del(&mr->fmr.fm_mr->list); + rpcrdma_mr_unmap_and_put(mr); } return; @@ -299,10 +296,10 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws) out_reset: pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc); - while (!list_empty(mws)) { - mw = rpcrdma_pop_mw(mws); - list_del(&mw->fmr.fm_mr->list); - fmr_op_recover_mr(mw); + while (!list_empty(mrs)) { + mr = rpcrdma_mr_pop(mrs); + list_del(&mr->fmr.fm_mr->list); + fmr_op_recover_mr(mr); } } diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 773e66e10a15..90f688f19783 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -1,11 +1,11 @@ // SPDX-License-Identifier: GPL-2.0 /* - * Copyright (c) 2015 Oracle. All rights reserved. + * Copyright (c) 2015, 2017 Oracle. All rights reserved. * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. */ /* Lightweight memory registration using Fast Registration Work - * Requests (FRWR). Also referred to sometimes as FRMR mode. + * Requests (FRWR). * * FRWR features ordered asynchronous registration and deregistration * of arbitrarily sized memory regions. This is the fastest and safest @@ -15,9 +15,9 @@ /* Normal operation * * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG - * Work Request (frmr_op_map). When the RDMA operation is finished, this + * Work Request (frwr_op_map). When the RDMA operation is finished, this * Memory Region is invalidated using a LOCAL_INV Work Request - * (frmr_op_unmap). + * (frwr_op_unmap_sync). * * Typically these Work Requests are not signaled, and neither are RDMA * SEND Work Requests (with the exception of signaling occasionally to @@ -26,7 +26,7 @@ * * As an optimization, frwr_op_unmap marks MRs INVALID before the * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on - * rb_mws immediately so that no work (like managing a linked list + * rb_mrs immediately so that no work (like managing a linked list * under a spinlock) is needed in the completion upcall. * * But this means that frwr_op_map() can occasionally encounter an MR @@ -60,7 +60,7 @@ * When frwr_op_map encounters FLUSHED and VALID MRs, they are recovered * with ib_dereg_mr and then are re-initialized. Because MR recovery * allocates fresh resources, it is deferred to a workqueue, and the - * recovered MRs are placed back on the rb_mws list when recovery is + * recovered MRs are placed back on the rb_mrs list when recovery is * complete. frwr_op_map allocates another MR for the current RPC while * the broken MR is reset. * @@ -96,26 +96,26 @@ out_not_supported: } static int -frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) +frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr) { - unsigned int depth = ia->ri_max_frmr_depth; - struct rpcrdma_frmr *f = &r->frmr; + unsigned int depth = ia->ri_max_frwr_depth; + struct rpcrdma_frwr *frwr = &mr->frwr; int rc; - f->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth); - if (IS_ERR(f->fr_mr)) + frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth); + if (IS_ERR(frwr->fr_mr)) goto out_mr_err; - r->mw_sg = kcalloc(depth, sizeof(*r->mw_sg), GFP_KERNEL); - if (!r->mw_sg) + mr->mr_sg = kcalloc(depth, sizeof(*mr->mr_sg), GFP_KERNEL); + if (!mr->mr_sg) goto out_list_err; - sg_init_table(r->mw_sg, depth); - init_completion(&f->fr_linv_done); + sg_init_table(mr->mr_sg, depth); + init_completion(&frwr->fr_linv_done); return 0; out_mr_err: - rc = PTR_ERR(f->fr_mr); + rc = PTR_ERR(frwr->fr_mr); dprintk("RPC: %s: ib_alloc_mr status %i\n", __func__, rc); return rc; @@ -124,83 +124,85 @@ out_list_err: rc = -ENOMEM; dprintk("RPC: %s: sg allocation failure\n", __func__); - ib_dereg_mr(f->fr_mr); + ib_dereg_mr(frwr->fr_mr); return rc; } static void -frwr_op_release_mr(struct rpcrdma_mw *r) +frwr_op_release_mr(struct rpcrdma_mr *mr) { int rc; - /* Ensure MW is not on any rl_registered list */ - if (!list_empty(&r->mw_list)) - list_del(&r->mw_list); + /* Ensure MR is not on any rl_registered list */ + if (!list_empty(&mr->mr_list)) + list_del(&mr->mr_list); - rc = ib_dereg_mr(r->frmr.fr_mr); + rc = ib_dereg_mr(mr->frwr.fr_mr); if (rc) pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n", - r, rc); - kfree(r->mw_sg); - kfree(r); + mr, rc); + kfree(mr->mr_sg); + kfree(mr); } static int -__frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) +__frwr_mr_reset(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr) { - struct rpcrdma_frmr *f = &r->frmr; + struct rpcrdma_frwr *frwr = &mr->frwr; int rc; - rc = ib_dereg_mr(f->fr_mr); + rc = ib_dereg_mr(frwr->fr_mr); if (rc) { pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n", - rc, r); + rc, mr); return rc; } - f->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, - ia->ri_max_frmr_depth); - if (IS_ERR(f->fr_mr)) { + frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, + ia->ri_max_frwr_depth); + if (IS_ERR(frwr->fr_mr)) { pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n", - PTR_ERR(f->fr_mr), r); - return PTR_ERR(f->fr_mr); + PTR_ERR(frwr->fr_mr), mr); + return PTR_ERR(frwr->fr_mr); } - dprintk("RPC: %s: recovered FRMR %p\n", __func__, f); - f->fr_state = FRMR_IS_INVALID; + dprintk("RPC: %s: recovered FRWR %p\n", __func__, frwr); + frwr->fr_state = FRWR_IS_INVALID; return 0; } -/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR. +/* Reset of a single FRWR. Generate a fresh rkey by replacing the MR. */ static void -frwr_op_recover_mr(struct rpcrdma_mw *mw) +frwr_op_recover_mr(struct rpcrdma_mr *mr) { - enum rpcrdma_frmr_state state = mw->frmr.fr_state; - struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + enum rpcrdma_frwr_state state = mr->frwr.fr_state; + struct rpcrdma_xprt *r_xprt = mr->mr_xprt; struct rpcrdma_ia *ia = &r_xprt->rx_ia; int rc; - rc = __frwr_reset_mr(ia, mw); - if (state != FRMR_FLUSHED_LI) + rc = __frwr_mr_reset(ia, mr); + if (state != FRWR_FLUSHED_LI) { + trace_xprtrdma_dma_unmap(mr); ib_dma_unmap_sg(ia->ri_device, - mw->mw_sg, mw->mw_nents, mw->mw_dir); + mr->mr_sg, mr->mr_nents, mr->mr_dir); + } if (rc) goto out_release; - rpcrdma_put_mw(r_xprt, mw); + rpcrdma_mr_put(mr); r_xprt->rx_stats.mrs_recovered++; return; out_release: - pr_err("rpcrdma: FRMR reset failed %d, %p release\n", rc, mw); + pr_err("rpcrdma: FRWR reset failed %d, %p release\n", rc, mr); r_xprt->rx_stats.mrs_orphaned++; - spin_lock(&r_xprt->rx_buf.rb_mwlock); - list_del(&mw->mw_all); - spin_unlock(&r_xprt->rx_buf.rb_mwlock); + spin_lock(&r_xprt->rx_buf.rb_mrlock); + list_del(&mr->mr_all); + spin_unlock(&r_xprt->rx_buf.rb_mrlock); - frwr_op_release_mr(mw); + frwr_op_release_mr(mr); } static int @@ -214,31 +216,31 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG) ia->ri_mrtype = IB_MR_TYPE_SG_GAPS; - ia->ri_max_frmr_depth = + ia->ri_max_frwr_depth = min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, attrs->max_fast_reg_page_list_len); dprintk("RPC: %s: device's max FR page list len = %u\n", - __func__, ia->ri_max_frmr_depth); - - /* Add room for frmr register and invalidate WRs. - * 1. FRMR reg WR for head - * 2. FRMR invalidate WR for head - * 3. N FRMR reg WRs for pagelist - * 4. N FRMR invalidate WRs for pagelist - * 5. FRMR reg WR for tail - * 6. FRMR invalidate WR for tail + __func__, ia->ri_max_frwr_depth); + + /* Add room for frwr register and invalidate WRs. + * 1. FRWR reg WR for head + * 2. FRWR invalidate WR for head + * 3. N FRWR reg WRs for pagelist + * 4. N FRWR invalidate WRs for pagelist + * 5. FRWR reg WR for tail + * 6. FRWR invalidate WR for tail * 7. The RDMA_SEND WR */ depth = 7; - /* Calculate N if the device max FRMR depth is smaller than + /* Calculate N if the device max FRWR depth is smaller than * RPCRDMA_MAX_DATA_SEGS. */ - if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) { - delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frmr_depth; + if (ia->ri_max_frwr_depth < RPCRDMA_MAX_DATA_SEGS) { + delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frwr_depth; do { - depth += 2; /* FRMR reg + invalidate */ - delta -= ia->ri_max_frmr_depth; + depth += 2; /* FRWR reg + invalidate */ + delta -= ia->ri_max_frwr_depth; } while (delta > 0); } @@ -252,7 +254,7 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, } ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS / - ia->ri_max_frmr_depth); + ia->ri_max_frwr_depth); return 0; } @@ -265,7 +267,7 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt) struct rpcrdma_ia *ia = &r_xprt->rx_ia; return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frmr_depth); + RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frwr_depth); } static void @@ -286,16 +288,16 @@ __frwr_sendcompletion_flush(struct ib_wc *wc, const char *wr) static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) { - struct rpcrdma_frmr *frmr; - struct ib_cqe *cqe; + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = + container_of(cqe, struct rpcrdma_frwr, fr_cqe); /* WARNING: Only wr_cqe and status are reliable at this point */ if (wc->status != IB_WC_SUCCESS) { - cqe = wc->wr_cqe; - frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe); - frmr->fr_state = FRMR_FLUSHED_FR; + frwr->fr_state = FRWR_FLUSHED_FR; __frwr_sendcompletion_flush(wc, "fastreg"); } + trace_xprtrdma_wc_fastreg(wc, frwr); } /** @@ -307,16 +309,16 @@ frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) { - struct rpcrdma_frmr *frmr; - struct ib_cqe *cqe; + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr, + fr_cqe); /* WARNING: Only wr_cqe and status are reliable at this point */ if (wc->status != IB_WC_SUCCESS) { - cqe = wc->wr_cqe; - frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe); - frmr->fr_state = FRMR_FLUSHED_LI; + frwr->fr_state = FRWR_FLUSHED_LI; __frwr_sendcompletion_flush(wc, "localinv"); } + trace_xprtrdma_wc_li(wc, frwr); } /** @@ -329,17 +331,17 @@ frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) { - struct rpcrdma_frmr *frmr; - struct ib_cqe *cqe; + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr, + fr_cqe); /* WARNING: Only wr_cqe and status are reliable at this point */ - cqe = wc->wr_cqe; - frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe); if (wc->status != IB_WC_SUCCESS) { - frmr->fr_state = FRMR_FLUSHED_LI; + frwr->fr_state = FRWR_FLUSHED_LI; __frwr_sendcompletion_flush(wc, "localinv"); } - complete(&frmr->fr_linv_done); + complete(&frwr->fr_linv_done); + trace_xprtrdma_wc_li_wake(wc, frwr); } /* Post a REG_MR Work Request to register a memory region @@ -347,41 +349,39 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) */ static struct rpcrdma_mr_seg * frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, - int nsegs, bool writing, struct rpcrdma_mw **out) + int nsegs, bool writing, struct rpcrdma_mr **out) { struct rpcrdma_ia *ia = &r_xprt->rx_ia; bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS; - struct rpcrdma_mw *mw; - struct rpcrdma_frmr *frmr; - struct ib_mr *mr; + struct rpcrdma_frwr *frwr; + struct rpcrdma_mr *mr; + struct ib_mr *ibmr; struct ib_reg_wr *reg_wr; struct ib_send_wr *bad_wr; int rc, i, n; u8 key; - mw = NULL; + mr = NULL; do { - if (mw) - rpcrdma_defer_mr_recovery(mw); - mw = rpcrdma_get_mw(r_xprt); - if (!mw) + if (mr) + rpcrdma_mr_defer_recovery(mr); + mr = rpcrdma_mr_get(r_xprt); + if (!mr) return ERR_PTR(-ENOBUFS); - } while (mw->frmr.fr_state != FRMR_IS_INVALID); - frmr = &mw->frmr; - frmr->fr_state = FRMR_IS_VALID; - mr = frmr->fr_mr; - reg_wr = &frmr->fr_regwr; - - if (nsegs > ia->ri_max_frmr_depth) - nsegs = ia->ri_max_frmr_depth; + } while (mr->frwr.fr_state != FRWR_IS_INVALID); + frwr = &mr->frwr; + frwr->fr_state = FRWR_IS_VALID; + + if (nsegs > ia->ri_max_frwr_depth) + nsegs = ia->ri_max_frwr_depth; for (i = 0; i < nsegs;) { if (seg->mr_page) - sg_set_page(&mw->mw_sg[i], + sg_set_page(&mr->mr_sg[i], seg->mr_page, seg->mr_len, offset_in_page(seg->mr_offset)); else - sg_set_buf(&mw->mw_sg[i], seg->mr_offset, + sg_set_buf(&mr->mr_sg[i], seg->mr_offset, seg->mr_len); ++seg; @@ -392,30 +392,29 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) break; } - mw->mw_dir = rpcrdma_data_dir(writing); + mr->mr_dir = rpcrdma_data_dir(writing); - mw->mw_nents = ib_dma_map_sg(ia->ri_device, mw->mw_sg, i, mw->mw_dir); - if (!mw->mw_nents) + mr->mr_nents = ib_dma_map_sg(ia->ri_device, mr->mr_sg, i, mr->mr_dir); + if (!mr->mr_nents) goto out_dmamap_err; - n = ib_map_mr_sg(mr, mw->mw_sg, mw->mw_nents, NULL, PAGE_SIZE); - if (unlikely(n != mw->mw_nents)) + ibmr = frwr->fr_mr; + n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE); + if (unlikely(n != mr->mr_nents)) goto out_mapmr_err; - dprintk("RPC: %s: Using frmr %p to map %u segments (%llu bytes)\n", - __func__, frmr, mw->mw_nents, mr->length); - - key = (u8)(mr->rkey & 0x000000FF); - ib_update_fast_reg_key(mr, ++key); + key = (u8)(ibmr->rkey & 0x000000FF); + ib_update_fast_reg_key(ibmr, ++key); + reg_wr = &frwr->fr_regwr; reg_wr->wr.next = NULL; reg_wr->wr.opcode = IB_WR_REG_MR; - frmr->fr_cqe.done = frwr_wc_fastreg; - reg_wr->wr.wr_cqe = &frmr->fr_cqe; + frwr->fr_cqe.done = frwr_wc_fastreg; + reg_wr->wr.wr_cqe = &frwr->fr_cqe; reg_wr->wr.num_sge = 0; reg_wr->wr.send_flags = 0; - reg_wr->mr = mr; - reg_wr->key = mr->rkey; + reg_wr->mr = ibmr; + reg_wr->key = ibmr->rkey; reg_wr->access = writing ? IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE : IB_ACCESS_REMOTE_READ; @@ -424,47 +423,64 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, if (rc) goto out_senderr; - mw->mw_handle = mr->rkey; - mw->mw_length = mr->length; - mw->mw_offset = mr->iova; + mr->mr_handle = ibmr->rkey; + mr->mr_length = ibmr->length; + mr->mr_offset = ibmr->iova; - *out = mw; + *out = mr; return seg; out_dmamap_err: pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n", - mw->mw_sg, i); - frmr->fr_state = FRMR_IS_INVALID; - rpcrdma_put_mw(r_xprt, mw); + mr->mr_sg, i); + frwr->fr_state = FRWR_IS_INVALID; + rpcrdma_mr_put(mr); return ERR_PTR(-EIO); out_mapmr_err: pr_err("rpcrdma: failed to map mr %p (%d/%d)\n", - frmr->fr_mr, n, mw->mw_nents); - rpcrdma_defer_mr_recovery(mw); + frwr->fr_mr, n, mr->mr_nents); + rpcrdma_mr_defer_recovery(mr); return ERR_PTR(-EIO); out_senderr: - pr_err("rpcrdma: FRMR registration ib_post_send returned %i\n", rc); - rpcrdma_defer_mr_recovery(mw); + pr_err("rpcrdma: FRWR registration ib_post_send returned %i\n", rc); + rpcrdma_mr_defer_recovery(mr); return ERR_PTR(-ENOTCONN); } +/* Handle a remotely invalidated mr on the @mrs list + */ +static void +frwr_op_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) +{ + struct rpcrdma_mr *mr; + + list_for_each_entry(mr, mrs, mr_list) + if (mr->mr_handle == rep->rr_inv_rkey) { + list_del(&mr->mr_list); + trace_xprtrdma_remoteinv(mr); + mr->frwr.fr_state = FRWR_IS_INVALID; + rpcrdma_mr_unmap_and_put(mr); + break; /* only one invalidated MR per RPC */ + } +} + /* Invalidate all memory regions that were registered for "req". * * Sleeps until it is safe for the host CPU to access the * previously mapped memory regions. * - * Caller ensures that @mws is not empty before the call. This + * Caller ensures that @mrs is not empty before the call. This * function empties the list. */ static void -frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws) +frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs) { struct ib_send_wr *first, **prev, *last, *bad_wr; struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_frmr *f; - struct rpcrdma_mw *mw; + struct rpcrdma_frwr *frwr; + struct rpcrdma_mr *mr; int count, rc; /* ORDER: Invalidate all of the MRs first @@ -472,31 +488,27 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws) * Chain the LOCAL_INV Work Requests and post them with * a single ib_post_send() call. */ - f = NULL; + frwr = NULL; count = 0; prev = &first; - list_for_each_entry(mw, mws, mw_list) { - mw->frmr.fr_state = FRMR_IS_INVALID; + list_for_each_entry(mr, mrs, mr_list) { + mr->frwr.fr_state = FRWR_IS_INVALID; - if (mw->mw_flags & RPCRDMA_MW_F_RI) - continue; + frwr = &mr->frwr; + trace_xprtrdma_localinv(mr); - f = &mw->frmr; - dprintk("RPC: %s: invalidating frmr %p\n", - __func__, f); - - f->fr_cqe.done = frwr_wc_localinv; - last = &f->fr_invwr; + frwr->fr_cqe.done = frwr_wc_localinv; + last = &frwr->fr_invwr; memset(last, 0, sizeof(*last)); - last->wr_cqe = &f->fr_cqe; + last->wr_cqe = &frwr->fr_cqe; last->opcode = IB_WR_LOCAL_INV; - last->ex.invalidate_rkey = mw->mw_handle; + last->ex.invalidate_rkey = mr->mr_handle; count++; *prev = last; prev = &last->next; } - if (!f) + if (!frwr) goto unmap; /* Strong send queue ordering guarantees that when the @@ -504,8 +516,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws) * are complete. */ last->send_flags = IB_SEND_SIGNALED; - f->fr_cqe.done = frwr_wc_localinv_wake; - reinit_completion(&f->fr_linv_done); + frwr->fr_cqe.done = frwr_wc_localinv_wake; + reinit_completion(&frwr->fr_linv_done); /* Transport disconnect drains the receive CQ before it * replaces the QP. The RPC reply handler won't call us @@ -515,36 +527,32 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws) bad_wr = NULL; rc = ib_post_send(ia->ri_id->qp, first, &bad_wr); if (bad_wr != first) - wait_for_completion(&f->fr_linv_done); + wait_for_completion(&frwr->fr_linv_done); if (rc) goto reset_mrs; /* ORDER: Now DMA unmap all of the MRs, and return - * them to the free MW list. + * them to the free MR list. */ unmap: - while (!list_empty(mws)) { - mw = rpcrdma_pop_mw(mws); - dprintk("RPC: %s: DMA unmapping frmr %p\n", - __func__, &mw->frmr); - ib_dma_unmap_sg(ia->ri_device, - mw->mw_sg, mw->mw_nents, mw->mw_dir); - rpcrdma_put_mw(r_xprt, mw); + while (!list_empty(mrs)) { + mr = rpcrdma_mr_pop(mrs); + rpcrdma_mr_unmap_and_put(mr); } return; reset_mrs: - pr_err("rpcrdma: FRMR invalidate ib_post_send returned %i\n", rc); + pr_err("rpcrdma: FRWR invalidate ib_post_send returned %i\n", rc); /* Find and reset the MRs in the LOCAL_INV WRs that did not * get posted. */ while (bad_wr) { - f = container_of(bad_wr, struct rpcrdma_frmr, - fr_invwr); - mw = container_of(f, struct rpcrdma_mw, frmr); + frwr = container_of(bad_wr, struct rpcrdma_frwr, + fr_invwr); + mr = container_of(frwr, struct rpcrdma_mr, frwr); - __frwr_reset_mr(ia, mw); + __frwr_mr_reset(ia, mr); bad_wr = bad_wr->next; } @@ -553,6 +561,7 @@ reset_mrs: const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { .ro_map = frwr_op_map, + .ro_reminv = frwr_op_reminv, .ro_unmap_sync = frwr_op_unmap_sync, .ro_recover_mr = frwr_op_recover_mr, .ro_open = frwr_op_open, diff --git a/net/sunrpc/xprtrdma/module.c b/net/sunrpc/xprtrdma/module.c index 560712bd9fa2..a762d192372b 100644 --- a/net/sunrpc/xprtrdma/module.c +++ b/net/sunrpc/xprtrdma/module.c @@ -1,18 +1,20 @@ /* - * Copyright (c) 2015 Oracle. All rights reserved. + * Copyright (c) 2015, 2017 Oracle. All rights reserved. */ /* rpcrdma.ko module initialization */ +#include <linux/types.h> +#include <linux/compiler.h> #include <linux/module.h> #include <linux/init.h> #include <linux/sunrpc/svc_rdma.h> -#include "xprt_rdma.h" -#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) -# define RPCDBG_FACILITY RPCDBG_TRANS -#endif +#include <asm/swab.h> + +#define CREATE_TRACE_POINTS +#include "xprt_rdma.h" MODULE_AUTHOR("Open Grid Computing and Network Appliance, Inc."); MODULE_DESCRIPTION("RPC/RDMA Transport"); diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index a3f2ab283aeb..162e5dd82466 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -292,15 +292,15 @@ encode_item_not_present(struct xdr_stream *xdr) } static void -xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw) +xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr) { - *iptr++ = cpu_to_be32(mw->mw_handle); - *iptr++ = cpu_to_be32(mw->mw_length); - xdr_encode_hyper(iptr, mw->mw_offset); + *iptr++ = cpu_to_be32(mr->mr_handle); + *iptr++ = cpu_to_be32(mr->mr_length); + xdr_encode_hyper(iptr, mr->mr_offset); } static int -encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw) +encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr) { __be32 *p; @@ -308,12 +308,12 @@ encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw) if (unlikely(!p)) return -EMSGSIZE; - xdr_encode_rdma_segment(p, mw); + xdr_encode_rdma_segment(p, mr); return 0; } static int -encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw, +encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr, u32 position) { __be32 *p; @@ -324,7 +324,7 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw, *p++ = xdr_one; /* Item present */ *p++ = cpu_to_be32(position); - xdr_encode_rdma_segment(p, mw); + xdr_encode_rdma_segment(p, mr); return 0; } @@ -348,7 +348,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, { struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; - struct rpcrdma_mw *mw; + struct rpcrdma_mr *mr; unsigned int pos; int nsegs; @@ -363,21 +363,17 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, do { seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, - false, &mw); + false, &mr); if (IS_ERR(seg)) return PTR_ERR(seg); - rpcrdma_push_mw(mw, &req->rl_registered); + rpcrdma_mr_push(mr, &req->rl_registered); - if (encode_read_segment(xdr, mw, pos) < 0) + if (encode_read_segment(xdr, mr, pos) < 0) return -EMSGSIZE; - dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n", - rqst->rq_task->tk_pid, __func__, pos, - mw->mw_length, (unsigned long long)mw->mw_offset, - mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last"); - + trace_xprtrdma_read_chunk(rqst->rq_task, pos, mr, nsegs); r_xprt->rx_stats.read_chunk_count++; - nsegs -= mw->mw_nents; + nsegs -= mr->mr_nents; } while (nsegs); return 0; @@ -404,7 +400,7 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, { struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; - struct rpcrdma_mw *mw; + struct rpcrdma_mr *mr; int nsegs, nchunks; __be32 *segcount; @@ -425,23 +421,19 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nchunks = 0; do { seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, - true, &mw); + true, &mr); if (IS_ERR(seg)) return PTR_ERR(seg); - rpcrdma_push_mw(mw, &req->rl_registered); + rpcrdma_mr_push(mr, &req->rl_registered); - if (encode_rdma_segment(xdr, mw) < 0) + if (encode_rdma_segment(xdr, mr) < 0) return -EMSGSIZE; - dprintk("RPC: %5u %s: %u@0x016%llx:0x%08x (%s)\n", - rqst->rq_task->tk_pid, __func__, - mw->mw_length, (unsigned long long)mw->mw_offset, - mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last"); - + trace_xprtrdma_write_chunk(rqst->rq_task, mr, nsegs); r_xprt->rx_stats.write_chunk_count++; - r_xprt->rx_stats.total_rdma_request += seg->mr_len; + r_xprt->rx_stats.total_rdma_request += mr->mr_length; nchunks++; - nsegs -= mw->mw_nents; + nsegs -= mr->mr_nents; } while (nsegs); /* Update count of segments in this Write chunk */ @@ -468,7 +460,7 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, { struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; - struct rpcrdma_mw *mw; + struct rpcrdma_mr *mr; int nsegs, nchunks; __be32 *segcount; @@ -487,23 +479,19 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nchunks = 0; do { seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, - true, &mw); + true, &mr); if (IS_ERR(seg)) return PTR_ERR(seg); - rpcrdma_push_mw(mw, &req->rl_registered); + rpcrdma_mr_push(mr, &req->rl_registered); - if (encode_rdma_segment(xdr, mw) < 0) + if (encode_rdma_segment(xdr, mr) < 0) return -EMSGSIZE; - dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n", - rqst->rq_task->tk_pid, __func__, - mw->mw_length, (unsigned long long)mw->mw_offset, - mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last"); - + trace_xprtrdma_reply_chunk(rqst->rq_task, mr, nsegs); r_xprt->rx_stats.reply_chunk_count++; - r_xprt->rx_stats.total_rdma_request += seg->mr_len; + r_xprt->rx_stats.total_rdma_request += mr->mr_length; nchunks++; - nsegs -= mw->mw_nents; + nsegs -= mr->mr_nents; } while (nsegs); /* Update count of segments in the Reply chunk */ @@ -524,9 +512,6 @@ rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc) struct ib_sge *sge; unsigned int count; - dprintk("RPC: %s: unmapping %u sges for sc=%p\n", - __func__, sc->sc_unmap_count, sc); - /* The first two SGEs contain the transport header and * the inline buffer. These are always left mapped so * they can be cheaply re-used. @@ -754,11 +739,6 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) __be32 *p; int ret; -#if defined(CONFIG_SUNRPC_BACKCHANNEL) - if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)) - return rpcrdma_bc_marshal_reply(rqst); -#endif - rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0); xdr_init_encode(xdr, &req->rl_hdrbuf, req->rl_rdmabuf->rg_base); @@ -821,6 +801,17 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) rtype = rpcrdma_areadch; } + /* If this is a retransmit, discard previously registered + * chunks. Very likely the connection has been replaced, + * so these registrations are invalid and unusable. + */ + while (unlikely(!list_empty(&req->rl_registered))) { + struct rpcrdma_mr *mr; + + mr = rpcrdma_mr_pop(&req->rl_registered); + rpcrdma_mr_defer_recovery(mr); + } + /* This implementation supports the following combinations * of chunk lists in one RPC-over-RDMA Call message: * @@ -868,10 +859,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) if (ret) goto out_err; - dprintk("RPC: %5u %s: %s/%s: hdrlen %u rpclen\n", - rqst->rq_task->tk_pid, __func__, - transfertypes[rtype], transfertypes[wtype], - xdr_stream_pos(xdr)); + trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype); ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr), &rqst->rq_snd_buf, rtype); @@ -926,8 +914,7 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) curlen = rqst->rq_rcv_buf.head[0].iov_len; if (curlen > copy_len) curlen = copy_len; - dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n", - __func__, srcp, copy_len, curlen); + trace_xprtrdma_fixup(rqst, copy_len, curlen); srcp += curlen; copy_len -= curlen; @@ -947,9 +934,8 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) if (curlen > pagelist_len) curlen = pagelist_len; - dprintk("RPC: %s: page %d" - " srcp 0x%p len %d curlen %d\n", - __func__, i, srcp, copy_len, curlen); + trace_xprtrdma_fixup_pg(rqst, i, srcp, + copy_len, curlen); destp = kmap_atomic(ppages[i]); memcpy(destp + page_base, srcp, curlen); flush_dcache_page(ppages[i]); @@ -984,24 +970,6 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) return fixup_copy_count; } -/* Caller must guarantee @rep remains stable during this call. - */ -static void -rpcrdma_mark_remote_invalidation(struct list_head *mws, - struct rpcrdma_rep *rep) -{ - struct rpcrdma_mw *mw; - - if (!(rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)) - return; - - list_for_each_entry(mw, mws, mw_list) - if (mw->mw_handle == rep->rr_inv_rkey) { - mw->mw_flags = RPCRDMA_MW_F_RI; - break; /* only one invalidated MR per RPC */ - } -} - /* By convention, backchannel calls arrive via rdma_msg type * messages, and never populate the chunk lists. This makes * the RPC/RDMA header small and fixed in size, so it is @@ -1058,26 +1026,19 @@ out_short: static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length) { + u32 handle; + u64 offset; __be32 *p; p = xdr_inline_decode(xdr, 4 * sizeof(*p)); if (unlikely(!p)) return -EIO; - ifdebug(FACILITY) { - u64 offset; - u32 handle; - - handle = be32_to_cpup(p++); - *length = be32_to_cpup(p++); - xdr_decode_hyper(p, &offset); - dprintk("RPC: %s: segment %u@0x%016llx:0x%08x\n", - __func__, *length, (unsigned long long)offset, - handle); - } else { - *length = be32_to_cpup(p + 1); - } + handle = be32_to_cpup(p++); + *length = be32_to_cpup(p++); + xdr_decode_hyper(p, &offset); + trace_xprtrdma_decode_seg(handle, *length, offset); return 0; } @@ -1098,8 +1059,6 @@ static int decode_write_chunk(struct xdr_stream *xdr, u32 *length) *length += seglength; } - dprintk("RPC: %s: segcount=%u, %u bytes\n", - __func__, be32_to_cpup(p), *length); return 0; } @@ -1296,8 +1255,7 @@ out: * being marshaled. */ out_badheader: - dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n", - rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc)); + trace_xprtrdma_reply_hdr(rep); r_xprt->rx_stats.bad_reply_count++; status = -EIO; goto out; @@ -1339,9 +1297,12 @@ void rpcrdma_deferred_completion(struct work_struct *work) struct rpcrdma_rep *rep = container_of(work, struct rpcrdma_rep, rr_work); struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst); + struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; - rpcrdma_mark_remote_invalidation(&req->rl_registered, rep); - rpcrdma_release_rqst(rep->rr_rxprt, req); + trace_xprtrdma_defer_cmp(rep); + if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) + r_xprt->rx_ia.ri_ops->ro_reminv(rep, &req->rl_registered); + rpcrdma_release_rqst(r_xprt, req); rpcrdma_complete_rqst(rep); } @@ -1360,8 +1321,6 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) u32 credits; __be32 *p; - dprintk("RPC: %s: incoming rep %p\n", __func__, rep); - if (rep->rr_hdrbuf.head[0].iov_len == 0) goto out_badstatus; @@ -1405,8 +1364,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) rep->rr_rqst = rqst; clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags); - dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n", - __func__, rep, req, be32_to_cpu(rep->rr_xid)); + trace_xprtrdma_reply(rqst->rq_task, rep, req, credits); queue_work_on(req->rl_cpu, rpcrdma_receive_wq, &rep->rr_work); return; @@ -1420,8 +1378,7 @@ out_badstatus: return; out_badversion: - dprintk("RPC: %s: invalid version %d\n", - __func__, be32_to_cpu(rep->rr_vers)); + trace_xprtrdma_reply_vers(rep); goto repost; /* The RPC transaction has already been terminated, or the header @@ -1429,12 +1386,11 @@ out_badversion: */ out_norqst: spin_unlock(&xprt->recv_lock); - dprintk("RPC: %s: no match for incoming xid 0x%08x\n", - __func__, be32_to_cpu(rep->rr_xid)); + trace_xprtrdma_reply_rqst(rep); goto repost; out_shortreply: - dprintk("RPC: %s: short/invalid reply\n", __func__); + trace_xprtrdma_reply_short(rep); /* If no pending RPC transaction was matched, post a replacement * receive buffer before returning. diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 6ee1ad8978f3..4b1ecfe979cf 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -67,8 +67,7 @@ static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE; unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE; static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE; -static unsigned int xprt_rdma_inline_write_padding; -unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR; +unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRWR; int xprt_rdma_pad_optimize; #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) @@ -81,6 +80,7 @@ static unsigned int zero; static unsigned int max_padding = PAGE_SIZE; static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS; static unsigned int max_memreg = RPCRDMA_LAST - 1; +static unsigned int dummy; static struct ctl_table_header *sunrpc_table_header; @@ -114,7 +114,7 @@ static struct ctl_table xr_tunables_table[] = { }, { .procname = "rdma_inline_write_padding", - .data = &xprt_rdma_inline_write_padding, + .data = &dummy, .maxlen = sizeof(unsigned int), .mode = 0644, .proc_handler = proc_dointvec_minmax, @@ -259,13 +259,10 @@ xprt_rdma_connect_worker(struct work_struct *work) xprt_clear_connected(xprt); - dprintk("RPC: %s: %sconnect\n", __func__, - r_xprt->rx_ep.rep_connected != 0 ? "re" : ""); rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia); if (rc) xprt_wake_pending_tasks(xprt, rc); - dprintk("RPC: %s: exit\n", __func__); xprt_clear_connecting(xprt); } @@ -275,7 +272,7 @@ xprt_rdma_inject_disconnect(struct rpc_xprt *xprt) struct rpcrdma_xprt *r_xprt = container_of(xprt, struct rpcrdma_xprt, rx_xprt); - pr_info("rpcrdma: injecting transport disconnect on xprt=%p\n", xprt); + trace_xprtrdma_inject_dsc(r_xprt); rdma_disconnect(r_xprt->rx_ia.ri_id); } @@ -295,7 +292,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt) { struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); - dprintk("RPC: %s: called\n", __func__); + trace_xprtrdma_destroy(r_xprt); cancel_delayed_work_sync(&r_xprt->rx_connect_worker); @@ -306,11 +303,8 @@ xprt_rdma_destroy(struct rpc_xprt *xprt) rpcrdma_ia_close(&r_xprt->rx_ia); xprt_rdma_free_addresses(xprt); - xprt_free(xprt); - dprintk("RPC: %s: returning\n", __func__); - module_put(THIS_MODULE); } @@ -361,9 +355,7 @@ xprt_setup_rdma(struct xprt_create *args) /* * Set up RDMA-specific connect data. */ - - sap = (struct sockaddr *)&cdata.addr; - memcpy(sap, args->dstaddr, args->addrlen); + sap = args->dstaddr; /* Ensure xprt->addr holds valid server TCP (not RDMA) * address, for any side protocols which peek at it */ @@ -373,6 +365,7 @@ xprt_setup_rdma(struct xprt_create *args) if (rpc_get_port(sap)) xprt_set_bound(xprt); + xprt_rdma_format_addresses(xprt, sap); cdata.max_requests = xprt->max_reqs; @@ -387,8 +380,6 @@ xprt_setup_rdma(struct xprt_create *args) if (cdata.inline_rsize > cdata.rsize) cdata.inline_rsize = cdata.rsize; - cdata.padding = xprt_rdma_inline_write_padding; - /* * Create new transport instance, which includes initialized * o ia @@ -398,7 +389,7 @@ xprt_setup_rdma(struct xprt_create *args) new_xprt = rpcx_to_rdmax(xprt); - rc = rpcrdma_ia_open(new_xprt, sap); + rc = rpcrdma_ia_open(new_xprt); if (rc) goto out1; @@ -407,31 +398,19 @@ xprt_setup_rdma(struct xprt_create *args) */ new_xprt->rx_data = cdata; new_ep = &new_xprt->rx_ep; - new_ep->rep_remote_addr = cdata.addr; rc = rpcrdma_ep_create(&new_xprt->rx_ep, &new_xprt->rx_ia, &new_xprt->rx_data); if (rc) goto out2; - /* - * Allocate pre-registered send and receive buffers for headers and - * any inline data. Also specify any padding which will be provided - * from a preregistered zero buffer. - */ rc = rpcrdma_buffer_create(new_xprt); if (rc) goto out3; - /* - * Register a callback for connection events. This is necessary because - * connection loss notification is async. We also catch connection loss - * when reaping receives. - */ INIT_DELAYED_WORK(&new_xprt->rx_connect_worker, xprt_rdma_connect_worker); - xprt_rdma_format_addresses(xprt, sap); xprt->max_payload = new_xprt->rx_ia.ri_ops->ro_maxpages(new_xprt); if (xprt->max_payload == 0) goto out4; @@ -445,16 +424,19 @@ xprt_setup_rdma(struct xprt_create *args) dprintk("RPC: %s: %s:%s\n", __func__, xprt->address_strings[RPC_DISPLAY_ADDR], xprt->address_strings[RPC_DISPLAY_PORT]); + trace_xprtrdma_create(new_xprt); return xprt; out4: - xprt_rdma_free_addresses(xprt); - rc = -EINVAL; + rpcrdma_buffer_destroy(&new_xprt->rx_buf); + rc = -ENODEV; out3: rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia); out2: rpcrdma_ia_close(&new_xprt->rx_ia); out1: + trace_xprtrdma_destroy(new_xprt); + xprt_rdma_free_addresses(xprt); xprt_free(xprt); return ERR_PTR(rc); } @@ -488,16 +470,34 @@ xprt_rdma_close(struct rpc_xprt *xprt) rpcrdma_ep_disconnect(ep, ia); } +/** + * xprt_rdma_set_port - update server port with rpcbind result + * @xprt: controlling RPC transport + * @port: new port value + * + * Transport connect status is unchanged. + */ static void xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port) { - struct sockaddr_in *sap; + struct sockaddr *sap = (struct sockaddr *)&xprt->addr; + char buf[8]; - sap = (struct sockaddr_in *)&xprt->addr; - sap->sin_port = htons(port); - sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr; - sap->sin_port = htons(port); - dprintk("RPC: %s: %u\n", __func__, port); + dprintk("RPC: %s: setting port for xprt %p (%s:%s) to %u\n", + __func__, xprt, + xprt->address_strings[RPC_DISPLAY_ADDR], + xprt->address_strings[RPC_DISPLAY_PORT], + port); + + rpc_set_port(sap, port); + + kfree(xprt->address_strings[RPC_DISPLAY_PORT]); + snprintf(buf, sizeof(buf), "%u", port); + xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL); + + kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]); + snprintf(buf, sizeof(buf), "%4hx", port); + xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL); } /** @@ -516,8 +516,6 @@ xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port) static void xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task) { - dprintk("RPC: %5u %s: xprt = %p\n", task->tk_pid, __func__, xprt); - xprt_force_disconnect(xprt); } @@ -640,7 +638,7 @@ xprt_rdma_allocate(struct rpc_task *task) req = rpcrdma_buffer_get(&r_xprt->rx_buf); if (req == NULL) - return -ENOMEM; + goto out_get; flags = RPCRDMA_DEF_GFP; if (RPC_IS_SWAPPER(task)) @@ -653,19 +651,18 @@ xprt_rdma_allocate(struct rpc_task *task) if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags)) goto out_fail; - dprintk("RPC: %5u %s: send size = %zd, recv size = %zd, req = %p\n", - task->tk_pid, __func__, rqst->rq_callsize, - rqst->rq_rcvsize, req); - req->rl_cpu = smp_processor_id(); req->rl_connect_cookie = 0; /* our reserved value */ rpcrdma_set_xprtdata(rqst, req); rqst->rq_buffer = req->rl_sendbuf->rg_base; rqst->rq_rbuffer = req->rl_recvbuf->rg_base; + trace_xprtrdma_allocate(task, req); return 0; out_fail: rpcrdma_buffer_put(req); +out_get: + trace_xprtrdma_allocate(task, NULL); return -ENOMEM; } @@ -682,13 +679,9 @@ xprt_rdma_free(struct rpc_task *task) struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - if (test_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags)) - return; - - dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); - if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags)) rpcrdma_release_rqst(r_xprt, req); + trace_xprtrdma_rpc_done(task, req); rpcrdma_buffer_put(req); } @@ -698,22 +691,12 @@ xprt_rdma_free(struct rpc_task *task) * * Caller holds the transport's write lock. * - * Return values: - * 0: The request has been sent - * ENOTCONN: Caller needs to invoke connect logic then call again - * ENOBUFS: Call again later to send the request - * EIO: A permanent error occurred. The request was not sent, - * and don't try it again - * - * send_request invokes the meat of RPC RDMA. It must do the following: - * - * 1. Marshal the RPC request into an RPC RDMA request, which means - * putting a header in front of data, and creating IOVs for RDMA - * from those in the request. - * 2. In marshaling, detect opportunities for RDMA, and use them. - * 3. Post a recv message to set up asynch completion, then send - * the request (rpcrdma_ep_post). - * 4. No partial sends are possible in the RPC-RDMA protocol (as in UDP). + * Returns: + * %0 if the RPC message has been sent + * %-ENOTCONN if the caller should reconnect and call again + * %-ENOBUFS if the caller should call again later + * %-EIO if a permanent error occurred and the request was not + * sent. Do not try to send this message again. */ static int xprt_rdma_send_request(struct rpc_task *task) @@ -724,14 +707,14 @@ xprt_rdma_send_request(struct rpc_task *task) struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); int rc = 0; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + if (unlikely(!rqst->rq_buffer)) + return xprt_rdma_bc_send_reply(rqst); +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ + if (!xprt_connected(xprt)) goto drop_connection; - /* On retransmit, remove any previously registered chunks */ - if (unlikely(!list_empty(&req->rl_registered))) - r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, - &req->rl_registered); - rc = rpcrdma_marshal_req(r_xprt, rqst); if (rc < 0) goto failed_marshal; @@ -744,7 +727,7 @@ xprt_rdma_send_request(struct rpc_task *task) goto drop_connection; req->rl_connect_cookie = xprt->connect_cookie; - set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags); + __set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags); if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) goto drop_connection; @@ -904,8 +887,7 @@ int xprt_rdma_init(void) "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n", xprt_rdma_slot_table_entries, xprt_rdma_max_inline_read, xprt_rdma_max_inline_write); - dprintk("\tPadding %d\n\tMemreg %d\n", - xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy); + dprintk("\tPadding 0\n\tMemreg %d\n", xprt_rdma_memreg_strategy); #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) if (!sunrpc_table_header) diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 8607c029c0dd..f4eb63e8e689 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -71,8 +71,8 @@ /* * internal functions */ -static void rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt); -static void rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf); +static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt); +static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf); static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb); struct workqueue_struct *rpcrdma_receive_wq __read_mostly; @@ -108,7 +108,10 @@ static void rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context) { struct rpcrdma_ep *ep = context; + struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt, + rx_ep); + trace_xprtrdma_qp_error(r_xprt, event); pr_err("rpcrdma: %s on device %s ep %p\n", ib_event_msg(event->event), event->device->name, context); @@ -133,6 +136,7 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) container_of(cqe, struct rpcrdma_sendctx, sc_cqe); /* WARNING: Only wr_cqe and status are reliable at this point */ + trace_xprtrdma_wc_send(sc, wc); if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR) pr_err("rpcrdma: Send: %s (%u/0x%x)\n", ib_wc_status_msg(wc->status), @@ -155,13 +159,11 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) rr_cqe); /* WARNING: Only wr_id and status are reliable at this point */ + trace_xprtrdma_wc_receive(rep, wc); if (wc->status != IB_WC_SUCCESS) goto out_fail; /* status == SUCCESS means all fields in wc are trustworthy */ - dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n", - __func__, rep, wc->byte_len); - rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len); rep->rr_wc_flags = wc->wc_flags; rep->rr_inv_rkey = wc->ex.invalidate_rkey; @@ -192,7 +194,6 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt, unsigned int rsize, wsize; /* Default settings for RPC-over-RDMA Version One */ - r_xprt->rx_ia.ri_reminv_expected = false; r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize; rsize = RPCRDMA_V1_DEF_INLINE_SIZE; wsize = RPCRDMA_V1_DEF_INLINE_SIZE; @@ -200,7 +201,6 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt, if (pmsg && pmsg->cp_magic == rpcrdma_cmp_magic && pmsg->cp_version == RPCRDMA_CMP_VERSION) { - r_xprt->rx_ia.ri_reminv_expected = true; r_xprt->rx_ia.ri_implicit_roundup = true; rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size); wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size); @@ -221,11 +221,9 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) struct rpcrdma_xprt *xprt = id->context; struct rpcrdma_ia *ia = &xprt->rx_ia; struct rpcrdma_ep *ep = &xprt->rx_ep; -#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) - struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr; -#endif int connstate = 0; + trace_xprtrdma_conn_upcall(xprt, event); switch (event->event) { case RDMA_CM_EVENT_ADDR_RESOLVED: case RDMA_CM_EVENT_ROUTE_RESOLVED: @@ -234,21 +232,17 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) break; case RDMA_CM_EVENT_ADDR_ERROR: ia->ri_async_rc = -EHOSTUNREACH; - dprintk("RPC: %s: CM address resolution error, ep 0x%p\n", - __func__, ep); complete(&ia->ri_done); break; case RDMA_CM_EVENT_ROUTE_ERROR: ia->ri_async_rc = -ENETUNREACH; - dprintk("RPC: %s: CM route resolution error, ep 0x%p\n", - __func__, ep); complete(&ia->ri_done); break; case RDMA_CM_EVENT_DEVICE_REMOVAL: #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) - pr_info("rpcrdma: removing device %s for %pIS:%u\n", + pr_info("rpcrdma: removing device %s for %s:%s\n", ia->ri_device->name, - sap, rpc_get_port(sap)); + rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt)); #endif set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags); ep->rep_connected = -ENODEV; @@ -271,8 +265,8 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) connstate = -ENETDOWN; goto connected; case RDMA_CM_EVENT_REJECTED: - dprintk("rpcrdma: connection to %pIS:%u rejected: %s\n", - sap, rpc_get_port(sap), + dprintk("rpcrdma: connection to %s:%s rejected: %s\n", + rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt), rdma_reject_msg(id, event->status)); connstate = -ECONNREFUSED; if (event->status == IB_CM_REJ_STALE_CONN) @@ -287,8 +281,9 @@ connected: wake_up_all(&ep->rep_connect_wait); /*FALLTHROUGH*/ default: - dprintk("RPC: %s: %pIS:%u on %s/%s (ep 0x%p): %s\n", - __func__, sap, rpc_get_port(sap), + dprintk("RPC: %s: %s:%s on %s/%s (ep 0x%p): %s\n", + __func__, + rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt), ia->ri_device->name, ia->ri_ops->ro_displayname, ep, rdma_event_msg(event->event)); break; @@ -298,13 +293,14 @@ connected: } static struct rdma_cm_id * -rpcrdma_create_id(struct rpcrdma_xprt *xprt, - struct rpcrdma_ia *ia, struct sockaddr *addr) +rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia) { unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1; struct rdma_cm_id *id; int rc; + trace_xprtrdma_conn_start(xprt); + init_completion(&ia->ri_done); init_completion(&ia->ri_remove_done); @@ -318,7 +314,9 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, } ia->ri_async_rc = -ETIMEDOUT; - rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT); + rc = rdma_resolve_addr(id, NULL, + (struct sockaddr *)&xprt->rx_xprt.addr, + RDMA_RESOLVE_TIMEOUT); if (rc) { dprintk("RPC: %s: rdma_resolve_addr() failed %i\n", __func__, rc); @@ -326,8 +324,7 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, } rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); if (rc < 0) { - dprintk("RPC: %s: wait() exited: %i\n", - __func__, rc); + trace_xprtrdma_conn_tout(xprt); goto out; } @@ -344,8 +341,7 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, } rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); if (rc < 0) { - dprintk("RPC: %s: wait() exited: %i\n", - __func__, rc); + trace_xprtrdma_conn_tout(xprt); goto out; } rc = ia->ri_async_rc; @@ -365,19 +361,18 @@ out: /** * rpcrdma_ia_open - Open and initialize an Interface Adapter. - * @xprt: controlling transport - * @addr: IP address of remote peer + * @xprt: transport with IA to (re)initialize * * Returns 0 on success, negative errno if an appropriate * Interface Adapter could not be found and opened. */ int -rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr) +rpcrdma_ia_open(struct rpcrdma_xprt *xprt) { struct rpcrdma_ia *ia = &xprt->rx_ia; int rc; - ia->ri_id = rpcrdma_create_id(xprt, ia, addr); + ia->ri_id = rpcrdma_create_id(xprt, ia); if (IS_ERR(ia->ri_id)) { rc = PTR_ERR(ia->ri_id); goto out_err; @@ -392,7 +387,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr) } switch (xprt_rdma_memreg_strategy) { - case RPCRDMA_FRMR: + case RPCRDMA_FRWR: if (frwr_is_supported(ia)) { ia->ri_ops = &rpcrdma_frwr_memreg_ops; break; @@ -462,10 +457,12 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) rpcrdma_dma_unmap_regbuf(req->rl_sendbuf); rpcrdma_dma_unmap_regbuf(req->rl_recvbuf); } - rpcrdma_destroy_mrs(buf); + rpcrdma_mrs_destroy(buf); /* Allow waiters to continue */ complete(&ia->ri_remove_done); + + trace_xprtrdma_remove(r_xprt); } /** @@ -476,7 +473,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) void rpcrdma_ia_close(struct rpcrdma_ia *ia) { - dprintk("RPC: %s: entering\n", __func__); if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) { if (ia->ri_id->qp) rdma_destroy_qp(ia->ri_id); @@ -630,9 +626,6 @@ out1: void rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) { - dprintk("RPC: %s: entering, connected is %d\n", - __func__, ep->rep_connected); - cancel_delayed_work_sync(&ep->rep_connect_worker); if (ia->ri_id->qp) { @@ -653,13 +646,12 @@ static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) { - struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr; int rc, err; - pr_info("%s: r_xprt = %p\n", __func__, r_xprt); + trace_xprtrdma_reinsert(r_xprt); rc = -EHOSTUNREACH; - if (rpcrdma_ia_open(r_xprt, sap)) + if (rpcrdma_ia_open(r_xprt)) goto out1; rc = -ENOMEM; @@ -676,7 +668,7 @@ rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, goto out3; } - rpcrdma_create_mrs(r_xprt); + rpcrdma_mrs_create(r_xprt); return 0; out3: @@ -691,16 +683,15 @@ static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) { - struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr; struct rdma_cm_id *id, *old; int err, rc; - dprintk("RPC: %s: reconnecting...\n", __func__); + trace_xprtrdma_reconnect(r_xprt); rpcrdma_ep_disconnect(ep, ia); rc = -EHOSTUNREACH; - id = rpcrdma_create_id(r_xprt, ia, sap); + id = rpcrdma_create_id(r_xprt, ia); if (IS_ERR(id)) goto out; @@ -817,16 +808,14 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) int rc; rc = rdma_disconnect(ia->ri_id); - if (!rc) { + if (!rc) /* returns without wait if not connected */ wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 1); - dprintk("RPC: %s: after wait, %sconnected\n", __func__, - (ep->rep_connected == 1) ? "still " : "dis"); - } else { - dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); + else ep->rep_connected = rc; - } + trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt, + rx_ep), rc); ib_drain_qp(ia->ri_id->qp); } @@ -998,15 +987,15 @@ rpcrdma_mr_recovery_worker(struct work_struct *work) { struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, rb_recovery_worker.work); - struct rpcrdma_mw *mw; + struct rpcrdma_mr *mr; spin_lock(&buf->rb_recovery_lock); while (!list_empty(&buf->rb_stale_mrs)) { - mw = rpcrdma_pop_mw(&buf->rb_stale_mrs); + mr = rpcrdma_mr_pop(&buf->rb_stale_mrs); spin_unlock(&buf->rb_recovery_lock); - dprintk("RPC: %s: recovering MR %p\n", __func__, mw); - mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw); + trace_xprtrdma_recover_mr(mr); + mr->mr_xprt->rx_ia.ri_ops->ro_recover_mr(mr); spin_lock(&buf->rb_recovery_lock); } @@ -1014,20 +1003,20 @@ rpcrdma_mr_recovery_worker(struct work_struct *work) } void -rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw) +rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr) { - struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + struct rpcrdma_xprt *r_xprt = mr->mr_xprt; struct rpcrdma_buffer *buf = &r_xprt->rx_buf; spin_lock(&buf->rb_recovery_lock); - rpcrdma_push_mw(mw, &buf->rb_stale_mrs); + rpcrdma_mr_push(mr, &buf->rb_stale_mrs); spin_unlock(&buf->rb_recovery_lock); schedule_delayed_work(&buf->rb_recovery_worker, 0); } static void -rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt) +rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ia *ia = &r_xprt->rx_ia; @@ -1036,32 +1025,32 @@ rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt) LIST_HEAD(all); for (count = 0; count < 32; count++) { - struct rpcrdma_mw *mw; + struct rpcrdma_mr *mr; int rc; - mw = kzalloc(sizeof(*mw), GFP_KERNEL); - if (!mw) + mr = kzalloc(sizeof(*mr), GFP_KERNEL); + if (!mr) break; - rc = ia->ri_ops->ro_init_mr(ia, mw); + rc = ia->ri_ops->ro_init_mr(ia, mr); if (rc) { - kfree(mw); + kfree(mr); break; } - mw->mw_xprt = r_xprt; + mr->mr_xprt = r_xprt; - list_add(&mw->mw_list, &free); - list_add(&mw->mw_all, &all); + list_add(&mr->mr_list, &free); + list_add(&mr->mr_all, &all); } - spin_lock(&buf->rb_mwlock); - list_splice(&free, &buf->rb_mws); + spin_lock(&buf->rb_mrlock); + list_splice(&free, &buf->rb_mrs); list_splice(&all, &buf->rb_all); r_xprt->rx_stats.mrs_allocated += count; - spin_unlock(&buf->rb_mwlock); + spin_unlock(&buf->rb_mrlock); - dprintk("RPC: %s: created %u MRs\n", __func__, count); + trace_xprtrdma_createmrs(r_xprt, count); } static void @@ -1072,7 +1061,7 @@ rpcrdma_mr_refresh_worker(struct work_struct *work) struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf); - rpcrdma_create_mrs(r_xprt); + rpcrdma_mrs_create(r_xprt); } struct rpcrdma_req * @@ -1093,10 +1082,17 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) return req; } -struct rpcrdma_rep * +/** + * rpcrdma_create_rep - Allocate an rpcrdma_rep object + * @r_xprt: controlling transport + * + * Returns 0 on success or a negative errno on failure. + */ +int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_rep *rep; int rc; @@ -1121,12 +1117,18 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) rep->rr_recv_wr.wr_cqe = &rep->rr_cqe; rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; rep->rr_recv_wr.num_sge = 1; - return rep; + + spin_lock(&buf->rb_lock); + list_add(&rep->rr_list, &buf->rb_recv_bufs); + spin_unlock(&buf->rb_lock); + return 0; out_free: kfree(rep); out: - return ERR_PTR(rc); + dprintk("RPC: %s: reply buffer %d alloc failed\n", + __func__, rc); + return rc; } int @@ -1137,10 +1139,10 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) buf->rb_max_requests = r_xprt->rx_data.max_requests; buf->rb_bc_srv_max_requests = 0; - spin_lock_init(&buf->rb_mwlock); + spin_lock_init(&buf->rb_mrlock); spin_lock_init(&buf->rb_lock); spin_lock_init(&buf->rb_recovery_lock); - INIT_LIST_HEAD(&buf->rb_mws); + INIT_LIST_HEAD(&buf->rb_mrs); INIT_LIST_HEAD(&buf->rb_all); INIT_LIST_HEAD(&buf->rb_stale_mrs); INIT_DELAYED_WORK(&buf->rb_refresh_worker, @@ -1148,7 +1150,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) INIT_DELAYED_WORK(&buf->rb_recovery_worker, rpcrdma_mr_recovery_worker); - rpcrdma_create_mrs(r_xprt); + rpcrdma_mrs_create(r_xprt); INIT_LIST_HEAD(&buf->rb_send_bufs); INIT_LIST_HEAD(&buf->rb_allreqs); @@ -1167,17 +1169,10 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) } INIT_LIST_HEAD(&buf->rb_recv_bufs); - for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) { - struct rpcrdma_rep *rep; - - rep = rpcrdma_create_rep(r_xprt); - if (IS_ERR(rep)) { - dprintk("RPC: %s: reply buffer %d alloc failed\n", - __func__, i); - rc = PTR_ERR(rep); + for (i = 0; i <= buf->rb_max_requests; i++) { + rc = rpcrdma_create_rep(r_xprt); + if (rc) goto out; - } - list_add(&rep->rr_list, &buf->rb_recv_bufs); } rc = rpcrdma_sendctxs_create(r_xprt); @@ -1229,26 +1224,26 @@ rpcrdma_destroy_req(struct rpcrdma_req *req) } static void -rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf) +rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf) { struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf); struct rpcrdma_ia *ia = rdmab_to_ia(buf); - struct rpcrdma_mw *mw; + struct rpcrdma_mr *mr; unsigned int count; count = 0; - spin_lock(&buf->rb_mwlock); + spin_lock(&buf->rb_mrlock); while (!list_empty(&buf->rb_all)) { - mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); - list_del(&mw->mw_all); + mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all); + list_del(&mr->mr_all); - spin_unlock(&buf->rb_mwlock); - ia->ri_ops->ro_release_mr(mw); + spin_unlock(&buf->rb_mrlock); + ia->ri_ops->ro_release_mr(mr); count++; - spin_lock(&buf->rb_mwlock); + spin_lock(&buf->rb_mrlock); } - spin_unlock(&buf->rb_mwlock); + spin_unlock(&buf->rb_mrlock); r_xprt->rx_stats.mrs_allocated = 0; dprintk("RPC: %s: released %u MRs\n", __func__, count); @@ -1285,27 +1280,33 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) spin_unlock(&buf->rb_reqslock); buf->rb_recv_count = 0; - rpcrdma_destroy_mrs(buf); + rpcrdma_mrs_destroy(buf); } -struct rpcrdma_mw * -rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt) +/** + * rpcrdma_mr_get - Allocate an rpcrdma_mr object + * @r_xprt: controlling transport + * + * Returns an initialized rpcrdma_mr or NULL if no free + * rpcrdma_mr objects are available. + */ +struct rpcrdma_mr * +rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - struct rpcrdma_mw *mw = NULL; + struct rpcrdma_mr *mr = NULL; - spin_lock(&buf->rb_mwlock); - if (!list_empty(&buf->rb_mws)) - mw = rpcrdma_pop_mw(&buf->rb_mws); - spin_unlock(&buf->rb_mwlock); + spin_lock(&buf->rb_mrlock); + if (!list_empty(&buf->rb_mrs)) + mr = rpcrdma_mr_pop(&buf->rb_mrs); + spin_unlock(&buf->rb_mrlock); - if (!mw) - goto out_nomws; - mw->mw_flags = 0; - return mw; + if (!mr) + goto out_nomrs; + return mr; -out_nomws: - dprintk("RPC: %s: no MWs available\n", __func__); +out_nomrs: + trace_xprtrdma_nomrs(r_xprt); if (r_xprt->rx_ep.rep_connected != -ENODEV) schedule_delayed_work(&buf->rb_refresh_worker, 0); @@ -1315,14 +1316,39 @@ out_nomws: return NULL; } +static void +__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr) +{ + spin_lock(&buf->rb_mrlock); + rpcrdma_mr_push(mr, &buf->rb_mrs); + spin_unlock(&buf->rb_mrlock); +} + +/** + * rpcrdma_mr_put - Release an rpcrdma_mr object + * @mr: object to release + * + */ void -rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) +rpcrdma_mr_put(struct rpcrdma_mr *mr) { - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + __rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr); +} + +/** + * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it + * @mr: object to release + * + */ +void +rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr) +{ + struct rpcrdma_xprt *r_xprt = mr->mr_xprt; - spin_lock(&buf->rb_mwlock); - rpcrdma_push_mw(mw, &buf->rb_mws); - spin_unlock(&buf->rb_mwlock); + trace_xprtrdma_dma_unmap(mr); + ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, + mr->mr_sg, mr->mr_nents, mr->mr_dir); + __rpcrdma_mr_put(&r_xprt->rx_buf, mr); } static struct rpcrdma_rep * @@ -1359,11 +1385,11 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) req = rpcrdma_buffer_get_req_locked(buffers); req->rl_reply = rpcrdma_buffer_get_rep(buffers); spin_unlock(&buffers->rb_lock); + return req; out_reqbuf: spin_unlock(&buffers->rb_lock); - pr_warn("RPC: %s: out of request buffers\n", __func__); return NULL; } @@ -1519,9 +1545,6 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, req->rl_reply = NULL; } - dprintk("RPC: %s: posting %d s/g entries\n", - __func__, send_wr->num_sge); - if (!ep->rep_send_count || test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { send_wr->send_flags |= IB_SEND_SIGNALED; @@ -1530,14 +1553,12 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, send_wr->send_flags &= ~IB_SEND_SIGNALED; --ep->rep_send_count; } + rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail); + trace_xprtrdma_post_send(req, rc); if (rc) - goto out_postsend_err; + return -ENOTCONN; return 0; - -out_postsend_err: - pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc); - return -ENOTCONN; } int @@ -1550,23 +1571,20 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf)) goto out_map; rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail); + trace_xprtrdma_post_recv(rep, rc); if (rc) - goto out_postrecv; + return -ENOTCONN; return 0; out_map: pr_err("rpcrdma: failed to DMA map the Receive buffer\n"); return -EIO; - -out_postrecv: - pr_err("rpcrdma: ib_post_recv returned %i\n", rc); - return -ENOTCONN; } /** * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests * @r_xprt: transport associated with these backchannel resources - * @min_reqs: minimum number of incoming requests expected + * @count: minimum number of incoming requests expected * * Returns zero if all requested buffers were posted, or a negative errno. */ @@ -1594,7 +1612,7 @@ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count) out_reqbuf: spin_unlock(&buffers->rb_lock); - pr_warn("%s: no extra receive buffers\n", __func__); + trace_xprtrdma_noreps(r_xprt); return -ENOMEM; out_rc: diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 1342f743f1c4..69883a960a3f 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -73,11 +73,10 @@ struct rpcrdma_ia { struct completion ri_remove_done; int ri_async_rc; unsigned int ri_max_segs; - unsigned int ri_max_frmr_depth; + unsigned int ri_max_frwr_depth; unsigned int ri_max_inline_write; unsigned int ri_max_inline_read; unsigned int ri_max_send_sges; - bool ri_reminv_expected; bool ri_implicit_roundup; enum ib_mr_type ri_mrtype; unsigned long ri_flags; @@ -101,7 +100,6 @@ struct rpcrdma_ep { wait_queue_head_t rep_connect_wait; struct rpcrdma_connect_private rep_cm_private; struct rdma_conn_param rep_remote_cma; - struct sockaddr_storage rep_remote_addr; struct delayed_work rep_connect_worker; }; @@ -232,29 +230,29 @@ enum { }; /* - * struct rpcrdma_mw - external memory region metadata + * struct rpcrdma_mr - external memory region metadata * * An external memory region is any buffer or page that is registered * on the fly (ie, not pre-registered). * - * Each rpcrdma_buffer has a list of free MWs anchored in rb_mws. During + * Each rpcrdma_buffer has a list of free MWs anchored in rb_mrs. During * call_allocate, rpcrdma_buffer_get() assigns one to each segment in * an rpcrdma_req. Then rpcrdma_register_external() grabs these to keep * track of registration metadata while each RPC is pending. * rpcrdma_deregister_external() uses this metadata to unmap and * release these resources when an RPC is complete. */ -enum rpcrdma_frmr_state { - FRMR_IS_INVALID, /* ready to be used */ - FRMR_IS_VALID, /* in use */ - FRMR_FLUSHED_FR, /* flushed FASTREG WR */ - FRMR_FLUSHED_LI, /* flushed LOCALINV WR */ +enum rpcrdma_frwr_state { + FRWR_IS_INVALID, /* ready to be used */ + FRWR_IS_VALID, /* in use */ + FRWR_FLUSHED_FR, /* flushed FASTREG WR */ + FRWR_FLUSHED_LI, /* flushed LOCALINV WR */ }; -struct rpcrdma_frmr { +struct rpcrdma_frwr { struct ib_mr *fr_mr; struct ib_cqe fr_cqe; - enum rpcrdma_frmr_state fr_state; + enum rpcrdma_frwr_state fr_state; struct completion fr_linv_done; union { struct ib_reg_wr fr_regwr; @@ -267,26 +265,20 @@ struct rpcrdma_fmr { u64 *fm_physaddrs; }; -struct rpcrdma_mw { - struct list_head mw_list; - struct scatterlist *mw_sg; - int mw_nents; - enum dma_data_direction mw_dir; - unsigned long mw_flags; +struct rpcrdma_mr { + struct list_head mr_list; + struct scatterlist *mr_sg; + int mr_nents; + enum dma_data_direction mr_dir; union { struct rpcrdma_fmr fmr; - struct rpcrdma_frmr frmr; + struct rpcrdma_frwr frwr; }; - struct rpcrdma_xprt *mw_xprt; - u32 mw_handle; - u32 mw_length; - u64 mw_offset; - struct list_head mw_all; -}; - -/* mw_flags */ -enum { - RPCRDMA_MW_F_RI = 1, + struct rpcrdma_xprt *mr_xprt; + u32 mr_handle; + u32 mr_length; + u64 mr_offset; + struct list_head mr_all; }; /* @@ -362,8 +354,7 @@ struct rpcrdma_req { /* rl_flags */ enum { - RPCRDMA_REQ_F_BACKCHANNEL = 0, - RPCRDMA_REQ_F_PENDING, + RPCRDMA_REQ_F_PENDING = 0, RPCRDMA_REQ_F_TX_RESOURCES, }; @@ -374,25 +365,25 @@ rpcrdma_set_xprtdata(struct rpc_rqst *rqst, struct rpcrdma_req *req) } static inline struct rpcrdma_req * -rpcr_to_rdmar(struct rpc_rqst *rqst) +rpcr_to_rdmar(const struct rpc_rqst *rqst) { return rqst->rq_xprtdata; } static inline void -rpcrdma_push_mw(struct rpcrdma_mw *mw, struct list_head *list) +rpcrdma_mr_push(struct rpcrdma_mr *mr, struct list_head *list) { - list_add_tail(&mw->mw_list, list); + list_add_tail(&mr->mr_list, list); } -static inline struct rpcrdma_mw * -rpcrdma_pop_mw(struct list_head *list) +static inline struct rpcrdma_mr * +rpcrdma_mr_pop(struct list_head *list) { - struct rpcrdma_mw *mw; + struct rpcrdma_mr *mr; - mw = list_first_entry(list, struct rpcrdma_mw, mw_list); - list_del(&mw->mw_list); - return mw; + mr = list_first_entry(list, struct rpcrdma_mr, mr_list); + list_del(&mr->mr_list); + return mr; } /* @@ -402,8 +393,8 @@ rpcrdma_pop_mw(struct list_head *list) * One of these is associated with a transport instance */ struct rpcrdma_buffer { - spinlock_t rb_mwlock; /* protect rb_mws list */ - struct list_head rb_mws; + spinlock_t rb_mrlock; /* protect rb_mrs list */ + struct list_head rb_mrs; struct list_head rb_all; unsigned long rb_sc_head; @@ -438,13 +429,11 @@ struct rpcrdma_buffer { * This data should be set with mount options */ struct rpcrdma_create_data_internal { - struct sockaddr_storage addr; /* RDMA server address */ unsigned int max_requests; /* max requests (slots) in flight */ unsigned int rsize; /* mount rsize - max read hdr+data */ unsigned int wsize; /* mount wsize - max write hdr+data */ unsigned int inline_rsize; /* max non-rdma read data payload */ unsigned int inline_wsize; /* max non-rdma write data payload */ - unsigned int padding; /* non-rdma write header padding */ }; /* @@ -484,17 +473,19 @@ struct rpcrdma_memreg_ops { struct rpcrdma_mr_seg * (*ro_map)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *, int, bool, - struct rpcrdma_mw **); + struct rpcrdma_mr **); + void (*ro_reminv)(struct rpcrdma_rep *rep, + struct list_head *mrs); void (*ro_unmap_sync)(struct rpcrdma_xprt *, struct list_head *); - void (*ro_recover_mr)(struct rpcrdma_mw *); + void (*ro_recover_mr)(struct rpcrdma_mr *mr); int (*ro_open)(struct rpcrdma_ia *, struct rpcrdma_ep *, struct rpcrdma_create_data_internal *); size_t (*ro_maxpages)(struct rpcrdma_xprt *); int (*ro_init_mr)(struct rpcrdma_ia *, - struct rpcrdma_mw *); - void (*ro_release_mr)(struct rpcrdma_mw *); + struct rpcrdma_mr *); + void (*ro_release_mr)(struct rpcrdma_mr *mr); const char *ro_displayname; const int ro_send_w_inv_ok; }; @@ -525,6 +516,18 @@ struct rpcrdma_xprt { #define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, rx_xprt) #define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data) +static inline const char * +rpcrdma_addrstr(const struct rpcrdma_xprt *r_xprt) +{ + return r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]; +} + +static inline const char * +rpcrdma_portstr(const struct rpcrdma_xprt *r_xprt) +{ + return r_xprt->rx_xprt.address_strings[RPC_DISPLAY_PORT]; +} + /* Setting this to 0 ensures interoperability with early servers. * Setting this to 1 enhances certain unaligned read/write performance. * Default is 0, see sysctl entry and rpc_rdma.c rpcrdma_convert_iovs() */ @@ -538,7 +541,7 @@ extern unsigned int xprt_rdma_memreg_strategy; /* * Interface Adapter calls - xprtrdma/verbs.c */ -int rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr); +int rpcrdma_ia_open(struct rpcrdma_xprt *xprt); void rpcrdma_ia_remove(struct rpcrdma_ia *ia); void rpcrdma_ia_close(struct rpcrdma_ia *); bool frwr_is_supported(struct rpcrdma_ia *); @@ -564,22 +567,23 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_rep *); * Buffer calls - xprtrdma/verbs.c */ struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *); -struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *); void rpcrdma_destroy_req(struct rpcrdma_req *); +int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt); int rpcrdma_buffer_create(struct rpcrdma_xprt *); void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf); void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc); -struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *); -void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *); +struct rpcrdma_mr *rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt); +void rpcrdma_mr_put(struct rpcrdma_mr *mr); +void rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr); +void rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr); + struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); void rpcrdma_buffer_put(struct rpcrdma_req *); void rpcrdma_recv_buffer_get(struct rpcrdma_req *); void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); -void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *); - struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(size_t, enum dma_data_direction, gfp_t); bool __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *); @@ -663,7 +667,7 @@ int xprt_rdma_bc_up(struct svc_serv *, struct net *); size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); -int rpcrdma_bc_marshal_reply(struct rpc_rqst *); +int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst); void xprt_rdma_bc_free_rqst(struct rpc_rqst *); void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int); #endif /* CONFIG_SUNRPC_BACKCHANNEL */ @@ -671,3 +675,5 @@ void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int); extern struct xprt_class xprt_rdma_bc; #endif /* _LINUX_SUNRPC_XPRT_RDMA_H */ + +#include <trace/events/rpcrdma.h> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 6d0cc3b8f932..18803021f242 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -52,6 +52,8 @@ #include "sunrpc.h" +#define RPC_TCP_READ_CHUNK_SZ (3*512*1024) + static void xs_close(struct rpc_xprt *xprt); static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, struct socket *sock); @@ -1003,6 +1005,7 @@ static void xs_local_data_receive(struct sock_xprt *transport) struct sock *sk; int err; +restart: mutex_lock(&transport->recv_mutex); sk = transport->inet; if (sk == NULL) @@ -1016,6 +1019,11 @@ static void xs_local_data_receive(struct sock_xprt *transport) } if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) break; + if (need_resched()) { + mutex_unlock(&transport->recv_mutex); + cond_resched(); + goto restart; + } } out: mutex_unlock(&transport->recv_mutex); @@ -1094,6 +1102,7 @@ static void xs_udp_data_receive(struct sock_xprt *transport) struct sock *sk; int err; +restart: mutex_lock(&transport->recv_mutex); sk = transport->inet; if (sk == NULL) @@ -1107,6 +1116,11 @@ static void xs_udp_data_receive(struct sock_xprt *transport) } if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) break; + if (need_resched()) { + mutex_unlock(&transport->recv_mutex); + cond_resched(); + goto restart; + } } out: mutex_unlock(&transport->recv_mutex); @@ -1479,6 +1493,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns .offset = offset, .count = len, }; + size_t ret; dprintk("RPC: xs_tcp_data_recv started\n"); do { @@ -1507,9 +1522,14 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns /* Skip over any trailing bytes on short reads */ xs_tcp_read_discard(transport, &desc); } while (desc.count); + ret = len - desc.count; + if (ret < rd_desc->count) + rd_desc->count -= ret; + else + rd_desc->count = 0; trace_xs_tcp_data_recv(transport); dprintk("RPC: xs_tcp_data_recv done\n"); - return len - desc.count; + return ret; } static void xs_tcp_data_receive(struct sock_xprt *transport) @@ -1517,30 +1537,34 @@ static void xs_tcp_data_receive(struct sock_xprt *transport) struct rpc_xprt *xprt = &transport->xprt; struct sock *sk; read_descriptor_t rd_desc = { - .count = 2*1024*1024, .arg.data = xprt, }; unsigned long total = 0; - int loop; int read = 0; +restart: mutex_lock(&transport->recv_mutex); sk = transport->inet; if (sk == NULL) goto out; /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ - for (loop = 0; loop < 64; loop++) { + for (;;) { + rd_desc.count = RPC_TCP_READ_CHUNK_SZ; lock_sock(sk); read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); - if (read <= 0) { + if (rd_desc.count != 0 || read < 0) { clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); release_sock(sk); break; } release_sock(sk); total += read; - rd_desc.count = 65536; + if (need_resched()) { + mutex_unlock(&transport->recv_mutex); + cond_resched(); + goto restart; + } } if (test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) queue_work(xprtiod_workqueue, &transport->recv_worker); |