diff options
author | Ilya Dryomov <idryomov@gmail.com> | 2020-11-09 16:29:47 +0100 |
---|---|---|
committer | Ilya Dryomov <idryomov@gmail.com> | 2020-12-14 23:21:49 +0100 |
commit | 6503e0b69c9d4d78b5450db01e79328f8ed4ef21 (patch) | |
tree | f4dbc56a5453e605a50373e649bc997f90917e0a /net/ceph/messenger.c | |
parent | 699921d9e68ff3d9f8645488c12f4689c6533d70 (diff) |
libceph: export remaining protocol independent infrastructure
In preparation for msgr2, make all protocol independent functions
in messenger.c global.
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r-- | net/ceph/messenger.c | 157 |
1 files changed, 75 insertions, 82 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index d3880fbe8424..85d20372f923 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -96,37 +96,37 @@ static bool con_flag_valid(unsigned long con_flag) } } -static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag) +void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); clear_bit(con_flag, &con->flags); } -static void con_flag_set(struct ceph_connection *con, unsigned long con_flag) +void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); set_bit(con_flag, &con->flags); } -static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag) +bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); return test_bit(con_flag, &con->flags); } -static bool con_flag_test_and_clear(struct ceph_connection *con, - unsigned long con_flag) +bool ceph_con_flag_test_and_clear(struct ceph_connection *con, + unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); return test_and_clear_bit(con_flag, &con->flags); } -static bool con_flag_test_and_set(struct ceph_connection *con, - unsigned long con_flag) +bool ceph_con_flag_test_and_set(struct ceph_connection *con, + unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); @@ -199,7 +199,7 @@ const char *ceph_pr_addr(const struct ceph_entity_addr *addr) } EXPORT_SYMBOL(ceph_pr_addr); -static void encode_my_addr(struct ceph_messenger *msgr) +void ceph_encode_my_addr(struct ceph_messenger *msgr) { memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr)); ceph_encode_banner_addr(&msgr->my_enc_addr); @@ -370,7 +370,7 @@ static void ceph_sock_write_space(struct sock *sk) * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ - if (con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) { + if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) { if (sk_stream_is_writeable(sk)) { dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); @@ -396,7 +396,7 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); - con_flag_set(con, CEPH_CON_F_SOCK_CLOSED); + ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED); queue_con(con); break; case TCP_ESTABLISHED: @@ -430,13 +430,15 @@ static void set_sock_callbacks(struct socket *sock, /* * initiate connection to a remote socket. */ -static int ceph_tcp_connect(struct ceph_connection *con) +int ceph_tcp_connect(struct ceph_connection *con) { struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */ struct socket *sock; unsigned int noio_flag; int ret; + dout("%s con %p peer_addr %s\n", __func__, con, + ceph_pr_addr(&con->peer_addr)); BUG_ON(con->sock); /* sock_create_kern() allocates with GFP_KERNEL */ @@ -454,8 +456,6 @@ static int ceph_tcp_connect(struct ceph_connection *con) set_sock_callbacks(sock, con); - dout("connect %s\n", ceph_pr_addr(&con->peer_addr)); - con_sock_state_connecting(con); ret = sock->ops->connect(sock, (struct sockaddr *)&ss, sizeof(ss), O_NONBLOCK); @@ -570,11 +570,11 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page, /* * Shutdown/close the socket for the given connection. */ -static int con_close_socket(struct ceph_connection *con) +int ceph_con_close_socket(struct ceph_connection *con) { int rc = 0; - dout("con_close_socket on %p sock %p\n", con, con->sock); + dout("%s con %p sock %p\n", __func__, con, con->sock); if (con->sock) { rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); sock_release(con->sock); @@ -587,7 +587,7 @@ static int con_close_socket(struct ceph_connection *con) * received a socket close event before we had the chance to * shut the socket down. */ - con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED); + ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED); con_sock_state_closed(con); return rc; @@ -597,7 +597,7 @@ static void ceph_con_reset_protocol(struct ceph_connection *con) { dout("%s con %p\n", __func__, con); - con_close_socket(con); + ceph_con_close_socket(con); if (con->in_msg) { WARN_ON(con->in_msg->con != con); ceph_msg_put(con->in_msg); @@ -631,7 +631,7 @@ static void ceph_msg_remove_list(struct list_head *head) } } -static void ceph_con_reset_session(struct ceph_connection *con) +void ceph_con_reset_session(struct ceph_connection *con) { dout("%s con %p\n", __func__, con); @@ -656,10 +656,11 @@ void ceph_con_close(struct ceph_connection *con) dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr)); con->state = CEPH_CON_S_CLOSED; - con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next connect */ - con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING); - con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); - con_flag_clear(con, CEPH_CON_F_BACKOFF); + ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next + connect */ + ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING); + ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); + ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF); ceph_con_reset_protocol(con); ceph_con_reset_session(con); @@ -728,7 +729,7 @@ EXPORT_SYMBOL(ceph_con_init); * We maintain a global counter to order connection attempts. Get * a unique seq greater than @gt. */ -static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) +u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt) { u32 ret; @@ -743,7 +744,7 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) /* * Discard messages that have been acked by the server. */ -static void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq) +void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq) { struct ceph_msg *msg; u64 seq; @@ -768,8 +769,7 @@ static void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq) * reconnect_seq. This avoids gratuitously resending messages that * the server had received and handled prior to reconnect. */ -static void ceph_con_discard_requeued(struct ceph_connection *con, - u64 reconnect_seq) +void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq) { struct ceph_msg *msg; u64 seq; @@ -1150,8 +1150,8 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor) cursor->need_crc = true; } -static void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, - struct ceph_msg *msg, size_t length) +void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, + struct ceph_msg *msg, size_t length) { BUG_ON(!length); BUG_ON(length > msg->data_length); @@ -1168,9 +1168,9 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, * data item, and supply the page offset and length of that piece. * Indicate whether this is the last piece in this data item. */ -static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, - size_t *page_offset, size_t *length, - bool *last_piece) +struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, + size_t *page_offset, size_t *length, + bool *last_piece) { struct page *page; @@ -1209,8 +1209,7 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, * Returns true if the result moves the cursor on to the next piece * of the data item. */ -static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, - size_t bytes) +void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes) { bool new_piece; @@ -1284,8 +1283,6 @@ static void prepare_write_message_footer(struct ceph_connection *con) con->out_msg_done = true; } -static void ceph_con_get_out_msg(struct ceph_connection *con); - /* * Prepare headers for the next outgoing message. */ @@ -1355,7 +1352,7 @@ static void prepare_write_message(struct ceph_connection *con) prepare_write_message_footer(con); } - con_flag_set(con, CEPH_CON_F_WRITE_PENDING); + ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* @@ -1376,7 +1373,7 @@ static void prepare_write_ack(struct ceph_connection *con) &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ - con_flag_set(con, CEPH_CON_F_WRITE_PENDING); + ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* @@ -1394,7 +1391,7 @@ static void prepare_write_seq(struct ceph_connection *con) con_out_kvec_add(con, sizeof (con->out_temp_ack), &con->out_temp_ack); - con_flag_set(con, CEPH_CON_F_WRITE_PENDING); + ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* @@ -1415,7 +1412,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) } else { con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); } - con_flag_set(con, CEPH_CON_F_WRITE_PENDING); + ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* @@ -1454,7 +1451,7 @@ static void prepare_write_banner(struct ceph_connection *con) &con->msgr->my_enc_addr); con->out_more = 0; - con_flag_set(con, CEPH_CON_F_WRITE_PENDING); + ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } static void __prepare_write_connect(struct ceph_connection *con) @@ -1465,12 +1462,12 @@ static void __prepare_write_connect(struct ceph_connection *con) con->auth->authorizer_buf); con->out_more = 0; - con_flag_set(con, CEPH_CON_F_WRITE_PENDING); + ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } static int prepare_write_connect(struct ceph_connection *con) { - unsigned int global_seq = get_global_seq(con->msgr, 0); + unsigned int global_seq = ceph_get_global_seq(con->msgr, 0); int proto; int ret; @@ -1549,9 +1546,8 @@ out: return ret; /* done! */ } -static u32 ceph_crc32c_page(u32 crc, struct page *page, - unsigned int page_offset, - unsigned int length) +u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, + unsigned int length) { char *kaddr; @@ -1813,7 +1809,7 @@ static int verify_hello(struct ceph_connection *con) return 0; } -static bool addr_is_blank(struct ceph_entity_addr *addr) +bool ceph_addr_is_blank(const struct ceph_entity_addr *addr) { struct sockaddr_storage ss = addr->in_addr; /* align */ struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr; @@ -1829,7 +1825,7 @@ static bool addr_is_blank(struct ceph_entity_addr *addr) } } -static int addr_port(struct ceph_entity_addr *addr) +int ceph_addr_port(const struct ceph_entity_addr *addr) { switch (get_unaligned(&addr->in_addr.ss_family)) { case AF_INET: @@ -1840,7 +1836,7 @@ static int addr_port(struct ceph_entity_addr *addr) return 0; } -static void addr_set_port(struct ceph_entity_addr *addr, int p) +void ceph_addr_set_port(struct ceph_entity_addr *addr, int p) { switch (get_unaligned(&addr->in_addr.ss_family)) { case AF_INET: @@ -1998,7 +1994,7 @@ int ceph_parse_ips(const char *c, const char *end, port = CEPH_MON_PORT; } - addr_set_port(&addr[i], port); + ceph_addr_set_port(&addr[i], port); addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY; dout("parse_ips got %s\n", ceph_pr_addr(&addr[i])); @@ -2037,7 +2033,7 @@ static int process_banner(struct ceph_connection *con) */ if (memcmp(&con->peer_addr, &con->actual_peer_addr, sizeof(con->peer_addr)) != 0 && - !(addr_is_blank(&con->actual_peer_addr) && + !(ceph_addr_is_blank(&con->actual_peer_addr) && con->actual_peer_addr.nonce == con->peer_addr.nonce)) { pr_warn("wrong peer, want %s/%u, got %s/%u\n", ceph_pr_addr(&con->peer_addr), @@ -2051,12 +2047,12 @@ static int process_banner(struct ceph_connection *con) /* * did we learn our address? */ - if (addr_is_blank(my_addr)) { + if (ceph_addr_is_blank(my_addr)) { memcpy(&my_addr->in_addr, &con->peer_addr_for_me.in_addr, sizeof(con->peer_addr_for_me.in_addr)); - addr_set_port(my_addr, 0); - encode_my_addr(con->msgr); + ceph_addr_set_port(my_addr, 0); + ceph_encode_my_addr(con->msgr); dout("process_banner learned my addr is %s\n", ceph_pr_addr(my_addr)); } @@ -2192,8 +2188,8 @@ static int process_connect(struct ceph_connection *con) dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", con->peer_global_seq, le32_to_cpu(con->in_reply.global_seq)); - get_global_seq(con->msgr, - le32_to_cpu(con->in_reply.global_seq)); + ceph_get_global_seq(con->msgr, + le32_to_cpu(con->in_reply.global_seq)); con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) @@ -2227,7 +2223,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_reply.connect_seq)); if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) - con_flag_set(con, CEPH_CON_F_LOSSYTX); + ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX); con->delay = 0; /* reset backoff memory */ @@ -2351,9 +2347,6 @@ static int read_partial_msg_data(struct ceph_connection *con) /* * read (part of) a message. */ -static int ceph_con_in_msg_alloc(struct ceph_connection *con, - struct ceph_msg_header *hdr, int *skip); - static int read_partial_message(struct ceph_connection *con) { struct ceph_msg *m = con->in_msg; @@ -2515,7 +2508,7 @@ static int read_partial_message(struct ceph_connection *con) * be careful not to do anything that waits on other incoming messages or it * may deadlock. */ -static void process_message(struct ceph_connection *con) +void ceph_con_process_message(struct ceph_connection *con) { struct ceph_msg *msg = con->in_msg; @@ -2628,7 +2621,7 @@ more: do_next: if (con->state == CEPH_CON_S_OPEN) { - if (con_flag_test_and_clear(con, + if (ceph_con_flag_test_and_clear(con, CEPH_CON_F_KEEPALIVE_PENDING)) { prepare_write_keepalive(con); goto more; @@ -2645,7 +2638,7 @@ do_next: } /* Nothing to do! */ - con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); + ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); dout("try_write nothing else to write.\n"); ret = 0; out: @@ -2739,7 +2732,7 @@ more: prepare_read_keepalive_ack(con); break; case CEPH_MSGR_TAG_CLOSE: - con_close_socket(con); + ceph_con_close_socket(con); con->state = CEPH_CON_S_CLOSED; goto out; default: @@ -2764,7 +2757,7 @@ more: } if (con->in_tag == CEPH_MSGR_TAG_READY) goto more; - process_message(con); + ceph_con_process_message(con); if (con->state == CEPH_CON_S_OPEN) prepare_read_tag(con); goto more; @@ -2840,7 +2833,7 @@ static void cancel_con(struct ceph_connection *con) static bool con_sock_closed(struct ceph_connection *con) { - if (!con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED)) + if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED)) return false; #define CASE(x) \ @@ -2867,7 +2860,7 @@ static bool con_backoff(struct ceph_connection *con) { int ret; - if (!con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF)) + if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF)) return false; ret = queue_con_delay(con, con->delay); @@ -2875,7 +2868,7 @@ static bool con_backoff(struct ceph_connection *con) dout("%s: con %p FAILED to back off %lu\n", __func__, con, con->delay); BUG_ON(ret == -ENOENT); - con_flag_set(con, CEPH_CON_F_BACKOFF); + ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); } return true; @@ -2987,7 +2980,7 @@ static void con_fault(struct ceph_connection *con) ceph_con_reset_protocol(con); - if (con_flag_test(con, CEPH_CON_F_LOSSYTX)) { + if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) { dout("fault on LOSSYTX channel, marking CLOSED\n"); con->state = CEPH_CON_S_CLOSED; return; @@ -2999,9 +2992,9 @@ static void con_fault(struct ceph_connection *con) /* If there are no messages queued or keepalive pending, place * the connection in a STANDBY state */ if (list_empty(&con->out_queue) && - !con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) { + !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); - con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); + ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); con->state = CEPH_CON_S_STANDBY; } else { /* retry after a delay. */ @@ -3013,7 +3006,7 @@ static void con_fault(struct ceph_connection *con) if (con->delay > MAX_DELAY_INTERVAL) con->delay = MAX_DELAY_INTERVAL; } - con_flag_set(con, CEPH_CON_F_BACKOFF); + ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); queue_con(con); } } @@ -3023,7 +3016,7 @@ void ceph_messenger_reset_nonce(struct ceph_messenger *msgr) { u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000; msgr->inst.addr.nonce = cpu_to_le32(nonce); - encode_my_addr(msgr); + ceph_encode_my_addr(msgr); } /* @@ -3037,7 +3030,7 @@ void ceph_messenger_init(struct ceph_messenger *msgr, if (myaddr) { memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr, sizeof(msgr->inst.addr.in_addr)); - addr_set_port(&msgr->inst.addr, 0); + ceph_addr_set_port(&msgr->inst.addr, 0); } msgr->inst.addr.type = 0; @@ -3047,7 +3040,7 @@ void ceph_messenger_init(struct ceph_messenger *msgr, get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); } while (!msgr->inst.addr.nonce); - encode_my_addr(msgr); + ceph_encode_my_addr(msgr); atomic_set(&msgr->stopping, 0); write_pnet(&msgr->net, get_net(current->nsproxy->net_ns)); @@ -3076,8 +3069,8 @@ static void clear_standby(struct ceph_connection *con) dout("clear_standby %p and ++connect_seq\n", con); con->state = CEPH_CON_S_PREOPEN; con->connect_seq++; - WARN_ON(con_flag_test(con, CEPH_CON_F_WRITE_PENDING)); - WARN_ON(con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)); + WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)); + WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)); } } @@ -3118,7 +3111,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* if there wasn't anything waiting to send before, queue * new work */ - if (con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING) == 0) + if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) queue_con(con); } EXPORT_SYMBOL(ceph_con_send); @@ -3214,10 +3207,10 @@ void ceph_con_keepalive(struct ceph_connection *con) dout("con_keepalive %p\n", con); mutex_lock(&con->mutex); clear_standby(con); - con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING); + ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING); mutex_unlock(&con->mutex); - if (con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING) == 0) + if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) queue_con(con); } EXPORT_SYMBOL(ceph_con_keepalive); @@ -3423,8 +3416,8 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) * On error (ENOMEM, EAGAIN, ...), * - con->in_msg == NULL */ -static int ceph_con_in_msg_alloc(struct ceph_connection *con, - struct ceph_msg_header *hdr, int *skip) +int ceph_con_in_msg_alloc(struct ceph_connection *con, + struct ceph_msg_header *hdr, int *skip) { int middle_len = le32_to_cpu(hdr->middle_len); struct ceph_msg *msg; @@ -3470,7 +3463,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, return ret; } -static void ceph_con_get_out_msg(struct ceph_connection *con) +void ceph_con_get_out_msg(struct ceph_connection *con) { struct ceph_msg *msg; |