diff options
Diffstat (limited to 'net/sunrpc/xprt.c')
-rw-r--r-- | net/sunrpc/xprt.c | 908 |
1 files changed, 647 insertions, 261 deletions
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index a8db2e3f8904..86bea4520c4d 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -68,8 +68,6 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net); static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); static void xprt_connect_status(struct rpc_task *task); -static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); -static void __xprt_put_cong(struct rpc_xprt *, struct rpc_rqst *); static void xprt_destroy(struct rpc_xprt *xprt); static DEFINE_SPINLOCK(xprt_list_lock); @@ -171,6 +169,17 @@ out: } EXPORT_SYMBOL_GPL(xprt_load_transport); +static void xprt_clear_locked(struct rpc_xprt *xprt) +{ + xprt->snd_task = NULL; + if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { + smp_mb__before_atomic(); + clear_bit(XPRT_LOCKED, &xprt->state); + smp_mb__after_atomic(); + } else + queue_work(xprtiod_workqueue, &xprt->task_cleanup); +} + /** * xprt_reserve_xprt - serialize write access to transports * @task: task that is requesting access to the transport @@ -183,44 +192,53 @@ EXPORT_SYMBOL_GPL(xprt_load_transport); int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; - int priority; if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (task == xprt->snd_task) return 1; goto out_sleep; } + if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) + goto out_unlock; xprt->snd_task = task; - if (req != NULL) - req->rq_ntrans++; return 1; +out_unlock: + xprt_clear_locked(xprt); out_sleep: dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); - task->tk_timeout = 0; + task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0; task->tk_status = -EAGAIN; - if (req == NULL) - priority = RPC_PRIORITY_LOW; - else if (!req->rq_ntrans) - priority = RPC_PRIORITY_NORMAL; - else - priority = RPC_PRIORITY_HIGH; - rpc_sleep_on_priority(&xprt->sending, task, NULL, priority); + rpc_sleep_on(&xprt->sending, task, NULL); return 0; } EXPORT_SYMBOL_GPL(xprt_reserve_xprt); -static void xprt_clear_locked(struct rpc_xprt *xprt) +static bool +xprt_need_congestion_window_wait(struct rpc_xprt *xprt) { - xprt->snd_task = NULL; - if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { - smp_mb__before_atomic(); - clear_bit(XPRT_LOCKED, &xprt->state); - smp_mb__after_atomic(); - } else - queue_work(xprtiod_workqueue, &xprt->task_cleanup); + return test_bit(XPRT_CWND_WAIT, &xprt->state); +} + +static void +xprt_set_congestion_window_wait(struct rpc_xprt *xprt) +{ + if (!list_empty(&xprt->xmit_queue)) { + /* Peek at head of queue to see if it can make progress */ + if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst, + rq_xmit)->rq_cong) + return; + } + set_bit(XPRT_CWND_WAIT, &xprt->state); +} + +static void +xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt) +{ + if (!RPCXPRT_CONGESTED(xprt)) + clear_bit(XPRT_CWND_WAIT, &xprt->state); } /* @@ -230,11 +248,11 @@ static void xprt_clear_locked(struct rpc_xprt *xprt) * Same as xprt_reserve_xprt, but Van Jacobson congestion control is * integrated into the decision of whether a request is allowed to be * woken up and given access to the transport. + * Note that the lock is only granted if we know there are free slots. */ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; - int priority; if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (task == xprt->snd_task) @@ -245,25 +263,19 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) xprt->snd_task = task; return 1; } - if (__xprt_get_cong(xprt, task)) { + if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) + goto out_unlock; + if (!xprt_need_congestion_window_wait(xprt)) { xprt->snd_task = task; - req->rq_ntrans++; return 1; } +out_unlock: xprt_clear_locked(xprt); out_sleep: - if (req) - __xprt_put_cong(xprt, req); dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); - task->tk_timeout = 0; + task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0; task->tk_status = -EAGAIN; - if (req == NULL) - priority = RPC_PRIORITY_LOW; - else if (!req->rq_ntrans) - priority = RPC_PRIORITY_NORMAL; - else - priority = RPC_PRIORITY_HIGH; - rpc_sleep_on_priority(&xprt->sending, task, NULL, priority); + rpc_sleep_on(&xprt->sending, task, NULL); return 0; } EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); @@ -272,6 +284,8 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) { int retval; + if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task) + return 1; spin_lock_bh(&xprt->transport_lock); retval = xprt->ops->reserve_xprt(xprt, task); spin_unlock_bh(&xprt->transport_lock); @@ -281,12 +295,8 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) static bool __xprt_lock_write_func(struct rpc_task *task, void *data) { struct rpc_xprt *xprt = data; - struct rpc_rqst *req; - req = task->tk_rqstp; xprt->snd_task = task; - if (req) - req->rq_ntrans++; return true; } @@ -294,53 +304,30 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt) { if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) return; - + if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) + goto out_unlock; if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, __xprt_lock_write_func, xprt)) return; +out_unlock: xprt_clear_locked(xprt); } -static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data) -{ - struct rpc_xprt *xprt = data; - struct rpc_rqst *req; - - req = task->tk_rqstp; - if (req == NULL) { - xprt->snd_task = task; - return true; - } - if (__xprt_get_cong(xprt, task)) { - xprt->snd_task = task; - req->rq_ntrans++; - return true; - } - return false; -} - static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) { if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) return; - if (RPCXPRT_CONGESTED(xprt)) + if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) + goto out_unlock; + if (xprt_need_congestion_window_wait(xprt)) goto out_unlock; if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, - __xprt_lock_write_cong_func, xprt)) + __xprt_lock_write_func, xprt)) return; out_unlock: xprt_clear_locked(xprt); } -static void xprt_task_clear_bytes_sent(struct rpc_task *task) -{ - if (task != NULL) { - struct rpc_rqst *req = task->tk_rqstp; - if (req != NULL) - req->rq_bytes_sent = 0; - } -} - /** * xprt_release_xprt - allow other requests to use a transport * @xprt: transport with other tasks potentially waiting @@ -351,7 +338,6 @@ static void xprt_task_clear_bytes_sent(struct rpc_task *task) void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) { if (xprt->snd_task == task) { - xprt_task_clear_bytes_sent(task); xprt_clear_locked(xprt); __xprt_lock_write_next(xprt); } @@ -369,7 +355,6 @@ EXPORT_SYMBOL_GPL(xprt_release_xprt); void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) { if (xprt->snd_task == task) { - xprt_task_clear_bytes_sent(task); xprt_clear_locked(xprt); __xprt_lock_write_next_cong(xprt); } @@ -378,6 +363,8 @@ EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) { + if (xprt->snd_task != task) + return; spin_lock_bh(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); spin_unlock_bh(&xprt->transport_lock); @@ -388,16 +375,16 @@ static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *ta * overflowed. Put the task to sleep if this is the case. */ static int -__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) +__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) { - struct rpc_rqst *req = task->tk_rqstp; - if (req->rq_cong) return 1; dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", - task->tk_pid, xprt->cong, xprt->cwnd); - if (RPCXPRT_CONGESTED(xprt)) + req->rq_task->tk_pid, xprt->cong, xprt->cwnd); + if (RPCXPRT_CONGESTED(xprt)) { + xprt_set_congestion_window_wait(xprt); return 0; + } req->rq_cong = 1; xprt->cong += RPC_CWNDSCALE; return 1; @@ -414,10 +401,32 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) return; req->rq_cong = 0; xprt->cong -= RPC_CWNDSCALE; + xprt_test_and_clear_congestion_window_wait(xprt); __xprt_lock_write_next_cong(xprt); } /** + * xprt_request_get_cong - Request congestion control credits + * @xprt: pointer to transport + * @req: pointer to RPC request + * + * Useful for transports that require congestion control. + */ +bool +xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) +{ + bool ret = false; + + if (req->rq_cong) + return true; + spin_lock_bh(&xprt->transport_lock); + ret = __xprt_get_cong(xprt, req) != 0; + spin_unlock_bh(&xprt->transport_lock); + return ret; +} +EXPORT_SYMBOL_GPL(xprt_request_get_cong); + +/** * xprt_release_rqst_cong - housekeeping when request is complete * @task: RPC request that recently completed * @@ -431,6 +440,20 @@ void xprt_release_rqst_cong(struct rpc_task *task) } EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); +/* + * Clear the congestion window wait flag and wake up the next + * entry on xprt->sending + */ +static void +xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) +{ + if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { + spin_lock_bh(&xprt->transport_lock); + __xprt_lock_write_next_cong(xprt); + spin_unlock_bh(&xprt->transport_lock); + } +} + /** * xprt_adjust_cwnd - adjust transport congestion window * @xprt: pointer to xprt @@ -488,39 +511,46 @@ EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); /** * xprt_wait_for_buffer_space - wait for transport output buffer to clear - * @task: task to be put to sleep - * @action: function pointer to be executed after wait + * @xprt: transport * * Note that we only set the timer for the case of RPC_IS_SOFT(), since * we don't in general want to force a socket disconnection due to * an incomplete RPC call transmission. */ -void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action) +void xprt_wait_for_buffer_space(struct rpc_xprt *xprt) { - struct rpc_rqst *req = task->tk_rqstp; - struct rpc_xprt *xprt = req->rq_xprt; - - task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0; - rpc_sleep_on(&xprt->pending, task, action); + set_bit(XPRT_WRITE_SPACE, &xprt->state); } EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); +static bool +xprt_clear_write_space_locked(struct rpc_xprt *xprt) +{ + if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) { + __xprt_lock_write_next(xprt); + dprintk("RPC: write space: waking waiting task on " + "xprt %p\n", xprt); + return true; + } + return false; +} + /** * xprt_write_space - wake the task waiting for transport output buffer space * @xprt: transport with waiting tasks * * Can be called in a soft IRQ context, so xprt_write_space never sleeps. */ -void xprt_write_space(struct rpc_xprt *xprt) +bool xprt_write_space(struct rpc_xprt *xprt) { + bool ret; + + if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) + return false; spin_lock_bh(&xprt->transport_lock); - if (xprt->snd_task) { - dprintk("RPC: write space: waking waiting task on " - "xprt %p\n", xprt); - rpc_wake_up_queued_task_on_wq(xprtiod_workqueue, - &xprt->pending, xprt->snd_task); - } + ret = xprt_clear_write_space_locked(xprt); spin_unlock_bh(&xprt->transport_lock); + return ret; } EXPORT_SYMBOL_GPL(xprt_write_space); @@ -631,6 +661,7 @@ void xprt_disconnect_done(struct rpc_xprt *xprt) dprintk("RPC: disconnected transport %p\n", xprt); spin_lock_bh(&xprt->transport_lock); xprt_clear_connected(xprt); + xprt_clear_write_space_locked(xprt); xprt_wake_pending_tasks(xprt, -EAGAIN); spin_unlock_bh(&xprt->transport_lock); } @@ -654,6 +685,22 @@ void xprt_force_disconnect(struct rpc_xprt *xprt) } EXPORT_SYMBOL_GPL(xprt_force_disconnect); +static unsigned int +xprt_connect_cookie(struct rpc_xprt *xprt) +{ + return READ_ONCE(xprt->connect_cookie); +} + +static bool +xprt_request_retransmit_after_disconnect(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + + return req->rq_connect_cookie != xprt_connect_cookie(xprt) || + !xprt_connected(xprt); +} + /** * xprt_conditional_disconnect - force a transport to disconnect * @xprt: transport to disconnect @@ -692,7 +739,7 @@ static void xprt_schedule_autodisconnect(struct rpc_xprt *xprt) __must_hold(&xprt->transport_lock) { - if (list_empty(&xprt->recv) && xprt_has_timer(xprt)) + if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt)) mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); } @@ -702,7 +749,7 @@ xprt_init_autodisconnect(struct timer_list *t) struct rpc_xprt *xprt = from_timer(xprt, t, timer); spin_lock(&xprt->transport_lock); - if (!list_empty(&xprt->recv)) + if (!RB_EMPTY_ROOT(&xprt->recv_queue)) goto out_abort; /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ xprt->last_used = jiffies; @@ -726,7 +773,6 @@ bool xprt_lock_connect(struct rpc_xprt *xprt, goto out; if (xprt->snd_task != task) goto out; - xprt_task_clear_bytes_sent(task); xprt->snd_task = cookie; ret = true; out: @@ -772,7 +818,6 @@ void xprt_connect(struct rpc_task *task) xprt->ops->close(xprt); if (!xprt_connected(xprt)) { - task->tk_rqstp->rq_bytes_sent = 0; task->tk_timeout = task->tk_rqstp->rq_timeout; task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie; rpc_sleep_on(&xprt->pending, task, xprt_connect_status); @@ -789,17 +834,11 @@ void xprt_connect(struct rpc_task *task) static void xprt_connect_status(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; - - if (task->tk_status == 0) { - xprt->stat.connect_count++; - xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; + switch (task->tk_status) { + case 0: dprintk("RPC: %5u xprt_connect_status: connection established\n", task->tk_pid); - return; - } - - switch (task->tk_status) { + break; case -ECONNREFUSED: case -ECONNRESET: case -ECONNABORTED: @@ -816,28 +855,97 @@ static void xprt_connect_status(struct rpc_task *task) default: dprintk("RPC: %5u xprt_connect_status: error %d connecting to " "server %s\n", task->tk_pid, -task->tk_status, - xprt->servername); + task->tk_rqstp->rq_xprt->servername); task->tk_status = -EIO; } } +enum xprt_xid_rb_cmp { + XID_RB_EQUAL, + XID_RB_LEFT, + XID_RB_RIGHT, +}; +static enum xprt_xid_rb_cmp +xprt_xid_cmp(__be32 xid1, __be32 xid2) +{ + if (xid1 == xid2) + return XID_RB_EQUAL; + if ((__force u32)xid1 < (__force u32)xid2) + return XID_RB_LEFT; + return XID_RB_RIGHT; +} + +static struct rpc_rqst * +xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid) +{ + struct rb_node *n = xprt->recv_queue.rb_node; + struct rpc_rqst *req; + + while (n != NULL) { + req = rb_entry(n, struct rpc_rqst, rq_recv); + switch (xprt_xid_cmp(xid, req->rq_xid)) { + case XID_RB_LEFT: + n = n->rb_left; + break; + case XID_RB_RIGHT: + n = n->rb_right; + break; + case XID_RB_EQUAL: + return req; + } + } + return NULL; +} + +static void +xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new) +{ + struct rb_node **p = &xprt->recv_queue.rb_node; + struct rb_node *n = NULL; + struct rpc_rqst *req; + + while (*p != NULL) { + n = *p; + req = rb_entry(n, struct rpc_rqst, rq_recv); + switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) { + case XID_RB_LEFT: + p = &n->rb_left; + break; + case XID_RB_RIGHT: + p = &n->rb_right; + break; + case XID_RB_EQUAL: + WARN_ON_ONCE(new != req); + return; + } + } + rb_link_node(&new->rq_recv, n, p); + rb_insert_color(&new->rq_recv, &xprt->recv_queue); +} + +static void +xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req) +{ + rb_erase(&req->rq_recv, &xprt->recv_queue); +} + /** * xprt_lookup_rqst - find an RPC request corresponding to an XID * @xprt: transport on which the original request was transmitted * @xid: RPC XID of incoming reply * - * Caller holds xprt->recv_lock. + * Caller holds xprt->queue_lock. */ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) { struct rpc_rqst *entry; - list_for_each_entry(entry, &xprt->recv, rq_list) - if (entry->rq_xid == xid) { - trace_xprt_lookup_rqst(xprt, xid, 0); - entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); - return entry; - } + entry = xprt_request_rb_find(xprt, xid); + if (entry != NULL) { + trace_xprt_lookup_rqst(xprt, xid, 0); + entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); + return entry; + } dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", ntohl(xid)); @@ -847,16 +955,22 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) } EXPORT_SYMBOL_GPL(xprt_lookup_rqst); +static bool +xprt_is_pinned_rqst(struct rpc_rqst *req) +{ + return atomic_read(&req->rq_pin) != 0; +} + /** * xprt_pin_rqst - Pin a request on the transport receive list * @req: Request to pin * * Caller must ensure this is atomic with the call to xprt_lookup_rqst() - * so should be holding the xprt transport lock. + * so should be holding the xprt receive lock. */ void xprt_pin_rqst(struct rpc_rqst *req) { - set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate); + atomic_inc(&req->rq_pin); } EXPORT_SYMBOL_GPL(xprt_pin_rqst); @@ -864,38 +978,87 @@ EXPORT_SYMBOL_GPL(xprt_pin_rqst); * xprt_unpin_rqst - Unpin a request on the transport receive list * @req: Request to pin * - * Caller should be holding the xprt transport lock. + * Caller should be holding the xprt receive lock. */ void xprt_unpin_rqst(struct rpc_rqst *req) { - struct rpc_task *task = req->rq_task; - - clear_bit(RPC_TASK_MSG_RECV, &task->tk_runstate); - if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate)) - wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV); + if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) { + atomic_dec(&req->rq_pin); + return; + } + if (atomic_dec_and_test(&req->rq_pin)) + wake_up_var(&req->rq_pin); } EXPORT_SYMBOL_GPL(xprt_unpin_rqst); static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) -__must_hold(&req->rq_xprt->recv_lock) { - struct rpc_task *task = req->rq_task; + wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); +} - if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) { - spin_unlock(&req->rq_xprt->recv_lock); - set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate); - wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV, - TASK_UNINTERRUPTIBLE); - clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate); - spin_lock(&req->rq_xprt->recv_lock); - } +static bool +xprt_request_data_received(struct rpc_task *task) +{ + return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && + READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0; +} + +static bool +xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req) +{ + return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && + READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0; +} + +/** + * xprt_request_enqueue_receive - Add an request to the receive queue + * @task: RPC task + * + */ +void +xprt_request_enqueue_receive(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + + if (!xprt_request_need_enqueue_receive(task, req)) + return; + spin_lock(&xprt->queue_lock); + + /* Update the softirq receive buffer */ + memcpy(&req->rq_private_buf, &req->rq_rcv_buf, + sizeof(req->rq_private_buf)); + + /* Add request to the receive list */ + xprt_request_rb_insert(xprt, req); + set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); + spin_unlock(&xprt->queue_lock); + + xprt_reset_majortimeo(req); + /* Turn off autodisconnect */ + del_singleshot_timer_sync(&xprt->timer); +} + +/** + * xprt_request_dequeue_receive_locked - Remove a request from the receive queue + * @task: RPC task + * + * Caller must hold xprt->queue_lock. + */ +static void +xprt_request_dequeue_receive_locked(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + + if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) + xprt_request_rb_remove(req->rq_xprt, req); } /** * xprt_update_rtt - Update RPC RTT statistics * @task: RPC request that recently completed * - * Caller holds xprt->recv_lock. + * Caller holds xprt->queue_lock. */ void xprt_update_rtt(struct rpc_task *task) { @@ -917,7 +1080,7 @@ EXPORT_SYMBOL_GPL(xprt_update_rtt); * @task: RPC request that recently completed * @copied: actual number of bytes received from the transport * - * Caller holds xprt->recv_lock. + * Caller holds xprt->queue_lock. */ void xprt_complete_rqst(struct rpc_task *task, int copied) { @@ -930,12 +1093,12 @@ void xprt_complete_rqst(struct rpc_task *task, int copied) xprt->stat.recvs++; - list_del_init(&req->rq_list); req->rq_private_buf.len = copied; /* Ensure all writes are done before we update */ /* req->rq_reply_bytes_recvd */ smp_wmb(); req->rq_reply_bytes_recvd = copied; + xprt_request_dequeue_receive_locked(task); rpc_wake_up_queued_task(&xprt->pending, task); } EXPORT_SYMBOL_GPL(xprt_complete_rqst); @@ -957,6 +1120,172 @@ static void xprt_timer(struct rpc_task *task) } /** + * xprt_request_wait_receive - wait for the reply to an RPC request + * @task: RPC task about to send a request + * + */ +void xprt_request_wait_receive(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + + if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) + return; + /* + * Sleep on the pending queue if we're expecting a reply. + * The spinlock ensures atomicity between the test of + * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). + */ + spin_lock(&xprt->queue_lock); + if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { + xprt->ops->set_retrans_timeout(task); + rpc_sleep_on(&xprt->pending, task, xprt_timer); + /* + * Send an extra queue wakeup call if the + * connection was dropped in case the call to + * rpc_sleep_on() raced. + */ + if (xprt_request_retransmit_after_disconnect(task)) + rpc_wake_up_queued_task_set_status(&xprt->pending, + task, -ENOTCONN); + } + spin_unlock(&xprt->queue_lock); +} + +static bool +xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req) +{ + return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); +} + +/** + * xprt_request_enqueue_transmit - queue a task for transmission + * @task: pointer to rpc_task + * + * Add a task to the transmission queue. + */ +void +xprt_request_enqueue_transmit(struct rpc_task *task) +{ + struct rpc_rqst *pos, *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + + if (xprt_request_need_enqueue_transmit(task, req)) { + spin_lock(&xprt->queue_lock); + /* + * Requests that carry congestion control credits are added + * to the head of the list to avoid starvation issues. + */ + if (req->rq_cong) { + xprt_clear_congestion_window_wait(xprt); + list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { + if (pos->rq_cong) + continue; + /* Note: req is added _before_ pos */ + list_add_tail(&req->rq_xmit, &pos->rq_xmit); + INIT_LIST_HEAD(&req->rq_xmit2); + goto out; + } + } else if (RPC_IS_SWAPPER(task)) { + list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { + if (pos->rq_cong || pos->rq_bytes_sent) + continue; + if (RPC_IS_SWAPPER(pos->rq_task)) + continue; + /* Note: req is added _before_ pos */ + list_add_tail(&req->rq_xmit, &pos->rq_xmit); + INIT_LIST_HEAD(&req->rq_xmit2); + goto out; + } + } else { + list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { + if (pos->rq_task->tk_owner != task->tk_owner) + continue; + list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); + INIT_LIST_HEAD(&req->rq_xmit); + goto out; + } + } + list_add_tail(&req->rq_xmit, &xprt->xmit_queue); + INIT_LIST_HEAD(&req->rq_xmit2); +out: + set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); + spin_unlock(&xprt->queue_lock); + } +} + +/** + * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue + * @task: pointer to rpc_task + * + * Remove a task from the transmission queue + * Caller must hold xprt->queue_lock + */ +static void +xprt_request_dequeue_transmit_locked(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + + if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) + return; + if (!list_empty(&req->rq_xmit)) { + list_del(&req->rq_xmit); + if (!list_empty(&req->rq_xmit2)) { + struct rpc_rqst *next = list_first_entry(&req->rq_xmit2, + struct rpc_rqst, rq_xmit2); + list_del(&req->rq_xmit2); + list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue); + } + } else + list_del(&req->rq_xmit2); +} + +/** + * xprt_request_dequeue_transmit - remove a task from the transmission queue + * @task: pointer to rpc_task + * + * Remove a task from the transmission queue + */ +static void +xprt_request_dequeue_transmit(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + + spin_lock(&xprt->queue_lock); + xprt_request_dequeue_transmit_locked(task); + spin_unlock(&xprt->queue_lock); +} + +/** + * xprt_request_prepare - prepare an encoded request for transport + * @req: pointer to rpc_rqst + * + * Calls into the transport layer to do whatever is needed to prepare + * the request for transmission or receive. + */ +void +xprt_request_prepare(struct rpc_rqst *req) +{ + struct rpc_xprt *xprt = req->rq_xprt; + + if (xprt->ops->prepare_request) + xprt->ops->prepare_request(req); +} + +/** + * xprt_request_need_retransmit - Test if a task needs retransmission + * @task: pointer to rpc_task + * + * Test for whether a connection breakage requires the task to retransmit + */ +bool +xprt_request_need_retransmit(struct rpc_task *task) +{ + return xprt_request_retransmit_after_disconnect(task); +} + +/** * xprt_prepare_transmit - reserve the transport before sending a request * @task: RPC task about to send a request * @@ -965,32 +1294,18 @@ bool xprt_prepare_transmit(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; - bool ret = false; dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); - spin_lock_bh(&xprt->transport_lock); - if (!req->rq_bytes_sent) { - if (req->rq_reply_bytes_recvd) { - task->tk_status = req->rq_reply_bytes_recvd; - goto out_unlock; - } - if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) - && xprt_connected(xprt) - && req->rq_connect_cookie == xprt->connect_cookie) { - xprt->ops->set_retrans_timeout(task); - rpc_sleep_on(&xprt->pending, task, xprt_timer); - goto out_unlock; - } - } - if (!xprt->ops->reserve_xprt(xprt, task)) { - task->tk_status = -EAGAIN; - goto out_unlock; + if (!xprt_lock_write(xprt, task)) { + /* Race breaker: someone may have transmitted us */ + if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) + rpc_wake_up_queued_task_set_status(&xprt->sending, + task, 0); + return false; + } - ret = true; -out_unlock: - spin_unlock_bh(&xprt->transport_lock); - return ret; + return true; } void xprt_end_transmit(struct rpc_task *task) @@ -999,54 +1314,62 @@ void xprt_end_transmit(struct rpc_task *task) } /** - * xprt_transmit - send an RPC request on a transport - * @task: controlling RPC task + * xprt_request_transmit - send an RPC request on a transport + * @req: pointer to request to transmit + * @snd_task: RPC task that owns the transport lock * - * We have to copy the iovec because sendmsg fiddles with its contents. + * This performs the transmission of a single request. + * Note that if the request is not the same as snd_task, then it + * does need to be pinned. + * Returns '0' on success. */ -void xprt_transmit(struct rpc_task *task) +static int +xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) { - struct rpc_rqst *req = task->tk_rqstp; - struct rpc_xprt *xprt = req->rq_xprt; + struct rpc_xprt *xprt = req->rq_xprt; + struct rpc_task *task = req->rq_task; unsigned int connect_cookie; + int is_retrans = RPC_WAS_SENT(task); int status; dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); - if (!req->rq_reply_bytes_recvd) { - if (list_empty(&req->rq_list) && rpc_reply_expected(task)) { - /* - * Add to the list only if we're expecting a reply - */ - /* Update the softirq receive buffer */ - memcpy(&req->rq_private_buf, &req->rq_rcv_buf, - sizeof(req->rq_private_buf)); - /* Add request to the receive list */ - spin_lock(&xprt->recv_lock); - list_add_tail(&req->rq_list, &xprt->recv); - spin_unlock(&xprt->recv_lock); - xprt_reset_majortimeo(req); - /* Turn off autodisconnect */ - del_singleshot_timer_sync(&xprt->timer); + if (!req->rq_bytes_sent) { + if (xprt_request_data_received(task)) { + status = 0; + goto out_dequeue; } - } else if (!req->rq_bytes_sent) - return; + /* Verify that our message lies in the RPCSEC_GSS window */ + if (rpcauth_xmit_need_reencode(task)) { + status = -EBADMSG; + goto out_dequeue; + } + } + + /* + * Update req->rq_ntrans before transmitting to avoid races with + * xprt_update_rtt(), which needs to know that it is recording a + * reply to the first transmission. + */ + req->rq_ntrans++; connect_cookie = xprt->connect_cookie; - status = xprt->ops->send_request(task); + status = xprt->ops->send_request(req); trace_xprt_transmit(xprt, req->rq_xid, status); if (status != 0) { - task->tk_status = status; - return; + req->rq_ntrans--; + return status; } + + if (is_retrans) + task->tk_client->cl_stats->rpcretrans++; + xprt_inject_disconnect(xprt); dprintk("RPC: %5u xmit complete\n", task->tk_pid); task->tk_flags |= RPC_TASK_SENT; spin_lock_bh(&xprt->transport_lock); - xprt->ops->set_retrans_timeout(task); - xprt->stat.sends++; xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; xprt->stat.bklog_u += xprt->backlog.qlen; @@ -1055,25 +1378,49 @@ void xprt_transmit(struct rpc_task *task) spin_unlock_bh(&xprt->transport_lock); req->rq_connect_cookie = connect_cookie; - if (rpc_reply_expected(task) && !READ_ONCE(req->rq_reply_bytes_recvd)) { - /* - * Sleep on the pending queue if we're expecting a reply. - * The spinlock ensures atomicity between the test of - * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). - */ - spin_lock(&xprt->recv_lock); - if (!req->rq_reply_bytes_recvd) { - rpc_sleep_on(&xprt->pending, task, xprt_timer); - /* - * Send an extra queue wakeup call if the - * connection was dropped in case the call to - * rpc_sleep_on() raced. - */ - if (!xprt_connected(xprt)) - xprt_wake_pending_tasks(xprt, -ENOTCONN); - } - spin_unlock(&xprt->recv_lock); +out_dequeue: + xprt_request_dequeue_transmit(task); + rpc_wake_up_queued_task_set_status(&xprt->sending, task, status); + return status; +} + +/** + * xprt_transmit - send an RPC request on a transport + * @task: controlling RPC task + * + * Attempts to drain the transmit queue. On exit, either the transport + * signalled an error that needs to be handled before transmission can + * resume, or @task finished transmitting, and detected that it already + * received a reply. + */ +void +xprt_transmit(struct rpc_task *task) +{ + struct rpc_rqst *next, *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + int status; + + spin_lock(&xprt->queue_lock); + while (!list_empty(&xprt->xmit_queue)) { + next = list_first_entry(&xprt->xmit_queue, + struct rpc_rqst, rq_xmit); + xprt_pin_rqst(next); + spin_unlock(&xprt->queue_lock); + status = xprt_request_transmit(next, task); + if (status == -EBADMSG && next != req) + status = 0; + cond_resched(); + spin_lock(&xprt->queue_lock); + xprt_unpin_rqst(next); + if (status == 0) { + if (!xprt_request_data_received(task) || + test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) + continue; + } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) + task->tk_status = status; + break; } + spin_unlock(&xprt->queue_lock); } static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) @@ -1170,20 +1517,6 @@ out_init_req: } EXPORT_SYMBOL_GPL(xprt_alloc_slot); -void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) -{ - /* Note: grabbing the xprt_lock_write() ensures that we throttle - * new slot allocation if the transport is congested (i.e. when - * reconnecting a stream transport or when out of socket write - * buffer space). - */ - if (xprt_lock_write(xprt, task)) { - xprt_alloc_slot(xprt, task); - xprt_release_write(xprt, task); - } -} -EXPORT_SYMBOL_GPL(xprt_lock_and_alloc_slot); - void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) { spin_lock(&xprt->reserve_lock); @@ -1250,6 +1583,60 @@ void xprt_free(struct rpc_xprt *xprt) } EXPORT_SYMBOL_GPL(xprt_free); +static void +xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) +{ + req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; +} + +static __be32 +xprt_alloc_xid(struct rpc_xprt *xprt) +{ + __be32 xid; + + spin_lock(&xprt->reserve_lock); + xid = (__force __be32)xprt->xid++; + spin_unlock(&xprt->reserve_lock); + return xid; +} + +static void +xprt_init_xid(struct rpc_xprt *xprt) +{ + xprt->xid = prandom_u32(); +} + +static void +xprt_request_init(struct rpc_task *task) +{ + struct rpc_xprt *xprt = task->tk_xprt; + struct rpc_rqst *req = task->tk_rqstp; + + req->rq_timeout = task->tk_client->cl_timeout->to_initval; + req->rq_task = task; + req->rq_xprt = xprt; + req->rq_buffer = NULL; + req->rq_xid = xprt_alloc_xid(xprt); + xprt_init_connect_cookie(req, xprt); + req->rq_bytes_sent = 0; + req->rq_snd_buf.len = 0; + req->rq_snd_buf.buflen = 0; + req->rq_rcv_buf.len = 0; + req->rq_rcv_buf.buflen = 0; + req->rq_release_snd_buf = NULL; + xprt_reset_majortimeo(req); + dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, + req, ntohl(req->rq_xid)); +} + +static void +xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) +{ + xprt->ops->alloc_slot(xprt, task); + if (task->tk_rqstp != NULL) + xprt_request_init(task); +} + /** * xprt_reserve - allocate an RPC request slot * @task: RPC task requesting a slot allocation @@ -1269,7 +1656,7 @@ void xprt_reserve(struct rpc_task *task) task->tk_timeout = 0; task->tk_status = -EAGAIN; if (!xprt_throttle_congested(xprt, task)) - xprt->ops->alloc_slot(xprt, task); + xprt_do_reserve(xprt, task); } /** @@ -1291,45 +1678,29 @@ void xprt_retry_reserve(struct rpc_task *task) task->tk_timeout = 0; task->tk_status = -EAGAIN; - xprt->ops->alloc_slot(xprt, task); -} - -static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) -{ - __be32 xid; - - spin_lock(&xprt->reserve_lock); - xid = (__force __be32)xprt->xid++; - spin_unlock(&xprt->reserve_lock); - return xid; + xprt_do_reserve(xprt, task); } -static inline void xprt_init_xid(struct rpc_xprt *xprt) -{ - xprt->xid = prandom_u32(); -} - -void xprt_request_init(struct rpc_task *task) +static void +xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req) { - struct rpc_xprt *xprt = task->tk_xprt; - struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; - INIT_LIST_HEAD(&req->rq_list); - req->rq_timeout = task->tk_client->cl_timeout->to_initval; - req->rq_task = task; - req->rq_xprt = xprt; - req->rq_buffer = NULL; - req->rq_xid = xprt_alloc_xid(xprt); - req->rq_connect_cookie = xprt->connect_cookie - 1; - req->rq_bytes_sent = 0; - req->rq_snd_buf.len = 0; - req->rq_snd_buf.buflen = 0; - req->rq_rcv_buf.len = 0; - req->rq_rcv_buf.buflen = 0; - req->rq_release_snd_buf = NULL; - xprt_reset_majortimeo(req); - dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, - req, ntohl(req->rq_xid)); + if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || + test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || + xprt_is_pinned_rqst(req)) { + spin_lock(&xprt->queue_lock); + xprt_request_dequeue_transmit_locked(task); + xprt_request_dequeue_receive_locked(task); + while (xprt_is_pinned_rqst(req)) { + set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); + spin_unlock(&xprt->queue_lock); + xprt_wait_on_pinned_rqst(req); + spin_lock(&xprt->queue_lock); + clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); + } + spin_unlock(&xprt->queue_lock); + } } /** @@ -1345,8 +1716,7 @@ void xprt_release(struct rpc_task *task) if (req == NULL) { if (task->tk_client) { xprt = task->tk_xprt; - if (xprt->snd_task == task) - xprt_release_write(xprt, task); + xprt_release_write(xprt, task); } return; } @@ -1356,12 +1726,7 @@ void xprt_release(struct rpc_task *task) task->tk_ops->rpc_count_stats(task, task->tk_calldata); else if (task->tk_client) rpc_count_iostats(task, task->tk_client->cl_metrics); - spin_lock(&xprt->recv_lock); - if (!list_empty(&req->rq_list)) { - list_del_init(&req->rq_list); - xprt_wait_on_pinned_rqst(req); - } - spin_unlock(&xprt->recv_lock); + xprt_request_dequeue_all(task, req); spin_lock_bh(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); if (xprt->ops->release_request) @@ -1372,6 +1737,7 @@ void xprt_release(struct rpc_task *task) if (req->rq_buffer) xprt->ops->buf_free(task); xprt_inject_disconnect(xprt); + xdr_free_bvec(&req->rq_rcv_buf); if (req->rq_cred != NULL) put_rpccred(req->rq_cred); task->tk_rqstp = NULL; @@ -1385,16 +1751,36 @@ void xprt_release(struct rpc_task *task) xprt_free_bc_request(req); } +#ifdef CONFIG_SUNRPC_BACKCHANNEL +void +xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task) +{ + struct xdr_buf *xbufp = &req->rq_snd_buf; + + task->tk_rqstp = req; + req->rq_task = task; + xprt_init_connect_cookie(req, req->rq_xprt); + /* + * Set up the xdr_buf length. + * This also indicates that the buffer is XDR encoded already. + */ + xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + + xbufp->tail[0].iov_len; + req->rq_bytes_sent = 0; +} +#endif + static void xprt_init(struct rpc_xprt *xprt, struct net *net) { kref_init(&xprt->kref); spin_lock_init(&xprt->transport_lock); spin_lock_init(&xprt->reserve_lock); - spin_lock_init(&xprt->recv_lock); + spin_lock_init(&xprt->queue_lock); INIT_LIST_HEAD(&xprt->free); - INIT_LIST_HEAD(&xprt->recv); + xprt->recv_queue = RB_ROOT; + INIT_LIST_HEAD(&xprt->xmit_queue); #if defined(CONFIG_SUNRPC_BACKCHANNEL) spin_lock_init(&xprt->bc_pa_lock); INIT_LIST_HEAD(&xprt->bc_pa_list); @@ -1407,7 +1793,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net) rpc_init_wait_queue(&xprt->binding, "xprt_binding"); rpc_init_wait_queue(&xprt->pending, "xprt_pending"); - rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending"); + rpc_init_wait_queue(&xprt->sending, "xprt_sending"); rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); xprt_init_xid(xprt); |