summaryrefslogtreecommitdiff
path: root/net/tipc/link.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c119
1 files changed, 54 insertions, 65 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index fb1485dc6736..6c775a107a02 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -275,7 +275,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
link_init_max_pkt(l_ptr);
l_ptr->next_out_no = 1;
- INIT_LIST_HEAD(&l_ptr->waiting_ports);
+ __skb_queue_head_init(&l_ptr->waiting_sks);
link_reset_statistics(l_ptr);
@@ -322,66 +322,47 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
}
/**
- * link_schedule_port - schedule port for deferred sending
- * @l_ptr: pointer to link
- * @origport: reference to sending port
- * @sz: amount of data to be sent
- *
- * Schedules port for renewed sending of messages after link congestion
- * has abated.
+ * link_schedule_user - schedule user for wakeup after congestion
+ * @link: congested link
+ * @oport: sending port
+ * @chain_sz: size of buffer chain that was attempted sent
+ * @imp: importance of message attempted sent
+ * Create pseudo msg to send back to user when congestion abates
*/
-static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
+static bool link_schedule_user(struct tipc_link *link, u32 oport,
+ uint chain_sz, uint imp)
{
- struct tipc_port *p_ptr;
- struct tipc_sock *tsk;
+ struct sk_buff *buf;
- spin_lock_bh(&tipc_port_list_lock);
- p_ptr = tipc_port_lock(origport);
- if (p_ptr) {
- if (!list_empty(&p_ptr->wait_list))
- goto exit;
- tsk = tipc_port_to_sock(p_ptr);
- tsk->link_cong = 1;
- p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
- list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
- l_ptr->stats.link_congs++;
-exit:
- tipc_port_unlock(p_ptr);
- }
- spin_unlock_bh(&tipc_port_list_lock);
- return -ELINKCONG;
+ buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr,
+ tipc_own_addr, oport, 0, 0);
+ if (!buf)
+ return false;
+ TIPC_SKB_CB(buf)->chain_sz = chain_sz;
+ TIPC_SKB_CB(buf)->chain_imp = imp;
+ __skb_queue_tail(&link->waiting_sks, buf);
+ link->stats.link_congs++;
+ return true;
}
-void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
+/**
+ * link_prepare_wakeup - prepare users for wakeup after congestion
+ * @link: congested link
+ * Move a number of waiting users, as permitted by available space in
+ * the send queue, from link wait queue to node wait queue for wakeup
+ */
+static void link_prepare_wakeup(struct tipc_link *link)
{
- struct tipc_port *p_ptr;
- struct tipc_sock *tsk;
- struct tipc_port *temp_p_ptr;
- int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
-
- if (all)
- win = 100000;
- if (win <= 0)
- return;
- if (!spin_trylock_bh(&tipc_port_list_lock))
- return;
- if (link_congested(l_ptr))
- goto exit;
- list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
- wait_list) {
- if (win <= 0)
+ struct sk_buff_head *wq = &link->waiting_sks;
+ struct sk_buff *buf;
+ uint pend_qsz = link->out_queue_size;
+
+ for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
+ if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
break;
- tsk = tipc_port_to_sock(p_ptr);
- list_del_init(&p_ptr->wait_list);
- spin_lock_bh(p_ptr->lock);
- tsk->link_cong = 0;
- tipc_sock_wakeup(tsk);
- win -= p_ptr->waiting_pkts;
- spin_unlock_bh(p_ptr->lock);
+ pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
+ __skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
}
-
-exit:
- spin_unlock_bh(&tipc_port_list_lock);
}
/**
@@ -423,6 +404,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
u32 prev_state = l_ptr->state;
u32 checkpoint = l_ptr->next_in_no;
int was_active_link = tipc_link_is_active(l_ptr);
+ struct tipc_node *owner = l_ptr->owner;
msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
@@ -450,9 +432,10 @@ void tipc_link_reset(struct tipc_link *l_ptr)
kfree_skb(l_ptr->proto_msg_queue);
l_ptr->proto_msg_queue = NULL;
kfree_skb_list(l_ptr->oldest_deferred_in);
- if (!list_empty(&l_ptr->waiting_ports))
- tipc_link_wakeup_ports(l_ptr, 1);
-
+ if (!skb_queue_empty(&l_ptr->waiting_sks)) {
+ skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
+ owner->action_flags |= TIPC_WAKEUP_USERS;
+ }
l_ptr->retransm_queue_head = 0;
l_ptr->retransm_queue_size = 0;
l_ptr->last_out = NULL;
@@ -688,19 +671,23 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
- uint psz = msg_size(msg);
uint imp = tipc_msg_tot_importance(msg);
u32 oport = msg_tot_origport(msg);
- if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
- if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
- link_schedule_port(link, oport, psz);
- return -ELINKCONG;
- }
- } else {
+ if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
tipc_link_reset(link);
+ goto drop;
}
+ if (unlikely(msg_errcode(msg)))
+ goto drop;
+ if (unlikely(msg_reroute_cnt(msg)))
+ goto drop;
+ if (TIPC_SKB_CB(buf)->wakeup_pending)
+ return -ELINKCONG;
+ if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
+ return -ELINKCONG;
+drop:
kfree_skb_list(buf);
return -EHOSTUNREACH;
}
@@ -1202,8 +1189,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
if (unlikely(l_ptr->next_out))
tipc_link_push_queue(l_ptr);
- if (unlikely(!list_empty(&l_ptr->waiting_ports)))
- tipc_link_wakeup_ports(l_ptr, 0);
+ if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
+ link_prepare_wakeup(l_ptr);
+ l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
+ }
/* Process the incoming packet */
if (unlikely(!link_working_working(l_ptr))) {