diff options
Diffstat (limited to 'net/ceph')
-rw-r--r-- | net/ceph/messenger.c | 47 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 129 |
2 files changed, 121 insertions, 55 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 1948d592aa54..b2f571dd933d 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -174,6 +174,7 @@ static struct lock_class_key socket_class; #define SKIP_BUF_SIZE 1024 static void queue_con(struct ceph_connection *con); +static void cancel_con(struct ceph_connection *con); static void con_work(struct work_struct *); static void con_fault(struct ceph_connection *con); @@ -680,7 +681,7 @@ void ceph_con_close(struct ceph_connection *con) reset_connection(con); con->peer_global_seq = 0; - cancel_delayed_work(&con->work); + cancel_con(con); con_close_socket(con); mutex_unlock(&con->mutex); } @@ -900,7 +901,7 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor, BUG_ON(page_count > (int)USHRT_MAX); cursor->page_count = (unsigned short)page_count; BUG_ON(length > SIZE_MAX - cursor->page_offset); - cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE; + cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE; } static struct page * @@ -2667,19 +2668,16 @@ static int queue_con_delay(struct ceph_connection *con, unsigned long delay) { if (!con->ops->get(con)) { dout("%s %p ref count 0\n", __func__, con); - return -ENOENT; } if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) { dout("%s %p - already queued\n", __func__, con); con->ops->put(con); - return -EBUSY; } dout("%s %p %lu\n", __func__, con, delay); - return 0; } @@ -2688,6 +2686,14 @@ static void queue_con(struct ceph_connection *con) (void) queue_con_delay(con, 0); } +static void cancel_con(struct ceph_connection *con) +{ + if (cancel_delayed_work(&con->work)) { + dout("%s %p\n", __func__, con); + con->ops->put(con); + } +} + static bool con_sock_closed(struct ceph_connection *con) { if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED)) @@ -3269,24 +3275,21 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) /* * Free a generically kmalloc'd message. */ -void ceph_msg_kfree(struct ceph_msg *m) +static void ceph_msg_free(struct ceph_msg *m) { - dout("msg_kfree %p\n", m); + dout("%s %p\n", __func__, m); ceph_kvfree(m->front.iov_base); kmem_cache_free(ceph_msg_cache, m); } -/* - * Drop a msg ref. Destroy as needed. - */ -void ceph_msg_last_put(struct kref *kref) +static void ceph_msg_release(struct kref *kref) { struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); LIST_HEAD(data); struct list_head *links; struct list_head *next; - dout("ceph_msg_put last one on %p\n", m); + dout("%s %p\n", __func__, m); WARN_ON(!list_empty(&m->list_head)); /* drop middle, data, if any */ @@ -3308,9 +3311,25 @@ void ceph_msg_last_put(struct kref *kref) if (m->pool) ceph_msgpool_put(m->pool, m); else - ceph_msg_kfree(m); + ceph_msg_free(m); +} + +struct ceph_msg *ceph_msg_get(struct ceph_msg *msg) +{ + dout("%s %p (was %d)\n", __func__, msg, + atomic_read(&msg->kref.refcount)); + kref_get(&msg->kref); + return msg; +} +EXPORT_SYMBOL(ceph_msg_get); + +void ceph_msg_put(struct ceph_msg *msg) +{ + dout("%s %p (was %d)\n", __func__, msg, + atomic_read(&msg->kref.refcount)); + kref_put(&msg->kref, ceph_msg_release); } -EXPORT_SYMBOL(ceph_msg_last_put); +EXPORT_SYMBOL(ceph_msg_put); void ceph_msg_dump(struct ceph_msg *msg) { diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 05be0c181695..30f6faf3584f 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -297,12 +297,21 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, /* * requests */ -void ceph_osdc_release_request(struct kref *kref) +static void ceph_osdc_release_request(struct kref *kref) { - struct ceph_osd_request *req; + struct ceph_osd_request *req = container_of(kref, + struct ceph_osd_request, r_kref); unsigned int which; - req = container_of(kref, struct ceph_osd_request, r_kref); + dout("%s %p (r_request %p r_reply %p)\n", __func__, req, + req->r_request, req->r_reply); + WARN_ON(!RB_EMPTY_NODE(&req->r_node)); + WARN_ON(!list_empty(&req->r_req_lru_item)); + WARN_ON(!list_empty(&req->r_osd_item)); + WARN_ON(!list_empty(&req->r_linger_item)); + WARN_ON(!list_empty(&req->r_linger_osd_item)); + WARN_ON(req->r_osd); + if (req->r_request) ceph_msg_put(req->r_request); if (req->r_reply) { @@ -320,7 +329,22 @@ void ceph_osdc_release_request(struct kref *kref) kmem_cache_free(ceph_osd_request_cache, req); } -EXPORT_SYMBOL(ceph_osdc_release_request); + +void ceph_osdc_get_request(struct ceph_osd_request *req) +{ + dout("%s %p (was %d)\n", __func__, req, + atomic_read(&req->r_kref.refcount)); + kref_get(&req->r_kref); +} +EXPORT_SYMBOL(ceph_osdc_get_request); + +void ceph_osdc_put_request(struct ceph_osd_request *req) +{ + dout("%s %p (was %d)\n", __func__, req, + atomic_read(&req->r_kref.refcount)); + kref_put(&req->r_kref, ceph_osdc_release_request); +} +EXPORT_SYMBOL(ceph_osdc_put_request); struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_snap_context *snapc, @@ -364,7 +388,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, RB_CLEAR_NODE(&req->r_node); INIT_LIST_HEAD(&req->r_unsafe_item); INIT_LIST_HEAD(&req->r_linger_item); - INIT_LIST_HEAD(&req->r_linger_osd); + INIT_LIST_HEAD(&req->r_linger_osd_item); INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_osd_item); @@ -916,7 +940,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc, * list at the end to keep things in tid order. */ list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, - r_linger_osd) { + r_linger_osd_item) { /* * reregister request prior to unregistering linger so * that r_osd is preserved. @@ -1008,6 +1032,8 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) { dout("__remove_osd %p\n", osd); BUG_ON(!list_empty(&osd->o_requests)); + BUG_ON(!list_empty(&osd->o_linger_requests)); + rb_erase(&osd->o_node, &osdc->osds); list_del_init(&osd->o_osd_lru); ceph_con_close(&osd->o_con); @@ -1029,12 +1055,23 @@ static void remove_all_osds(struct ceph_osd_client *osdc) static void __move_osd_to_lru(struct ceph_osd_client *osdc, struct ceph_osd *osd) { - dout("__move_osd_to_lru %p\n", osd); + dout("%s %p\n", __func__, osd); BUG_ON(!list_empty(&osd->o_osd_lru)); + list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; } +static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc, + struct ceph_osd *osd) +{ + dout("%s %p\n", __func__, osd); + + if (list_empty(&osd->o_requests) && + list_empty(&osd->o_linger_requests)) + __move_osd_to_lru(osdc, osd); +} + static void __remove_osd_from_lru(struct ceph_osd *osd) { dout("__remove_osd_from_lru %p\n", osd); @@ -1175,6 +1212,7 @@ static void __unregister_request(struct ceph_osd_client *osdc, dout("__unregister_request %p tid %lld\n", req, req->r_tid); rb_erase(&req->r_node, &osdc->requests); + RB_CLEAR_NODE(&req->r_node); osdc->num_requests--; if (req->r_osd) { @@ -1182,12 +1220,8 @@ static void __unregister_request(struct ceph_osd_client *osdc, ceph_msg_revoke(req->r_request); list_del_init(&req->r_osd_item); - if (list_empty(&req->r_osd->o_requests) && - list_empty(&req->r_osd->o_linger_requests)) { - dout("moving osd to %p lru\n", req->r_osd); - __move_osd_to_lru(osdc, req->r_osd); - } - if (list_empty(&req->r_linger_item)) + maybe_move_osd_to_lru(osdc, req->r_osd); + if (list_empty(&req->r_linger_osd_item)) req->r_osd = NULL; } @@ -1214,45 +1248,39 @@ static void __cancel_request(struct ceph_osd_request *req) static void __register_linger_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { - dout("__register_linger_request %p\n", req); + dout("%s %p tid %llu\n", __func__, req, req->r_tid); + WARN_ON(!req->r_linger); + ceph_osdc_get_request(req); list_add_tail(&req->r_linger_item, &osdc->req_linger); if (req->r_osd) - list_add_tail(&req->r_linger_osd, + list_add_tail(&req->r_linger_osd_item, &req->r_osd->o_linger_requests); } static void __unregister_linger_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { - dout("__unregister_linger_request %p\n", req); + WARN_ON(!req->r_linger); + + if (list_empty(&req->r_linger_item)) { + dout("%s %p tid %llu not registered\n", __func__, req, + req->r_tid); + return; + } + + dout("%s %p tid %llu\n", __func__, req, req->r_tid); list_del_init(&req->r_linger_item); - if (req->r_osd) { - list_del_init(&req->r_linger_osd); - if (list_empty(&req->r_osd->o_requests) && - list_empty(&req->r_osd->o_linger_requests)) { - dout("moving osd to %p lru\n", req->r_osd); - __move_osd_to_lru(osdc, req->r_osd); - } + if (req->r_osd) { + list_del_init(&req->r_linger_osd_item); + maybe_move_osd_to_lru(osdc, req->r_osd); if (list_empty(&req->r_osd_item)) req->r_osd = NULL; } ceph_osdc_put_request(req); } -void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) -{ - mutex_lock(&osdc->request_mutex); - if (req->r_linger) { - req->r_linger = 0; - __unregister_linger_request(osdc, req); - } - mutex_unlock(&osdc->request_mutex); -} -EXPORT_SYMBOL(ceph_osdc_unregister_linger_request); - void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { @@ -2430,6 +2458,25 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, EXPORT_SYMBOL(ceph_osdc_start_request); /* + * Unregister a registered request. The request is not completed (i.e. + * no callbacks or wakeups) - higher layers are supposed to know what + * they are canceling. + */ +void ceph_osdc_cancel_request(struct ceph_osd_request *req) +{ + struct ceph_osd_client *osdc = req->r_osdc; + + mutex_lock(&osdc->request_mutex); + if (req->r_linger) + __unregister_linger_request(osdc, req); + __unregister_request(osdc, req); + mutex_unlock(&osdc->request_mutex); + + dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid); +} +EXPORT_SYMBOL(ceph_osdc_cancel_request); + +/* * wait for a request to complete */ int ceph_osdc_wait_request(struct ceph_osd_client *osdc, @@ -2437,18 +2484,18 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc, { int rc; + dout("%s %p tid %llu\n", __func__, req, req->r_tid); + rc = wait_for_completion_interruptible(&req->r_completion); if (rc < 0) { - mutex_lock(&osdc->request_mutex); - __cancel_request(req); - __unregister_request(osdc, req); - mutex_unlock(&osdc->request_mutex); + dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid); + ceph_osdc_cancel_request(req); complete_request(req); - dout("wait_request tid %llu canceled/timed out\n", req->r_tid); return rc; } - dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result); + dout("%s %p tid %llu result %d\n", __func__, req, req->r_tid, + req->r_result); return req->r_result; } EXPORT_SYMBOL(ceph_osdc_wait_request); |