From 566050e17e53db283d4e26b73b4b50556f97ce7b Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 12 Nov 2020 12:55:39 +0100 Subject: libceph: separate msgr1 protocol implementation In preparation for msgr2, define internal messenger <-> protocol interface (as opposed to external messenger <-> client interface, which is struct ceph_connection_operations) consisting of try_read(), try_write(), revoke(), revoke_incoming(), opened(), reset_session() and reset_protocol() ops. The semantics are exactly the same as they are now. Signed-off-by: Ilya Dryomov --- include/linux/ceph/messenger.h | 8 +++ net/ceph/messenger.c | 138 ++++++++++++++++++++++++++--------------- 2 files changed, 96 insertions(+), 50 deletions(-) diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 93815f1a42b5..8cc8b08eb3dd 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -382,6 +382,14 @@ int ceph_con_in_msg_alloc(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip); void ceph_con_get_out_msg(struct ceph_connection *con); +int ceph_con_v1_try_read(struct ceph_connection *con); +int ceph_con_v1_try_write(struct ceph_connection *con); +void ceph_con_v1_revoke(struct ceph_connection *con); +void ceph_con_v1_revoke_incoming(struct ceph_connection *con); +bool ceph_con_v1_opened(struct ceph_connection *con); +void ceph_con_v1_reset_session(struct ceph_connection *con); +void ceph_con_v1_reset_protocol(struct ceph_connection *con); + extern const char *ceph_pr_addr(const struct ceph_entity_addr *addr); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 85d20372f923..4ca7d9b594c7 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -593,6 +593,11 @@ int ceph_con_close_socket(struct ceph_connection *con) return rc; } +void ceph_con_v1_reset_protocol(struct ceph_connection *con) +{ + con->out_skip = 0; +} + static void ceph_con_reset_protocol(struct ceph_connection *con) { dout("%s con %p\n", __func__, con); @@ -609,7 +614,7 @@ static void ceph_con_reset_protocol(struct ceph_connection *con) con->out_msg = NULL; } - con->out_skip = 0; + ceph_con_v1_reset_protocol(con); } /* @@ -631,6 +636,12 @@ static void ceph_msg_remove_list(struct list_head *head) } } +void ceph_con_v1_reset_session(struct ceph_connection *con) +{ + con->connect_seq = 0; + con->peer_global_seq = 0; +} + void ceph_con_reset_session(struct ceph_connection *con) { dout("%s con %p\n", __func__, con); @@ -643,8 +654,7 @@ void ceph_con_reset_session(struct ceph_connection *con) con->in_seq = 0; con->in_seq_acked = 0; - con->connect_seq = 0; - con->peer_global_seq = 0; + ceph_con_v1_reset_session(con); } /* @@ -692,12 +702,17 @@ void ceph_con_open(struct ceph_connection *con, } EXPORT_SYMBOL(ceph_con_open); +bool ceph_con_v1_opened(struct ceph_connection *con) +{ + return con->connect_seq; +} + /* * return true if this connection ever successfully opened */ bool ceph_con_opened(struct ceph_connection *con) { - return con->connect_seq > 0; + return ceph_con_v1_opened(con); } /* @@ -2552,7 +2567,7 @@ static int read_keepalive_ack(struct ceph_connection *con) * Write something to the socket. Called in a worker thread when the * socket appears to be writeable and we have something ready to send. */ -static int try_write(struct ceph_connection *con) +int ceph_con_v1_try_write(struct ceph_connection *con) { int ret = 1; @@ -2649,7 +2664,7 @@ out: /* * Read what we can from the socket. */ -static int try_read(struct ceph_connection *con) +int ceph_con_v1_try_read(struct ceph_connection *con) { int ret = -1; @@ -2930,7 +2945,7 @@ static void ceph_con_workfn(struct work_struct *work) BUG_ON(con->sock); } - ret = try_read(con); + ret = ceph_con_v1_try_read(con); if (ret < 0) { if (ret == -EAGAIN) continue; @@ -2940,7 +2955,7 @@ static void ceph_con_workfn(struct work_struct *work) break; } - ret = try_write(con); + ret = ceph_con_v1_try_write(con); if (ret < 0) { if (ret == -EAGAIN) continue; @@ -3116,6 +3131,29 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) } EXPORT_SYMBOL(ceph_con_send); +void ceph_con_v1_revoke(struct ceph_connection *con) +{ + struct ceph_msg *msg = con->out_msg; + + WARN_ON(con->out_skip); + /* footer */ + if (con->out_msg_done) { + con->out_skip += con_out_kvec_skip(con); + } else { + WARN_ON(!msg->data_length); + con->out_skip += sizeof_footer(con); + } + /* data, middle, front */ + if (msg->data_length) + con->out_skip += msg->cursor.total_resid; + if (msg->middle) + con->out_skip += con_out_kvec_skip(con); + con->out_skip += con_out_kvec_skip(con); + + dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con, + con->out_kvec_bytes, con->out_skip); +} + /* * Revoke a message that was previously queued for send */ @@ -3129,39 +3167,50 @@ void ceph_msg_revoke(struct ceph_msg *msg) } mutex_lock(&con->mutex); - if (!list_empty(&msg->list_head)) { - dout("%s %p msg %p - was on queue\n", __func__, con, msg); - list_del_init(&msg->list_head); - msg->hdr.seq = 0; - - ceph_msg_put(msg); + if (list_empty(&msg->list_head)) { + WARN_ON(con->out_msg == msg); + dout("%s con %p msg %p not linked\n", __func__, con, msg); + mutex_unlock(&con->mutex); + return; } - if (con->out_msg == msg) { - BUG_ON(con->out_skip); - /* footer */ - if (con->out_msg_done) { - con->out_skip += con_out_kvec_skip(con); - } else { - BUG_ON(!msg->data_length); - con->out_skip += sizeof_footer(con); - } - /* data, middle, front */ - if (msg->data_length) - con->out_skip += msg->cursor.total_resid; - if (msg->middle) - con->out_skip += con_out_kvec_skip(con); - con->out_skip += con_out_kvec_skip(con); - dout("%s %p msg %p - was sending, will write %d skip %d\n", - __func__, con, msg, con->out_kvec_bytes, con->out_skip); - msg->hdr.seq = 0; + dout("%s con %p msg %p was linked\n", __func__, con, msg); + msg->hdr.seq = 0; + ceph_msg_remove(msg); + + if (con->out_msg == msg) { + WARN_ON(con->state != CEPH_CON_S_OPEN); + dout("%s con %p msg %p was sending\n", __func__, con, msg); + ceph_con_v1_revoke(con); + ceph_msg_put(con->out_msg); con->out_msg = NULL; - ceph_msg_put(msg); + } else { + dout("%s con %p msg %p not current, out_msg %p\n", __func__, + con, msg, con->out_msg); } - mutex_unlock(&con->mutex); } +void ceph_con_v1_revoke_incoming(struct ceph_connection *con) +{ + unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); + unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); + unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); + + /* skip rest of message */ + con->in_base_pos = con->in_base_pos - + sizeof(struct ceph_msg_header) - + front_len - + middle_len - + data_len - + sizeof(struct ceph_msg_footer); + + con->in_tag = CEPH_MSGR_TAG_READY; + con->in_seq++; + + dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos); +} + /* * Revoke a message that we may be reading data into */ @@ -3176,25 +3225,14 @@ void ceph_msg_revoke_incoming(struct ceph_msg *msg) mutex_lock(&con->mutex); if (con->in_msg == msg) { - unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); - unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); - unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); - - /* skip rest of message */ - dout("%s %p msg %p revoked\n", __func__, con, msg); - con->in_base_pos = con->in_base_pos - - sizeof(struct ceph_msg_header) - - front_len - - middle_len - - data_len - - sizeof(struct ceph_msg_footer); + WARN_ON(con->state != CEPH_CON_S_OPEN); + dout("%s con %p msg %p was recving\n", __func__, con, msg); + ceph_con_v1_revoke_incoming(con); ceph_msg_put(con->in_msg); con->in_msg = NULL; - con->in_tag = CEPH_MSGR_TAG_READY; - con->in_seq++; } else { - dout("%s %p in_msg %p msg %p no-op\n", - __func__, con, con->in_msg, msg); + dout("%s con %p msg %p not current, in_msg %p\n", __func__, + con, msg, con->in_msg); } mutex_unlock(&con->mutex); } -- cgit v1.2.3-58-ga151