summaryrefslogtreecommitdiff
path: root/net/rxrpc
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2022-10-10 15:51:39 +0100
committerDavid Howells <dhowells@redhat.com>2022-12-01 13:36:41 +0000
commit15f661dc95daec9b38e8e4cc931c95afe0ae0cef (patch)
treeaa5687f899ffe2c55438704c386ac5cdaed4370a /net/rxrpc
parent81f2e8adc0fd10847637873dafe8610f3fb4cdff (diff)
rxrpc: Implement a mechanism to send an event notification to a call
Provide a means by which an event notification can be sent to a call such that the I/O thread can process it rather than it being done in a separate workqueue. This will allow a lot of locking to be removed. Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org
Diffstat (limited to 'net/rxrpc')
-rw-r--r--net/rxrpc/ar-internal.h5
-rw-r--r--net/rxrpc/call_object.c24
-rw-r--r--net/rxrpc/input.c3
-rw-r--r--net/rxrpc/io_thread.c20
-rw-r--r--net/rxrpc/local_object.c1
5 files changed, 48 insertions, 5 deletions
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 654e9dab107c..a80655fa9dfb 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -292,6 +292,7 @@ struct rxrpc_local {
struct sk_buff_head reject_queue; /* packets awaiting rejection */
struct sk_buff_head event_queue; /* endpoint event packets awaiting processing */
struct sk_buff_head rx_queue; /* Received packets */
+ struct list_head call_attend_q; /* Calls requiring immediate attention */
struct rb_root client_bundles; /* Client connection bundles by socket params */
spinlock_t client_bundles_lock; /* Lock for client_bundles */
spinlock_t lock; /* access lock */
@@ -616,6 +617,7 @@ struct rxrpc_call {
struct list_head recvmsg_link; /* Link in rx->recvmsg_q */
struct list_head sock_link; /* Link in rx->sock_calls */
struct rb_node sock_node; /* Node in rx->calls */
+ struct list_head attend_link; /* Link in local->call_attend_q */
struct rxrpc_txbuf *tx_pending; /* Tx buffer being filled */
wait_queue_head_t waitq; /* Wait queue for channel or Tx */
s64 tx_total_len; /* Total length left to be transmitted (or -1) */
@@ -843,6 +845,7 @@ extern const char *const rxrpc_call_states[];
extern const char *const rxrpc_call_completions[];
extern struct kmem_cache *rxrpc_call_jar;
+void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what);
struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *, unsigned long);
struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *, gfp_t, unsigned int);
struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
@@ -951,7 +954,7 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
/*
* input.c
*/
-void rxrpc_input_call_packet(struct rxrpc_call *, struct sk_buff *);
+void rxrpc_input_call_event(struct rxrpc_call *, struct sk_buff *);
void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection *,
struct rxrpc_call *);
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index f6d1b3a6f8c6..997641e3d1c8 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -45,6 +45,29 @@ static struct semaphore rxrpc_call_limiter =
static struct semaphore rxrpc_kernel_call_limiter =
__SEMAPHORE_INITIALIZER(rxrpc_kernel_call_limiter, 1000);
+void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
+{
+ struct rxrpc_local *local;
+ struct rxrpc_peer *peer = call->peer;
+ bool busy;
+
+ if (WARN_ON_ONCE(!peer))
+ return;
+ local = peer->local;
+
+ if (call->state < RXRPC_CALL_COMPLETE) {
+ spin_lock_bh(&local->lock);
+ busy = !list_empty(&call->attend_link);
+ trace_rxrpc_poke_call(call, busy, what);
+ if (!busy) {
+ rxrpc_get_call(call, rxrpc_call_get_poke);
+ list_add_tail(&call->attend_link, &local->call_attend_q);
+ }
+ spin_unlock_bh(&local->lock);
+ rxrpc_wake_up_io_thread(local);
+ }
+}
+
static void rxrpc_call_timer_expired(struct timer_list *t)
{
struct rxrpc_call *call = from_timer(call, t, timer);
@@ -137,6 +160,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
INIT_LIST_HEAD(&call->accept_link);
INIT_LIST_HEAD(&call->recvmsg_link);
INIT_LIST_HEAD(&call->sock_link);
+ INIT_LIST_HEAD(&call->attend_link);
INIT_LIST_HEAD(&call->tx_buffer);
skb_queue_head_init(&call->recvmsg_queue);
skb_queue_head_init(&call->rx_oos_queue);
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 13c52145a926..036f02371051 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -1017,8 +1017,7 @@ static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
/*
* Process an incoming call packet.
*/
-void rxrpc_input_call_packet(struct rxrpc_call *call,
- struct sk_buff *skb)
+void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
unsigned long timo;
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
index 416c6101cf78..cc249bc6b8cd 100644
--- a/net/rxrpc/io_thread.c
+++ b/net/rxrpc/io_thread.c
@@ -366,7 +366,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff *skb)
/* Process a call packet; this either discards or passes on the ref
* elsewhere.
*/
- rxrpc_input_call_packet(call, skb);
+ rxrpc_input_call_event(call, skb);
goto out;
discard:
@@ -413,6 +413,7 @@ int rxrpc_io_thread(void *data)
{
struct sk_buff_head rx_queue;
struct rxrpc_local *local = data;
+ struct rxrpc_call *call;
struct sk_buff *skb;
skb_queue_head_init(&rx_queue);
@@ -422,6 +423,20 @@ int rxrpc_io_thread(void *data)
for (;;) {
rxrpc_inc_stat(local->rxnet, stat_io_loop);
+ /* Deal with calls that want immediate attention. */
+ if ((call = list_first_entry_or_null(&local->call_attend_q,
+ struct rxrpc_call,
+ attend_link))) {
+ spin_lock_bh(&local->lock);
+ list_del_init(&call->attend_link);
+ spin_unlock_bh(&local->lock);
+
+ trace_rxrpc_call_poked(call);
+ rxrpc_input_call_event(call, NULL);
+ rxrpc_put_call(call, rxrpc_call_put_poke);
+ continue;
+ }
+
/* Process received packets and errors. */
if ((skb = __skb_dequeue(&rx_queue))) {
switch (skb->mark) {
@@ -450,7 +465,8 @@ int rxrpc_io_thread(void *data)
}
set_current_state(TASK_INTERRUPTIBLE);
- if (!skb_queue_empty(&local->rx_queue)) {
+ if (!skb_queue_empty(&local->rx_queue) ||
+ !list_empty(&local->call_attend_q)) {
__set_current_state(TASK_RUNNING);
continue;
}
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 6b4d77219f36..03f491cc23ef 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -104,6 +104,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
skb_queue_head_init(&local->reject_queue);
skb_queue_head_init(&local->event_queue);
skb_queue_head_init(&local->rx_queue);
+ INIT_LIST_HEAD(&local->call_attend_q);
local->client_bundles = RB_ROOT;
spin_lock_init(&local->client_bundles_lock);
spin_lock_init(&local->lock);