diff options
author | Ilya Dryomov <idryomov@gmail.com> | 2016-04-28 16:07:27 +0200 |
---|---|---|
committer | Ilya Dryomov <idryomov@gmail.com> | 2016-05-26 01:15:28 +0200 |
commit | b07d3c4bd7270c74e2b6803af8ac8a00cb3e5ed2 (patch) | |
tree | fcaa24cff4389ab2f0e7b2d2ca327fb76bdef6a7 | |
parent | 1907920324f1f3ebb6618344417c03a2863bba01 (diff) |
libceph: support for checking on status of watch
Implement ceph_osdc_watch_check() to be able to check on status of
watch. Note that the time it takes for a watch/notify event to get
delivered through the notify_wq is taken into account.
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
-rw-r--r-- | include/linux/ceph/osd_client.h | 4 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 52 |
2 files changed, 55 insertions, 1 deletions
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 63054fae4f15..2ae7cfd82ec9 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -213,6 +213,8 @@ struct ceph_osd_linger_request { struct ceph_osd_request *reg_req; struct ceph_osd_request *ping_req; unsigned long ping_sent; + unsigned long watch_valid_thru; + struct list_head pending_lworks; struct ceph_osd_request_target t; u32 last_force_resend; @@ -417,5 +419,7 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc, u32 timeout, struct page ***preply_pages, size_t *preply_len); +int ceph_osdc_watch_check(struct ceph_osd_client *osdc, + struct ceph_osd_linger_request *lreq); #endif diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index e6e3ab4223db..5ac6dce74f07 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1746,6 +1746,7 @@ static void linger_release(struct kref *kref) WARN_ON(!RB_EMPTY_NODE(&lreq->node)); WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node)); WARN_ON(!list_empty(&lreq->scan_item)); + WARN_ON(!list_empty(&lreq->pending_lworks)); WARN_ON(lreq->osd); if (lreq->reg_req) @@ -1783,6 +1784,7 @@ linger_alloc(struct ceph_osd_client *osdc) RB_CLEAR_NODE(&lreq->node); RB_CLEAR_NODE(&lreq->osdc_node); INIT_LIST_HEAD(&lreq->scan_item); + INIT_LIST_HEAD(&lreq->pending_lworks); init_completion(&lreq->reg_commit_wait); init_completion(&lreq->notify_finish_wait); @@ -1890,6 +1892,8 @@ static void cancel_linger_request(struct ceph_osd_request *req) struct linger_work { struct work_struct work; struct ceph_osd_linger_request *lreq; + struct list_head pending_item; + unsigned long queued_stamp; union { struct { @@ -1916,6 +1920,7 @@ static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq, return NULL; INIT_WORK(&lwork->work, workfn); + INIT_LIST_HEAD(&lwork->pending_item); lwork->lreq = linger_get(lreq); return lwork; @@ -1925,6 +1930,10 @@ static void lwork_free(struct linger_work *lwork) { struct ceph_osd_linger_request *lreq = lwork->lreq; + mutex_lock(&lreq->lock); + list_del(&lwork->pending_item); + mutex_unlock(&lreq->lock); + linger_put(lreq); kfree(lwork); } @@ -1935,6 +1944,10 @@ static void lwork_queue(struct linger_work *lwork) struct ceph_osd_client *osdc = lreq->osdc; verify_lreq_locked(lreq); + WARN_ON(!list_empty(&lwork->pending_item)); + + lwork->queued_stamp = jiffies; + list_add_tail(&lwork->pending_item, &lreq->pending_lworks); queue_work(osdc->notify_wq, &lwork->work); } @@ -2116,7 +2129,9 @@ static void linger_ping_cb(struct ceph_osd_request *req) __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent, lreq->last_error); if (lreq->register_gen == req->r_ops[0].watch.gen) { - if (req->r_result && !lreq->last_error) { + if (!req->r_result) { + lreq->watch_valid_thru = lreq->ping_sent; + } else if (!lreq->last_error) { lreq->last_error = normalize_watch_error(req->r_result); queue_watch_error(lreq); } @@ -3316,6 +3331,7 @@ ceph_osdc_watch(struct ceph_osd_client *osdc, lreq->wcb = wcb; lreq->errcb = errcb; lreq->data = data; + lreq->watch_valid_thru = jiffies; ceph_oid_copy(&lreq->t.base_oid, oid); ceph_oloc_copy(&lreq->t.base_oloc, oloc); @@ -3578,6 +3594,40 @@ out_put_lreq: EXPORT_SYMBOL(ceph_osdc_notify); /* + * Return the number of milliseconds since the watch was last + * confirmed, or an error. If there is an error, the watch is no + * longer valid, and should be destroyed with ceph_osdc_unwatch(). + */ +int ceph_osdc_watch_check(struct ceph_osd_client *osdc, + struct ceph_osd_linger_request *lreq) +{ + unsigned long stamp, age; + int ret; + + down_read(&osdc->lock); + mutex_lock(&lreq->lock); + stamp = lreq->watch_valid_thru; + if (!list_empty(&lreq->pending_lworks)) { + struct linger_work *lwork = + list_first_entry(&lreq->pending_lworks, + struct linger_work, + pending_item); + + if (time_before(lwork->queued_stamp, stamp)) + stamp = lwork->queued_stamp; + } + age = jiffies - stamp; + dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__, + lreq, lreq->linger_id, age, lreq->last_error); + /* we are truncating to msecs, so return a safe upper bound */ + ret = lreq->last_error ?: 1 + jiffies_to_msecs(age); + + mutex_unlock(&lreq->lock); + up_read(&osdc->lock); + return ret; +} + +/* * Call all pending notify callbacks - for use after a watch is * unregistered, to make sure no more callbacks for it will be invoked */ |