diff options
Diffstat (limited to 'net/smc/smc_cdc.c')
-rw-r--r-- | net/smc/smc_cdc.c | 101 |
1 files changed, 68 insertions, 33 deletions
diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c index b42395d24cba..a7e8d63fc8ae 100644 --- a/net/smc/smc_cdc.c +++ b/net/smc/smc_cdc.c @@ -44,13 +44,13 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd, smc = container_of(cdcpend->conn, struct smc_sock, conn); bh_lock_sock(&smc->sk); if (!wc_status) { - diff = smc_curs_diff(cdcpend->conn->sndbuf_size, + diff = smc_curs_diff(cdcpend->conn->sndbuf_desc->len, &cdcpend->conn->tx_curs_fin, &cdcpend->cursor); /* sndbuf_space is decreased in smc_sendmsg */ smp_mb__before_atomic(); atomic_add(diff, &cdcpend->conn->sndbuf_space); - /* guarantee 0 <= sndbuf_space <= sndbuf_size */ + /* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */ smp_mb__after_atomic(); smc_curs_write(&cdcpend->conn->tx_curs_fin, smc_curs_read(&cdcpend->cursor, cdcpend->conn), @@ -82,7 +82,7 @@ static inline void smc_cdc_add_pending_send(struct smc_connection *conn, sizeof(struct smc_cdc_msg) > SMC_WR_BUF_SIZE, "must increase SMC_WR_BUF_SIZE to at least sizeof(struct smc_cdc_msg)"); BUILD_BUG_ON_MSG( - offsetof(struct smc_cdc_msg, reserved) > SMC_WR_TX_SIZE, + sizeof(struct smc_cdc_msg) != SMC_WR_TX_SIZE, "must adapt SMC_WR_TX_SIZE to sizeof(struct smc_cdc_msg); if not all smc_wr upper layer protocols use the same message size any more, must start to set link->wr_tx_sges[i].length on each individual smc_wr_tx_send()"); BUILD_BUG_ON_MSG( sizeof(struct smc_cdc_tx_pend) > SMC_WR_TX_PEND_PRIV_SIZE, @@ -164,20 +164,35 @@ static inline bool smc_cdc_before(u16 seq1, u16 seq2) return (s16)(seq1 - seq2) < 0; } +static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc, + int *diff_prod) +{ + struct smc_connection *conn = &smc->conn; + char *base; + + /* new data included urgent business */ + smc_curs_write(&conn->urg_curs, + smc_curs_read(&conn->local_rx_ctrl.prod, conn), + conn); + conn->urg_state = SMC_URG_VALID; + if (!sock_flag(&smc->sk, SOCK_URGINLINE)) + /* we'll skip the urgent byte, so don't account for it */ + (*diff_prod)--; + base = (char *)conn->rmb_desc->cpu_addr; + if (conn->urg_curs.count) + conn->urg_rx_byte = *(base + conn->urg_curs.count - 1); + else + conn->urg_rx_byte = *(base + conn->rmb_desc->len - 1); + sk_send_sigurg(&smc->sk); +} + static void smc_cdc_msg_recv_action(struct smc_sock *smc, - struct smc_link *link, struct smc_cdc_msg *cdc) { union smc_host_cursor cons_old, prod_old; struct smc_connection *conn = &smc->conn; int diff_cons, diff_prod; - if (!cdc->prod_flags.failover_validation) { - if (smc_cdc_before(ntohs(cdc->seqno), - conn->local_rx_ctrl.seqno)) - /* received seqno is old */ - return; - } smc_curs_write(&prod_old, smc_curs_read(&conn->local_rx_ctrl.prod, conn), conn); @@ -198,18 +213,28 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc, smp_mb__after_atomic(); } - diff_prod = smc_curs_diff(conn->rmbe_size, &prod_old, + diff_prod = smc_curs_diff(conn->rmb_desc->len, &prod_old, &conn->local_rx_ctrl.prod); if (diff_prod) { + if (conn->local_rx_ctrl.prod_flags.urg_data_present) + smc_cdc_handle_urg_data_arrival(smc, &diff_prod); /* bytes_to_rcv is decreased in smc_recvmsg */ smp_mb__before_atomic(); atomic_add(diff_prod, &conn->bytes_to_rcv); - /* guarantee 0 <= bytes_to_rcv <= rmbe_size */ + /* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */ smp_mb__after_atomic(); smc->sk.sk_data_ready(&smc->sk); - } else if ((conn->local_rx_ctrl.prod_flags.write_blocked) || - (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req)) { - smc->sk.sk_data_ready(&smc->sk); + } else { + if (conn->local_rx_ctrl.prod_flags.write_blocked || + conn->local_rx_ctrl.prod_flags.cons_curs_upd_req || + conn->local_rx_ctrl.prod_flags.urg_data_pending) { + if (conn->local_rx_ctrl.prod_flags.urg_data_pending) + conn->urg_state = SMC_URG_NOTYET; + /* force immediate tx of current consumer cursor, but + * under send_lock to guarantee arrival in seqno-order + */ + smc_tx_sndbuf_nonempty(conn); + } } /* piggy backed tx info */ @@ -219,6 +244,12 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc, /* trigger socket release if connection closed */ smc_close_wake_tx_prepared(smc); } + if (diff_cons && conn->urg_tx_pend && + atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) { + /* urg data confirmed by peer, indicate we're ready for more */ + conn->urg_tx_pend = false; + smc->sk.sk_write_space(&smc->sk); + } if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) { smc->sk.sk_err = ECONNRESET; @@ -236,26 +267,11 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc, } /* called under tasklet context */ -static inline void smc_cdc_msg_recv(struct smc_cdc_msg *cdc, - struct smc_link *link, u64 wr_id) +static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc) { - struct smc_link_group *lgr = container_of(link, struct smc_link_group, - lnk[SMC_SINGLE_LINK]); - struct smc_connection *connection; - struct smc_sock *smc; - - /* lookup connection */ - read_lock_bh(&lgr->conns_lock); - connection = smc_lgr_find_conn(ntohl(cdc->token), lgr); - if (!connection) { - read_unlock_bh(&lgr->conns_lock); - return; - } - smc = container_of(connection, struct smc_sock, conn); sock_hold(&smc->sk); - read_unlock_bh(&lgr->conns_lock); bh_lock_sock(&smc->sk); - smc_cdc_msg_recv_action(smc, link, cdc); + smc_cdc_msg_recv_action(smc, cdc); bh_unlock_sock(&smc->sk); sock_put(&smc->sk); /* no free sk in softirq-context */ } @@ -266,12 +282,31 @@ static void smc_cdc_rx_handler(struct ib_wc *wc, void *buf) { struct smc_link *link = (struct smc_link *)wc->qp->qp_context; struct smc_cdc_msg *cdc = buf; + struct smc_connection *conn; + struct smc_link_group *lgr; + struct smc_sock *smc; if (wc->byte_len < offsetof(struct smc_cdc_msg, reserved)) return; /* short message */ if (cdc->len != SMC_WR_TX_SIZE) return; /* invalid message */ - smc_cdc_msg_recv(cdc, link, wc->wr_id); + + /* lookup connection */ + lgr = container_of(link, struct smc_link_group, lnk[SMC_SINGLE_LINK]); + read_lock_bh(&lgr->conns_lock); + conn = smc_lgr_find_conn(ntohl(cdc->token), lgr); + read_unlock_bh(&lgr->conns_lock); + if (!conn) + return; + smc = container_of(conn, struct smc_sock, conn); + + if (!cdc->prod_flags.failover_validation) { + if (smc_cdc_before(ntohs(cdc->seqno), + conn->local_rx_ctrl.seqno)) + /* received seqno is old */ + return; + } + smc_cdc_msg_recv(smc, cdc); } static struct smc_wr_rx_handler smc_cdc_rx_handlers[] = { |