summaryrefslogtreecommitdiff
path: root/net/tipc/link.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2014-06-25 20:41:32 -0500
committerDavid S. Miller <davem@davemloft.net>2014-06-27 12:50:54 -0700
commit4f1688b2c63cd86f0d7bcf95a9b3040e38bd3c1a (patch)
tree757c43e51ba7260d1745a8d5b2e3bdfb6b0a005a /net/tipc/link.c
parente4de5fab806f74622600ab7fd6ed22b7f911a8c5 (diff)
tipc: introduce send functions for chained buffers in link
The current link implementation provides several different transmit functions, depending on the characteristics of the message to be sent: if it is an iovec or an sk_buff, if it needs fragmentation or not, if the caller holds the node_lock or not. The permutation of these options gives us an unwanted amount of unnecessarily complex code. As a first step towards simplifying the send path for all messages, we introduce two new send functions at link level, tipc_link_xmit2() and __tipc_link_xmit2(). The former looks up a link to the message destination, and if one is found, it grabs the node lock and calls the second function, which works exclusively inside the node lock protection. If no link is found, and the destination is on the same node, it delivers the message directly to the local destination socket. The new functions take a buffer chain where all packet headers are already prepared, and the correct MTU has been used. These two functions will later replace all other link-level transmit functions. The functions are not backwards compatible, so we have added them as new functions with temporary names. They are tested, but have no users yet. Those will be added later in this series. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Reviewed-by: Erik Hugne <erik.hugne@ericsson.com> Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c140
1 files changed, 139 insertions, 1 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ad2c57f5868d..68d2afb44f2f 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -850,6 +850,144 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
return res;
}
+/* tipc_link_cong: determine return value and how to treat the
+ * sent buffer during link congestion.
+ * - For plain, errorless user data messages we keep the buffer and
+ * return -ELINKONG.
+ * - For all other messages we discard the buffer and return -EHOSTUNREACH
+ * - For TIPC internal messages we also reset the link
+ */
+static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
+{
+ struct tipc_msg *msg = buf_msg(buf);
+ uint psz = msg_size(msg);
+ uint imp = tipc_msg_tot_importance(msg);
+ u32 oport = msg_tot_origport(msg);
+
+ if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
+ if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
+ link_schedule_port(link, oport, psz);
+ return -ELINKCONG;
+ }
+ } else {
+ pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
+ tipc_link_reset(link);
+ }
+ kfree_skb_list(buf);
+ return -EHOSTUNREACH;
+}
+
+/**
+ * __tipc_link_xmit2(): same as tipc_link_xmit2, but destlink is known & locked
+ * @link: link to use
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
+ * user data messages) or -EHOSTUNREACH (all other messages/senders)
+ * Only the socket functions tipc_send_stream() and tipc_send_packet() need
+ * to act on the return value, since they may need to do more send attempts.
+ */
+int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf)
+{
+ struct tipc_msg *msg = buf_msg(buf);
+ uint psz = msg_size(msg);
+ uint qsz = link->out_queue_size;
+ uint sndlim = link->queue_limit[0];
+ uint imp = tipc_msg_tot_importance(msg);
+ uint mtu = link->max_pkt;
+ uint ack = mod(link->next_in_no - 1);
+ uint seqno = link->next_out_no;
+ uint bc_last_in = link->owner->bclink.last_in;
+ struct tipc_media_addr *addr = &link->media_addr;
+ struct sk_buff *next = buf->next;
+
+ /* Match queue limits against msg importance: */
+ if (unlikely(qsz >= link->queue_limit[imp]))
+ return tipc_link_cong(link, buf);
+
+ /* Has valid packet limit been used ? */
+ if (unlikely(psz > mtu)) {
+ kfree_skb_list(buf);
+ return -EMSGSIZE;
+ }
+
+ /* Prepare each packet for sending, and add to outqueue: */
+ while (buf) {
+ next = buf->next;
+ msg = buf_msg(buf);
+ msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+ msg_set_bcast_ack(msg, bc_last_in);
+
+ if (!link->first_out) {
+ link->first_out = buf;
+ } else if (qsz < sndlim) {
+ link->last_out->next = buf;
+ } else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
+ link->stats.sent_bundled++;
+ buf = next;
+ next = buf->next;
+ continue;
+ } else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
+ link->stats.sent_bundled++;
+ link->stats.sent_bundles++;
+ link->last_out->next = buf;
+ if (!link->next_out)
+ link->next_out = buf;
+ } else {
+ link->last_out->next = buf;
+ if (!link->next_out)
+ link->next_out = buf;
+ }
+
+ /* Send packet if possible: */
+ if (likely(++qsz <= sndlim)) {
+ tipc_bearer_send(link->bearer_id, buf, addr);
+ link->next_out = next;
+ link->unacked_window = 0;
+ }
+ seqno++;
+ link->last_out = buf;
+ buf = next;
+ }
+ link->next_out_no = seqno;
+ link->out_queue_size = qsz;
+ return 0;
+}
+
+/**
+ * tipc_link_xmit2() is the general link level function for message sending
+ * @buf: chain of buffers containing message
+ * @dsz: amount of user data to be sent
+ * @dnode: address of destination node
+ * @selector: a number used for deterministic link selection
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ */
+int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector)
+{
+ struct tipc_link *link = NULL;
+ struct tipc_node *node;
+ int rc = -EHOSTUNREACH;
+
+ node = tipc_node_find(dnode);
+ if (node) {
+ tipc_node_lock(node);
+ link = node->active_links[selector & 1];
+ if (link)
+ rc = __tipc_link_xmit2(link, buf);
+ tipc_node_unlock(node);
+ }
+
+ if (link)
+ return rc;
+
+ if (likely(in_own_node(dnode)))
+ return tipc_sk_rcv(buf);
+
+ kfree_skb_list(buf);
+ return rc;
+}
+
/*
* tipc_link_sync_xmit - synchronize broadcast link endpoints.
*
@@ -1238,7 +1376,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
tipc_bearer_send(l_ptr->bearer_id, buf,
&l_ptr->media_addr);
if (msg_user(msg) == MSG_BUNDLER)
- msg_set_type(msg, CLOSED_MSG);
+ msg_set_type(msg, BUNDLE_CLOSED);
l_ptr->next_out = buf->next;
return 0;
}