summaryrefslogtreecommitdiff
path: root/fs
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2020-06-08 12:49:18 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2020-06-08 12:49:18 -0700
commit95288a9b3beee8dd69d73b7691e36f2f231b7903 (patch)
treeb7bb598a516e5d7fa7dba040ac34bb0608b6388e /fs
parentca687877e05ad1bf5b4cefd9cdd091044626deac (diff)
parentdc1dad8e1a612650b1e786e992cb0c6e101e226a (diff)
Merge tag 'ceph-for-5.8-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov: "The highlights are: - OSD/MDS latency and caps cache metrics infrastructure for the filesytem (Xiubo Li). Currently available through debugfs and will be periodically sent to the MDS in the future. - support for replica reads (balanced and localized reads) for rbd and the filesystem (myself). The default remains to always read from primary, users can opt-in with the new crush_location and read_from_replica options. Note that reading from replica is safe for general use only since Octopus. - support for RADOS allocation hint flags (myself). Currently used by rbd to propagate the compressible/incompressible hint given with the new compression_hint map option and ready for passing on more advanced hints, e.g. based on fadvise() from the filesystem. - support for efficient cross-quota-realm renames (Luis Henriques) - assorted cap handling improvements and cleanups, particularly untangling some of the locking (Jeff Layton)" * tag 'ceph-for-5.8-rc1' of git://github.com/ceph/ceph-client: (29 commits) rbd: compression_hint option libceph: support for alloc hint flags libceph: read_from_replica option libceph: support for balanced and localized reads libceph: crush_location infrastructure libceph: decode CRUSH device/bucket types and names libceph: add non-asserting rbtree insertion helper ceph: skip checking caps when session reconnecting and releasing reqs ceph: make sure mdsc->mutex is nested in s->s_mutex to fix dead lock ceph: don't return -ESTALE if there's still an open file libceph, rbd: replace zero-length array with flexible-array ceph: allow rename operation under different quota realms ceph: normalize 'delta' parameter usage in check_quota_exceeded ceph: ceph_kick_flushing_caps needs the s_mutex ceph: request expedited service on session's last cap flush ceph: convert mdsc->cap_dirty to a per-session list ceph: reset i_requested_max_size if file write is not wanted ceph: throw a warning if we destroy session with mutex still locked ceph: fix potential race in ceph_check_caps ceph: document what protects i_dirty_item and i_flushing_item ...
Diffstat (limited to 'fs')
-rw-r--r--fs/ceph/Makefile2
-rw-r--r--fs/ceph/acl.c2
-rw-r--r--fs/ceph/addr.c20
-rw-r--r--fs/ceph/caps.c425
-rw-r--r--fs/ceph/debugfs.c100
-rw-r--r--fs/ceph/dir.c26
-rw-r--r--fs/ceph/export.c9
-rw-r--r--fs/ceph/file.c30
-rw-r--r--fs/ceph/inode.c4
-rw-r--r--fs/ceph/mds_client.c48
-rw-r--r--fs/ceph/mds_client.h15
-rw-r--r--fs/ceph/metric.c148
-rw-r--r--fs/ceph/metric.h62
-rw-r--r--fs/ceph/quota.c62
-rw-r--r--fs/ceph/super.h34
-rw-r--r--fs/ceph/xattr.c4
16 files changed, 807 insertions, 184 deletions
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index 0a0823d378db..50c635dc7f71 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -8,7 +8,7 @@ obj-$(CONFIG_CEPH_FS) += ceph.o
ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
export.o caps.o snap.o xattr.o quota.o io.o \
mds_client.o mdsmap.o strings.o ceph_frag.o \
- debugfs.o util.o
+ debugfs.o util.o metric.o
ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 26be6520d3fb..e0465741c591 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -22,7 +22,7 @@ static inline void ceph_set_cached_acl(struct inode *inode,
struct ceph_inode_info *ci = ceph_inode(inode);
spin_lock(&ci->i_ceph_lock);
- if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
+ if (__ceph_caps_issued_mask_metric(ci, CEPH_CAP_XATTR_SHARED, 0))
set_cached_acl(inode, type, acl);
else
forget_cached_acl(inode, type);
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 6f4678d98df7..01ad09733ac7 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -11,10 +11,12 @@
#include <linux/task_io_accounting_ops.h>
#include <linux/signal.h>
#include <linux/iversion.h>
+#include <linux/ktime.h>
#include "super.h"
#include "mds_client.h"
#include "cache.h"
+#include "metric.h"
#include <linux/ceph/osd_client.h>
#include <linux/ceph/striper.h>
@@ -216,6 +218,9 @@ static int ceph_sync_readpages(struct ceph_fs_client *fsc,
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);
+ ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, rc);
+
ceph_osdc_put_request(req);
dout("readpages result %d\n", rc);
return rc;
@@ -299,6 +304,7 @@ static int ceph_readpage(struct file *filp, struct page *page)
static void finish_read(struct ceph_osd_request *req)
{
struct inode *inode = req->r_inode;
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_osd_data *osd_data;
int rc = req->r_result <= 0 ? req->r_result : 0;
int bytes = req->r_result >= 0 ? req->r_result : 0;
@@ -336,6 +342,10 @@ unlock:
put_page(page);
bytes -= PAGE_SIZE;
}
+
+ ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, rc);
+
kfree(osd_data->pages);
}
@@ -643,6 +653,9 @@ static int ceph_sync_writepages(struct ceph_fs_client *fsc,
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);
+ ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, rc);
+
ceph_osdc_put_request(req);
if (rc == 0)
rc = len;
@@ -794,6 +807,9 @@ static void writepages_finish(struct ceph_osd_request *req)
ceph_clear_error_write(ci);
}
+ ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, rc);
+
/*
* We lost the cache cap, need to truncate the page before
* it is unlocked, otherwise we'd truncate it later in the
@@ -1852,6 +1868,10 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!err)
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
+
+ ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, err);
+
out_put:
ceph_osdc_put_request(req);
if (err == -ECANCELED)
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index f1acde6fb9a6..972c13aa4225 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -597,6 +597,27 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
}
}
+/**
+ * change_auth_cap_ses - move inode to appropriate lists when auth caps change
+ * @ci: inode to be moved
+ * @session: new auth caps session
+ */
+static void change_auth_cap_ses(struct ceph_inode_info *ci,
+ struct ceph_mds_session *session)
+{
+ lockdep_assert_held(&ci->i_ceph_lock);
+
+ if (list_empty(&ci->i_dirty_item) && list_empty(&ci->i_flushing_item))
+ return;
+
+ spin_lock(&session->s_mdsc->cap_dirty_lock);
+ if (!list_empty(&ci->i_dirty_item))
+ list_move(&ci->i_dirty_item, &session->s_cap_dirty);
+ if (!list_empty(&ci->i_flushing_item))
+ list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
+ spin_unlock(&session->s_mdsc->cap_dirty_lock);
+}
+
/*
* Add a capability under the given MDS session.
*
@@ -727,6 +748,9 @@ void ceph_add_cap(struct inode *inode,
if (flags & CEPH_CAP_FLAG_AUTH) {
if (!ci->i_auth_cap ||
ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
+ if (ci->i_auth_cap &&
+ ci->i_auth_cap->session != cap->session)
+ change_auth_cap_ses(ci, cap->session);
ci->i_auth_cap = cap;
cap->mds_wanted = wanted;
}
@@ -912,6 +936,20 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
return 0;
}
+int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
+ int touch)
+{
+ struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
+ int r;
+
+ r = __ceph_caps_issued_mask(ci, mask, touch);
+ if (r)
+ ceph_update_cap_hit(&fsc->mdsc->metric);
+ else
+ ceph_update_cap_mis(&fsc->mdsc->metric);
+ return r;
+}
+
/*
* Return true if mask caps are currently being revoked by an MDS.
*/
@@ -1109,8 +1147,10 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
/* remove from inode's cap rbtree, and clear auth cap */
rb_erase(&cap->ci_node, &ci->i_caps);
- if (ci->i_auth_cap == cap)
+ if (ci->i_auth_cap == cap) {
+ WARN_ON_ONCE(!list_empty(&ci->i_dirty_item));
ci->i_auth_cap = NULL;
+ }
/* remove from session list */
spin_lock(&session->s_cap_lock);
@@ -1167,6 +1207,7 @@ struct cap_msg_args {
u64 xattr_version;
u64 change_attr;
struct ceph_buffer *xattr_buf;
+ struct ceph_buffer *old_xattr_buf;
struct timespec64 atime, mtime, ctime, btime;
int op, caps, wanted, dirty;
u32 seq, issue_seq, mseq, time_warp_seq;
@@ -1175,6 +1216,7 @@ struct cap_msg_args {
kgid_t gid;
umode_t mode;
bool inline_data;
+ bool wake;
};
/*
@@ -1304,44 +1346,29 @@ void __ceph_remove_caps(struct ceph_inode_info *ci)
}
/*
- * Send a cap msg on the given inode. Update our caps state, then
- * drop i_ceph_lock and send the message.
+ * Prepare to send a cap message to an MDS. Update the cap state, and populate
+ * the arg struct with the parameters that will need to be sent. This should
+ * be done under the i_ceph_lock to guard against changes to cap state.
*
* Make note of max_size reported/requested from mds, revoked caps
* that have now been implemented.
- *
- * Return non-zero if delayed release, or we experienced an error
- * such that the caller should requeue + retry later.
- *
- * called with i_ceph_lock, then drops it.
- * caller should hold snap_rwsem (read), s_mutex.
*/
-static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
- int op, int flags, int used, int want, int retain,
- int flushing, u64 flush_tid, u64 oldest_flush_tid)
- __releases(cap->ci->i_ceph_lock)
+static void __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap,
+ int op, int flags, int used, int want, int retain,
+ int flushing, u64 flush_tid, u64 oldest_flush_tid)
{
struct ceph_inode_info *ci = cap->ci;
struct inode *inode = &ci->vfs_inode;
- struct ceph_buffer *old_blob = NULL;
- struct cap_msg_args arg;
int held, revoking;
- int wake = 0;
- int ret;
- /* Don't send anything if it's still being created. Return delayed */
- if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
- spin_unlock(&ci->i_ceph_lock);
- dout("%s async create in flight for %p\n", __func__, inode);
- return 1;
- }
+ lockdep_assert_held(&ci->i_ceph_lock);
held = cap->issued | cap->implemented;
revoking = cap->implemented & ~cap->issued;
retain &= ~revoking;
- dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
- inode, cap, cap->session,
+ dout("%s %p cap %p session %p %s -> %s (revoking %s)\n",
+ __func__, inode, cap, cap->session,
ceph_cap_string(held), ceph_cap_string(held & retain),
ceph_cap_string(revoking));
BUG_ON((retain & CEPH_CAP_PIN) == 0);
@@ -1349,60 +1376,62 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
ci->i_ceph_flags &= ~CEPH_I_FLUSH;
cap->issued &= retain; /* drop bits we don't want */
- if (cap->implemented & ~cap->issued) {
- /*
- * Wake up any waiters on wanted -> needed transition.
- * This is due to the weird transition from buffered
- * to sync IO... we need to flush dirty pages _before_
- * allowing sync writes to avoid reordering.
- */
- wake = 1;
- }
+ /*
+ * Wake up any waiters on wanted -> needed transition. This is due to
+ * the weird transition from buffered to sync IO... we need to flush
+ * dirty pages _before_ allowing sync writes to avoid reordering.
+ */
+ arg->wake = cap->implemented & ~cap->issued;
cap->implemented &= cap->issued | used;
cap->mds_wanted = want;
- arg.session = cap->session;
- arg.ino = ceph_vino(inode).ino;
- arg.cid = cap->cap_id;
- arg.follows = flushing ? ci->i_head_snapc->seq : 0;
- arg.flush_tid = flush_tid;
- arg.oldest_flush_tid = oldest_flush_tid;
-
- arg.size = inode->i_size;
- ci->i_reported_size = arg.size;
- arg.max_size = ci->i_wanted_max_size;
- if (cap == ci->i_auth_cap)
- ci->i_requested_max_size = arg.max_size;
+ arg->session = cap->session;
+ arg->ino = ceph_vino(inode).ino;
+ arg->cid = cap->cap_id;
+ arg->follows = flushing ? ci->i_head_snapc->seq : 0;
+ arg->flush_tid = flush_tid;
+ arg->oldest_flush_tid = oldest_flush_tid;
+
+ arg->size = inode->i_size;
+ ci->i_reported_size = arg->size;
+ arg->max_size = ci->i_wanted_max_size;
+ if (cap == ci->i_auth_cap) {
+ if (want & CEPH_CAP_ANY_FILE_WR)
+ ci->i_requested_max_size = arg->max_size;
+ else
+ ci->i_requested_max_size = 0;
+ }
if (flushing & CEPH_CAP_XATTR_EXCL) {
- old_blob = __ceph_build_xattrs_blob(ci);
- arg.xattr_version = ci->i_xattrs.version;
- arg.xattr_buf = ci->i_xattrs.blob;
+ arg->old_xattr_buf = __ceph_build_xattrs_blob(ci);
+ arg->xattr_version = ci->i_xattrs.version;
+ arg->xattr_buf = ci->i_xattrs.blob;
} else {
- arg.xattr_buf = NULL;
+ arg->xattr_buf = NULL;
+ arg->old_xattr_buf = NULL;
}
- arg.mtime = inode->i_mtime;
- arg.atime = inode->i_atime;
- arg.ctime = inode->i_ctime;
- arg.btime = ci->i_btime;
- arg.change_attr = inode_peek_iversion_raw(inode);
+ arg->mtime = inode->i_mtime;
+ arg->atime = inode->i_atime;
+ arg->ctime = inode->i_ctime;
+ arg->btime = ci->i_btime;
+ arg->change_attr = inode_peek_iversion_raw(inode);
- arg.op = op;
- arg.caps = cap->implemented;
- arg.wanted = want;
- arg.dirty = flushing;
+ arg->op = op;
+ arg->caps = cap->implemented;
+ arg->wanted = want;
+ arg->dirty = flushing;
- arg.seq = cap->seq;
- arg.issue_seq = cap->issue_seq;
- arg.mseq = cap->mseq;
- arg.time_warp_seq = ci->i_time_warp_seq;
+ arg->seq = cap->seq;
+ arg->issue_seq = cap->issue_seq;
+ arg->mseq = cap->mseq;
+ arg->time_warp_seq = ci->i_time_warp_seq;
- arg.uid = inode->i_uid;
- arg.gid = inode->i_gid;
- arg.mode = inode->i_mode;
+ arg->uid = inode->i_uid;
+ arg->gid = inode->i_gid;
+ arg->mode = inode->i_mode;
- arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+ arg->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
if (!(flags & CEPH_CLIENT_CAPS_PENDING_CAPSNAP) &&
!list_empty(&ci->i_cap_snaps)) {
struct ceph_cap_snap *capsnap;
@@ -1415,27 +1444,35 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
}
}
}
- arg.flags = flags;
-
- spin_unlock(&ci->i_ceph_lock);
+ arg->flags = flags;
+}
- ceph_buffer_put(old_blob);
+/*
+ * Send a cap msg on the given inode.
+ *
+ * Caller should hold snap_rwsem (read), s_mutex.
+ */
+static void __send_cap(struct ceph_mds_client *mdsc, struct cap_msg_args *arg,
+ struct ceph_inode_info *ci)
+{
+ struct inode *inode = &ci->vfs_inode;
+ int ret;
- ret = send_cap_msg(&arg);
+ ret = send_cap_msg(arg);
if (ret < 0) {
pr_err("error sending cap msg, ino (%llx.%llx) "
"flushing %s tid %llu, requeue\n",
- ceph_vinop(inode), ceph_cap_string(flushing),
- flush_tid);
+ ceph_vinop(inode), ceph_cap_string(arg->dirty),
+ arg->flush_tid);
spin_lock(&ci->i_ceph_lock);
__cap_delay_requeue(mdsc, ci);
spin_unlock(&ci->i_ceph_lock);
}
- if (wake)
- wake_up_all(&ci->i_cap_wq);
+ ceph_buffer_put(arg->old_xattr_buf);
- return ret;
+ if (arg->wake)
+ wake_up_all(&ci->i_cap_wq);
}
static inline int __send_flush_snap(struct inode *inode,
@@ -1456,6 +1493,7 @@ static inline int __send_flush_snap(struct inode *inode,
arg.max_size = 0;
arg.xattr_version = capsnap->xattr_version;
arg.xattr_buf = capsnap->xattr_blob;
+ arg.old_xattr_buf = NULL;
arg.atime = capsnap->atime;
arg.mtime = capsnap->mtime;
@@ -1479,6 +1517,7 @@ static inline int __send_flush_snap(struct inode *inode,
arg.inline_data = capsnap->inline_data;
arg.flags = 0;
+ arg.wake = false;
return send_cap_msg(&arg);
}
@@ -1676,6 +1715,8 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
ceph_cap_string(was | mask));
ci->i_dirty_caps |= mask;
if (was == 0) {
+ struct ceph_mds_session *session = ci->i_auth_cap->session;
+
WARN_ON_ONCE(ci->i_prealloc_cap_flush);
swap(ci->i_prealloc_cap_flush, *pcf);
@@ -1688,7 +1729,7 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
&ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
BUG_ON(!list_empty(&ci->i_dirty_item));
spin_lock(&mdsc->cap_dirty_lock);
- list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
+ list_add(&ci->i_dirty_item, &session->s_cap_dirty);
spin_unlock(&mdsc->cap_dirty_lock);
if (ci->i_flushing_caps == 0) {
ihold(inode);
@@ -1731,30 +1772,33 @@ static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
* Remove cap_flush from the mdsc's or inode's flushing cap list.
* Return true if caller needs to wake up flush waiters.
*/
-static bool __finish_cap_flush(struct ceph_mds_client *mdsc,
- struct ceph_inode_info *ci,
- struct ceph_cap_flush *cf)
+static bool __detach_cap_flush_from_mdsc(struct ceph_mds_client *mdsc,
+ struct ceph_cap_flush *cf)
{
struct ceph_cap_flush *prev;
bool wake = cf->wake;
- if (mdsc) {
- /* are there older pending cap flushes? */
- if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
- prev = list_prev_entry(cf, g_list);
- prev->wake = true;
- wake = false;
- }
- list_del(&cf->g_list);
- } else if (ci) {
- if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
- prev = list_prev_entry(cf, i_list);
- prev->wake = true;
- wake = false;
- }
- list_del(&cf->i_list);
- } else {
- BUG_ON(1);
+
+ if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
+ prev = list_prev_entry(cf, g_list);
+ prev->wake = true;
+ wake = false;
}
+ list_del(&cf->g_list);
+ return wake;
+}
+
+static bool __detach_cap_flush_from_ci(struct ceph_inode_info *ci,
+ struct ceph_cap_flush *cf)
+{
+ struct ceph_cap_flush *prev;
+ bool wake = cf->wake;
+
+ if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
+ prev = list_prev_entry(cf, i_list);
+ prev->wake = true;
+ wake = false;
+ }
+ list_del(&cf->i_list);
return wake;
}
@@ -1953,6 +1997,9 @@ retry_locked:
}
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+ int mflags = 0;
+ struct cap_msg_args arg;
+
cap = rb_entry(p, struct ceph_cap, ci_node);
/* avoid looping forever */
@@ -2030,12 +2077,24 @@ ack:
if (mutex_trylock(&session->s_mutex) == 0) {
dout("inverting session/ino locks on %p\n",
session);
+ session = ceph_get_mds_session(session);
spin_unlock(&ci->i_ceph_lock);
if (took_snap_rwsem) {
up_read(&mdsc->snap_rwsem);
took_snap_rwsem = 0;
}
- mutex_lock(&session->s_mutex);
+ if (session) {
+ mutex_lock(&session->s_mutex);
+ ceph_put_mds_session(session);
+ } else {
+ /*
+ * Because we take the reference while
+ * holding the i_ceph_lock, it should
+ * never be NULL. Throw a warning if it
+ * ever is.
+ */
+ WARN_ON_ONCE(true);
+ }
goto retry;
}
}
@@ -2070,6 +2129,9 @@ ack:
flushing = ci->i_dirty_caps;
flush_tid = __mark_caps_flushing(inode, session, false,
&oldest_flush_tid);
+ if (flags & CHECK_CAPS_FLUSH &&
+ list_empty(&session->s_cap_dirty))
+ mflags |= CEPH_CLIENT_CAPS_SYNC;
} else {
flushing = 0;
flush_tid = 0;
@@ -2080,9 +2142,12 @@ ack:
mds = cap->mds; /* remember mds, so we don't repeat */
- /* __send_cap drops i_ceph_lock */
- __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, 0, cap_used, want,
- retain, flushing, flush_tid, oldest_flush_tid);
+ __prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, mflags, cap_used,
+ want, retain, flushing, flush_tid, oldest_flush_tid);
+ spin_unlock(&ci->i_ceph_lock);
+
+ __send_cap(mdsc, &arg, ci);
+
goto retry; /* retake i_ceph_lock and restart our cap scan. */
}
@@ -2121,6 +2186,7 @@ retry:
retry_locked:
if (ci->i_dirty_caps && ci->i_auth_cap) {
struct ceph_cap *cap = ci->i_auth_cap;
+ struct cap_msg_args arg;
if (session != cap->session) {
spin_unlock(&ci->i_ceph_lock);
@@ -2148,11 +2214,13 @@ retry_locked:
flush_tid = __mark_caps_flushing(inode, session, true,
&oldest_flush_tid);
- /* __send_cap drops i_ceph_lock */
- __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC,
+ __prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC,
__ceph_caps_used(ci), __ceph_caps_wanted(ci),
(cap->issued | cap->implemented),
flushing, flush_tid, oldest_flush_tid);
+ spin_unlock(&ci->i_ceph_lock);
+
+ __send_cap(mdsc, &arg, ci);
} else {
if (!list_empty(&ci->i_cap_flush_list)) {
struct ceph_cap_flush *cf =
@@ -2354,15 +2422,19 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
first_tid = cf->tid + 1;
if (cf->caps) {
+ struct cap_msg_args arg;
+
dout("kick_flushing_caps %p cap %p tid %llu %s\n",
inode, cap, cf->tid, ceph_cap_string(cf->caps));
- __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+ __prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH,
(cf->tid < last_snap_flush ?
CEPH_CLIENT_CAPS_PENDING_CAPSNAP : 0),
__ceph_caps_used(ci),
__ceph_caps_wanted(ci),
(cap->issued | cap->implemented),
cf->caps, cf->tid, oldest_flush_tid);
+ spin_unlock(&ci->i_ceph_lock);
+ __send_cap(mdsc, &arg, ci);
} else {
struct ceph_cap_snap *capsnap =
container_of(cf, struct ceph_cap_snap,
@@ -2446,6 +2518,8 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_cap *cap;
u64 oldest_flush_tid;
+ lockdep_assert_held(&session->s_mutex);
+
dout("kick_flushing_caps mds%d\n", session->s_mds);
spin_lock(&mdsc->cap_dirty_lock);
@@ -2685,6 +2759,11 @@ out_unlock:
if (snap_rwsem_locked)
up_read(&mdsc->snap_rwsem);
+ if (!ret)
+ ceph_update_cap_mis(&mdsc->metric);
+ else if (ret == 1)
+ ceph_update_cap_hit(&mdsc->metric);
+
dout("get_cap_refs %p ret %d got %s\n", inode,
ret, ceph_cap_string(*got));
return ret;
@@ -2937,7 +3016,8 @@ static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
* If we are releasing a WR cap (from a sync write), finalize any affected
* cap_snap, and wake up any waiters.
*/
-void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
+ bool skip_checking_caps)
{
struct inode *inode = &ci->vfs_inode;
int last = 0, put = 0, flushsnaps = 0, wake = 0;
@@ -2993,7 +3073,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
last ? " last" : "", put ? " put" : "");
- if (last)
+ if (last && !skip_checking_caps)
ceph_check_caps(ci, 0, NULL);
else if (flushsnaps)
ceph_flush_snaps(ci, NULL);
@@ -3003,6 +3083,16 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
iput(inode);
}
+void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+{
+ __ceph_put_cap_refs(ci, had, false);
+}
+
+void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci, int had)
+{
+ __ceph_put_cap_refs(ci, had, true);
+}
+
/*
* Release @nr WRBUFFER refs on dirty pages for the given @snapc snap
* context. Adjust per-snap dirty page accounting as appropriate.
@@ -3301,10 +3391,6 @@ static void handle_cap_grant(struct inode *inode,
ci->i_requested_max_size = 0;
}
wake = true;
- } else if (ci->i_wanted_max_size > ci->i_max_size &&
- ci->i_wanted_max_size > ci->i_requested_max_size) {
- /* CEPH_CAP_OP_IMPORT */
- wake = true;
}
}
@@ -3380,9 +3466,18 @@ static void handle_cap_grant(struct inode *inode,
fill_inline = true;
}
- if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
+ if (ci->i_auth_cap == cap &&
+ le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
if (newcaps & ~extra_info->issued)
wake = true;
+
+ if (ci->i_requested_max_size > max_size ||
+ !(le32_to_cpu(grant->wanted) & CEPH_CAP_ANY_FILE_WR)) {
+ /* re-request max_size if necessary */
+ ci->i_requested_max_size = 0;
+ wake = true;
+ }
+
ceph_kick_flushing_inode_caps(session, ci);
spin_unlock(&ci->i_ceph_lock);
up_read(&session->s_mdsc->snap_rwsem);
@@ -3442,15 +3537,26 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
bool wake_mdsc = false;
list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
+ /* Is this the one that was flushed? */
if (cf->tid == flush_tid)
cleaned = cf->caps;
- if (cf->caps == 0) /* capsnap */
+
+ /* Is this a capsnap? */
+ if (cf->caps == 0)
continue;
+
if (cf->tid <= flush_tid) {
- if (__finish_cap_flush(NULL, ci, cf))
- wake_ci = true;
+ /*
+ * An earlier or current tid. The FLUSH_ACK should
+ * represent a superset of this flush's caps.
+ */
+ wake_ci |= __detach_cap_flush_from_ci(ci, cf);
list_add_tail(&cf->i_list, &to_remove);
} else {
+ /*
+ * This is a later one. Any caps in it are still dirty
+ * so don't count them as cleaned.
+ */
cleaned &= ~cf->caps;
if (!cleaned)
break;
@@ -3470,10 +3576,8 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
spin_lock(&mdsc->cap_dirty_lock);
- list_for_each_entry(cf, &to_remove, i_list) {
- if (__finish_cap_flush(mdsc, NULL, cf))
- wake_mdsc = true;
- }
+ list_for_each_entry(cf, &to_remove, i_list)
+ wake_mdsc |= __detach_cap_flush_from_mdsc(mdsc, cf);
if (ci->i_flushing_caps == 0) {
if (list_empty(&ci->i_cap_flush_list)) {
@@ -3565,17 +3669,15 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
dout(" removing %p cap_snap %p follows %lld\n",
inode, capsnap, follows);
list_del(&capsnap->ci_item);
- if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush))
- wake_ci = true;
+ wake_ci |= __detach_cap_flush_from_ci(ci, &capsnap->cap_flush);
spin_lock(&mdsc->cap_dirty_lock);
if (list_empty(&ci->i_cap_flush_list))
list_del_init(&ci->i_flushing_item);
- if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush))
- wake_mdsc = true;
-
+ wake_mdsc |= __detach_cap_flush_from_mdsc(mdsc,
+ &capsnap->cap_flush);
spin_unlock(&mdsc->cap_dirty_lock);
}
spin_unlock(&ci->i_ceph_lock);
@@ -3595,10 +3697,9 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
*
* caller hold s_mutex.
*/
-static void handle_cap_trunc(struct inode *inode,
+static bool handle_cap_trunc(struct inode *inode,
struct ceph_mds_caps *trunc,
struct ceph_mds_session *session)
- __releases(ci->i_ceph_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
@@ -3609,7 +3710,9 @@ static void handle_cap_trunc(struct inode *inode,
int implemented = 0;
int dirty = __ceph_caps_dirty(ci);
int issued = __ceph_caps_issued(ceph_inode(inode), &implemented);
- int queue_trunc = 0;
+ bool queue_trunc = false;
+
+ lockdep_assert_held(&ci->i_ceph_lock);
issued |= implemented | dirty;
@@ -3617,10 +3720,7 @@ static void handle_cap_trunc(struct inode *inode,
inode, mds, seq, truncate_size, truncate_seq);
queue_trunc = ceph_fill_file_size(inode, issued,
truncate_seq, truncate_size, size);
- spin_unlock(&ci->i_ceph_lock);
-
- if (queue_trunc)
- ceph_queue_vmtruncate(inode);
+ return queue_trunc;
}
/*
@@ -3694,15 +3794,9 @@ retry:
tcap->issue_seq = t_seq - 1;
tcap->issued |= issued;
tcap->implemented |= issued;
- if (cap == ci->i_auth_cap)
+ if (cap == ci->i_auth_cap) {
ci->i_auth_cap = tcap;
-
- if (!list_empty(&ci->i_cap_flush_list) &&
- ci->i_auth_cap == tcap) {
- spin_lock(&mdsc->cap_dirty_lock);
- list_move_tail(&ci->i_flushing_item,
- &tcap->session->s_cap_flushing);
- spin_unlock(&mdsc->cap_dirty_lock);
+ change_auth_cap_ses(ci, tcap->session);
}
}
__ceph_remove_cap(cap, false);
@@ -3771,7 +3865,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
struct ceph_mds_cap_peer *ph,
struct ceph_mds_session *session,
struct ceph_cap **target_cap, int *old_issued)
- __acquires(ci->i_ceph_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_cap *cap, *ocap, *new_cap = NULL;
@@ -3796,14 +3889,13 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
inode, ci, mds, mseq, peer);
-
retry:
- spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds);
if (!cap) {
if (!new_cap) {
spin_unlock(&ci->i_ceph_lock);
new_cap = ceph_get_cap(mdsc, NULL);
+ spin_lock(&ci->i_ceph_lock);
goto retry;
}
cap = new_cap;
@@ -3838,9 +3930,6 @@ retry:
__ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
}
- /* make sure we re-request max_size, if necessary */
- ci->i_requested_max_size = 0;
-
*old_issued = issued;
*target_cap = cap;
}
@@ -3869,6 +3958,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
size_t snaptrace_len;
void *p, *end;
struct cap_extra_info extra_info = {};
+ bool queue_trunc;
dout("handle_caps from mds%d\n", session->s_mds);
@@ -4016,6 +4106,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
} else {
down_read(&mdsc->snap_rwsem);
}
+ spin_lock(&ci->i_ceph_lock);
handle_cap_import(mdsc, inode, h, peer, session,
&cap, &extra_info.issued);
handle_cap_grant(inode, session, cap,
@@ -4052,7 +4143,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
break;
case CEPH_CAP_OP_TRUNC:
- handle_cap_trunc(inode, h, session);
+ queue_trunc = handle_cap_trunc(inode, h, session);
+ spin_unlock(&ci->i_ceph_lock);
+ if (queue_trunc)
+ ceph_queue_vmtruncate(inode);
break;
default:
@@ -4121,15 +4215,16 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
/*
* Flush all dirty caps to the mds
*/
-void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
+static void flush_dirty_session_caps(struct ceph_mds_session *s)
{
+ struct ceph_mds_client *mdsc = s->s_mdsc;
struct ceph_inode_info *ci;
struct inode *inode;
dout("flush_dirty_caps\n");
spin_lock(&mdsc->cap_dirty_lock);
- while (!list_empty(&mdsc->cap_dirty)) {
- ci = list_first_entry(&mdsc->cap_dirty, struct ceph_inode_info,
+ while (!list_empty(&s->s_cap_dirty)) {
+ ci = list_first_entry(&s->s_cap_dirty, struct ceph_inode_info,
i_dirty_item);
inode = &ci->vfs_inode;
ihold(inode);
@@ -4143,6 +4238,35 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
dout("flush_dirty_caps done\n");
}
+static void iterate_sessions(struct ceph_mds_client *mdsc,
+ void (*cb)(struct ceph_mds_session *))
+{
+ int mds;
+
+ mutex_lock(&mdsc->mutex);
+ for (mds = 0; mds < mdsc->max_sessions; ++mds) {
+ struct ceph_mds_session *s;
+
+ if (!mdsc->sessions[mds])
+ continue;
+
+ s = ceph_get_mds_session(mdsc->sessions[mds]);
+ if (!s)
+ continue;
+
+ mutex_unlock(&mdsc->mutex);
+ cb(s);
+ ceph_put_mds_session(s);
+ mutex_lock(&mdsc->mutex);
+ }
+ mutex_unlock(&mdsc->mutex);
+}
+
+void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
+{
+ iterate_sessions(mdsc, flush_dirty_session_caps);
+}
+
void __ceph_touch_fmode(struct ceph_inode_info *ci,
struct ceph_mds_client *mdsc, int fmode)
{
@@ -4269,6 +4393,9 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
cap->issued &= ~drop;
cap->implemented &= ~drop;
cap->mds_wanted = wanted;
+ if (cap == ci->i_auth_cap &&
+ !(wanted & CEPH_CAP_ANY_FILE_WR))
+ ci->i_requested_max_size = 0;
} else {
dout("encode_inode_release %p cap %p %s"
" (force)\n", inode, cap,
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index dcaed75de9e6..070ed8481340 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -7,6 +7,8 @@
#include <linux/ctype.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
+#include <linux/math64.h>
+#include <linux/ktime.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/mon_client.h>
@@ -18,6 +20,7 @@
#ifdef CONFIG_DEBUG_FS
#include "mds_client.h"
+#include "metric.h"
static int mdsmap_show(struct seq_file *s, void *p)
{
@@ -124,6 +127,87 @@ static int mdsc_show(struct seq_file *s, void *p)
return 0;
}
+#define CEPH_METRIC_SHOW(name, total, avg, min, max, sq) { \
+ s64 _total, _avg, _min, _max, _sq, _st; \
+ _avg = ktime_to_us(avg); \
+ _min = ktime_to_us(min == KTIME_MAX ? 0 : min); \
+ _max = ktime_to_us(max); \
+ _total = total - 1; \
+ _sq = _total > 0 ? DIV64_U64_ROUND_CLOSEST(sq, _total) : 0; \
+ _st = int_sqrt64(_sq); \
+ _st = ktime_to_us(_st); \
+ seq_printf(s, "%-14s%-12lld%-16lld%-16lld%-16lld%lld\n", \
+ name, total, _avg, _min, _max, _st); \
+}
+
+static int metric_show(struct seq_file *s, void *p)
+{
+ struct ceph_fs_client *fsc = s->private;
+ struct ceph_mds_client *mdsc = fsc->mdsc;
+ struct ceph_client_metric *m = &mdsc->metric;
+ int i, nr_caps = 0;
+ s64 total, sum, avg, min, max, sq;
+
+ seq_printf(s, "item total avg_lat(us) min_lat(us) max_lat(us) stdev(us)\n");
+ seq_printf(s, "-----------------------------------------------------------------------------------\n");
+
+ spin_lock(&m->read_latency_lock);
+ total = m->total_reads;
+ sum = m->read_latency_sum;
+ avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
+ min = m->read_latency_min;
+ max = m->read_latency_max;
+ sq = m->read_latency_sq_sum;
+ spin_unlock(&m->read_latency_lock);
+ CEPH_METRIC_SHOW("read", total, avg, min, max, sq);
+
+ spin_lock(&m->write_latency_lock);
+ total = m->total_writes;
+ sum = m->write_latency_sum;
+ avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
+ min = m->write_latency_min;
+ max = m->write_latency_max;
+ sq = m->write_latency_sq_sum;
+ spin_unlock(&m->write_latency_lock);
+ CEPH_METRIC_SHOW("write", total, avg, min, max, sq);
+
+ spin_lock(&m->metadata_latency_lock);
+ total = m->total_metadatas;
+ sum = m->metadata_latency_sum;
+ avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
+ min = m->metadata_latency_min;
+ max = m->metadata_latency_max;
+ sq = m->metadata_latency_sq_sum;
+ spin_unlock(&m->metadata_latency_lock);
+ CEPH_METRIC_SHOW("metadata", total, avg, min, max, sq);
+
+ seq_printf(s, "\n");
+ seq_printf(s, "item total miss hit\n");
+ seq_printf(s, "-------------------------------------------------\n");
+
+ seq_printf(s, "%-14s%-16lld%-16lld%lld\n", "d_lease",
+ atomic64_read(&m->total_dentries),
+ percpu_counter_sum(&m->d_lease_mis),
+ percpu_counter_sum(&m->d_lease_hit));
+
+ mutex_lock(&mdsc->mutex);
+ for (i = 0; i < mdsc->max_sessions; i++) {
+ struct ceph_mds_session *s;
+
+ s = __ceph_lookup_mds_session(mdsc, i);
+ if (!s)
+ continue;
+ nr_caps += s->s_nr_caps;
+ ceph_put_mds_session(s);
+ }
+ mutex_unlock(&mdsc->mutex);
+ seq_printf(s, "%-14s%-16d%-16lld%lld\n", "caps", nr_caps,
+ percpu_counter_sum(&m->i_caps_mis),
+ percpu_counter_sum(&m->i_caps_hit));
+
+ return 0;
+}
+
static int caps_show_cb(struct inode *inode, struct ceph_cap *cap, void *p)
{
struct seq_file *s = p;
@@ -222,6 +306,7 @@ DEFINE_SHOW_ATTRIBUTE(mdsmap);
DEFINE_SHOW_ATTRIBUTE(mdsc);
DEFINE_SHOW_ATTRIBUTE(caps);
DEFINE_SHOW_ATTRIBUTE(mds_sessions);
+DEFINE_SHOW_ATTRIBUTE(metric);
/*
@@ -255,6 +340,7 @@ void ceph_fs_debugfs_cleanup(struct ceph_fs_client *fsc)
debugfs_remove(fsc->debugfs_mdsmap);
debugfs_remove(fsc->debugfs_mds_sessions);
debugfs_remove(fsc->debugfs_caps);
+ debugfs_remove(fsc->debugfs_metric);
debugfs_remove(fsc->debugfs_mdsc);
}
@@ -295,11 +381,17 @@ void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
fsc,
&mdsc_fops);
+ fsc->debugfs_metric = debugfs_create_file("metrics",
+ 0400,
+ fsc->client->debugfs_dir,
+ fsc,
+ &metric_fops);
+
fsc->debugfs_caps = debugfs_create_file("caps",
- 0400,
- fsc->client->debugfs_dir,
- fsc,
- &caps_fops);
+ 0400,
+ fsc->client->debugfs_dir,
+ fsc,
+ &caps_fops);
}
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 4c4202c93b71..39f5311404b0 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -38,6 +38,8 @@ static int __dir_lease_try_check(const struct dentry *dentry);
static int ceph_d_init(struct dentry *dentry)
{
struct ceph_dentry_info *di;
+ struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
+ struct ceph_mds_client *mdsc = fsc->mdsc;
di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
if (!di)
@@ -48,6 +50,9 @@ static int ceph_d_init(struct dentry *dentry)
di->time = jiffies;
dentry->d_fsdata = di;
INIT_LIST_HEAD(&di->lease_list);
+
+ atomic64_inc(&mdsc->metric.total_dentries);
+
return 0;
}
@@ -344,8 +349,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
!ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
ceph_snap(inode) != CEPH_SNAPDIR &&
__ceph_dir_is_complete_ordered(ci) &&
- __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
+ __ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
int shared_gen = atomic_read(&ci->i_shared_gen);
+
spin_unlock(&ci->i_ceph_lock);
err = __dcache_readdir(file, ctx, shared_gen);
if (err != -EAGAIN)
@@ -762,7 +768,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
!is_root_ceph_dentry(dir, dentry) &&
ceph_test_mount_opt(fsc, DCACHE) &&
__ceph_dir_is_complete(ci) &&
- (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
+ __ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
spin_unlock(&ci->i_ceph_lock);
dout(" dir %p complete, -ENOENT\n", dir);
@@ -1203,11 +1209,12 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
op = CEPH_MDS_OP_RENAMESNAP;
else
return -EROFS;
+ } else if (old_dir != new_dir) {
+ err = ceph_quota_check_rename(mdsc, d_inode(old_dentry),
+ new_dir);
+ if (err)
+ return err;
}
- /* don't allow cross-quota renames */
- if ((old_dir != new_dir) &&
- (!ceph_quota_is_same_realm(old_dir, new_dir)))
- return -EXDEV;
dout("rename dir %p dentry %p to dir %p dentry %p\n",
old_dir, old_dentry, new_dir, new_dentry);
@@ -1709,6 +1716,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
if (flags & LOOKUP_RCU)
return -ECHILD;
+ percpu_counter_inc(&mdsc->metric.d_lease_mis);
+
op = ceph_snap(dir) == CEPH_SNAPDIR ?
CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
@@ -1740,6 +1749,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
dout("d_revalidate %p lookup result=%d\n",
dentry, err);
}
+ } else {
+ percpu_counter_inc(&mdsc->metric.d_lease_hit);
}
dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
@@ -1782,9 +1793,12 @@ static int ceph_d_delete(const struct dentry *dentry)
static void ceph_d_release(struct dentry *dentry)
{
struct ceph_dentry_info *di = ceph_dentry(dentry);
+ struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
dout("d_release %p\n", dentry);
+ atomic64_dec(&fsc->mdsc->metric.total_dentries);
+
spin_lock(&dentry->d_lock);
__dentry_lease_unlist(di);
dentry->d_fsdata = NULL;
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 79dc06881e78..e088843a7734 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -172,9 +172,16 @@ struct inode *ceph_lookup_inode(struct super_block *sb, u64 ino)
static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
{
struct inode *inode = __lookup_inode(sb, ino);
+ int err;
+
if (IS_ERR(inode))
return ERR_CAST(inode);
- if (inode->i_nlink == 0) {
+ /* We need LINK caps to reliably check i_nlink */
+ err = ceph_do_getattr(inode, CEPH_CAP_LINK_SHARED, false);
+ if (err)
+ return ERR_PTR(err);
+ /* -ESTALE if inode as been unlinked and no file is open */
+ if ((inode->i_nlink == 0) && (atomic_read(&inode->i_count) == 1)) {
iput(inode);
return ERR_PTR(-ESTALE);
}
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index afdfca965a7f..160644ddaeed 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -11,11 +11,13 @@
#include <linux/writeback.h>
#include <linux/falloc.h>
#include <linux/iversion.h>
+#include <linux/ktime.h>
#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include "io.h"
+#include "metric.h"
static __le32 ceph_flags_sys2wire(u32 flags)
{
@@ -906,6 +908,12 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
ret = ceph_osdc_start_request(osdc, req, false);
if (!ret)
ret = ceph_osdc_wait_request(osdc, req);
+
+ ceph_update_read_latency(&fsc->mdsc->metric,
+ req->r_start_latency,
+ req->r_end_latency,
+ ret);
+
ceph_osdc_put_request(req);
i_size = i_size_read(inode);
@@ -1044,6 +1052,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
struct inode *inode = req->r_inode;
struct ceph_aio_request *aio_req = req->r_priv;
struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_client_metric *metric = &fsc->mdsc->metric;
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
BUG_ON(!osd_data->num_bvecs);
@@ -1051,6 +1061,16 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
dout("ceph_aio_complete_req %p rc %d bytes %u\n",
inode, rc, osd_data->bvec_pos.iter.bi_size);
+ /* r_start_latency == 0 means the request was not submitted */
+ if (req->r_start_latency) {
+ if (aio_req->write)
+ ceph_update_write_latency(metric, req->r_start_latency,
+ req->r_end_latency, rc);
+ else
+ ceph_update_read_latency(metric, req->r_start_latency,
+ req->r_end_latency, rc);
+ }
+
if (rc == -EOLDSNAPC) {
struct ceph_aio_work *aio_work;
BUG_ON(!aio_req->write);
@@ -1179,6 +1199,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_client_metric *metric = &fsc->mdsc->metric;
struct ceph_vino vino;
struct ceph_osd_request *req;
struct bio_vec *bvecs;
@@ -1295,6 +1316,13 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
if (!ret)
ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+ if (write)
+ ceph_update_write_latency(metric, req->r_start_latency,
+ req->r_end_latency, ret);
+ else
+ ceph_update_read_latency(metric, req->r_start_latency,
+ req->r_end_latency, ret);
+
size = i_size_read(inode);
if (!write) {
if (ret == -ENOENT)
@@ -1466,6 +1494,8 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
if (!ret)
ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+ ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, ret);
out:
ceph_osdc_put_request(req);
if (ret != 0) {
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 7fef94fd1e55..357c937699d5 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -2288,8 +2288,8 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
dout("do_getattr inode %p mask %s mode 0%o\n",
inode, ceph_cap_string(mask), inode->i_mode);
- if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
- return 0;
+ if (!force && ceph_caps_issued_mask_metric(ceph_inode(inode), mask, 1))
+ return 0;
mode = (mask & CEPH_STAT_RSTAT) ? USE_AUTH_MDS : USE_ANY_MDS;
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 7c63abf5bea9..a50497142e59 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -10,6 +10,7 @@
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
+#include <linux/ktime.h>
#include "super.h"
#include "mds_client.h"
@@ -658,6 +659,7 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
if (refcount_dec_and_test(&s->s_ref)) {
if (s->s_auth.authorizer)
ceph_auth_destroy_authorizer(s->s_auth.authorizer);
+ WARN_ON(mutex_is_locked(&s->s_mutex));
xa_destroy(&s->s_delegated_inos);
kfree(s);
}
@@ -753,6 +755,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
INIT_LIST_HEAD(&s->s_cap_releases);
INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
+ INIT_LIST_HEAD(&s->s_cap_dirty);
INIT_LIST_HEAD(&s->s_cap_flushing);
mdsc->sessions[mds] = s;
@@ -801,7 +804,7 @@ void ceph_mdsc_release_request(struct kref *kref)
struct ceph_mds_request *req = container_of(kref,
struct ceph_mds_request,
r_kref);
- ceph_mdsc_release_dir_caps(req);
+ ceph_mdsc_release_dir_caps_no_check(req);
destroy_reply_info(&req->r_reply_info);
if (req->r_request)
ceph_msg_put(req->r_request);
@@ -2201,6 +2204,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
mutex_init(&req->r_fill_mutex);
req->r_mdsc = mdsc;
req->r_started = jiffies;
+ req->r_start_latency = ktime_get();
req->r_resend_mds = -1;
INIT_LIST_HEAD(&req->r_unsafe_dir_item);
INIT_LIST_HEAD(&req->r_unsafe_target_item);
@@ -2547,6 +2551,8 @@ out:
static void complete_request(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req)
{
+ req->r_end_latency = ktime_get();
+
if (req->r_callback)
req->r_callback(mdsc, req);
complete_all(&req->r_completion);
@@ -3155,6 +3161,9 @@ out_err:
/* kick calling process */
complete_request(mdsc, req);
+
+ ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency,
+ req->r_end_latency, err);
out:
ceph_mdsc_put_request(req);
return;
@@ -3393,6 +3402,18 @@ void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
}
}
+void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
+{
+ int dcaps;
+
+ dcaps = xchg(&req->r_dir_caps, 0);
+ if (dcaps) {
+ dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
+ ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
+ dcaps);
+ }
+}
+
/*
* called under session->mutex.
*/
@@ -3425,7 +3446,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
if (req->r_session->s_mds != session->s_mds)
continue;
- ceph_mdsc_release_dir_caps(req);
+ ceph_mdsc_release_dir_caps_no_check(req);
__send_request(mdsc, session, req, true);
}
@@ -3760,8 +3781,6 @@ fail:
* recovering MDS might have.
*
* This is a relatively heavyweight operation, but it's rare.
- *
- * called with mdsc->mutex held.
*/
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
@@ -4015,7 +4034,11 @@ static void check_new_map(struct ceph_mds_client *mdsc,
oldstate != CEPH_MDS_STATE_STARTING)
pr_info("mds%d recovery completed\n", s->s_mds);
kick_requests(mdsc, i);
+ mutex_unlock(&mdsc->mutex);
+ mutex_lock(&s->s_mutex);
+ mutex_lock(&mdsc->mutex);
ceph_kick_flushing_caps(mdsc, s);
+ mutex_unlock(&s->s_mutex);
wake_up_session_caps(s, RECONNECT);
}
}
@@ -4323,6 +4346,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
{
struct ceph_mds_client *mdsc;
+ int err;
mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
if (!mdsc)
@@ -4331,8 +4355,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
mutex_init(&mdsc->mutex);
mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
if (!mdsc->mdsmap) {
- kfree(mdsc);
- return -ENOMEM;
+ err = -ENOMEM;
+ goto err_mdsc;
}
fsc->mdsc = mdsc;
@@ -4364,13 +4388,15 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
spin_lock_init(&mdsc->snap_flush_lock);
mdsc->last_cap_flush_tid = 1;
INIT_LIST_HEAD(&mdsc->cap_flush_list);
- INIT_LIST_HEAD(&mdsc->cap_dirty);
INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
mdsc->num_cap_flushing = 0;
spin_lock_init(&mdsc->cap_dirty_lock);
init_waitqueue_head(&mdsc->cap_flushing_wq);
INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
atomic_set(&mdsc->cap_reclaim_pending, 0);
+ err = ceph_metric_init(&mdsc->metric);
+ if (err)
+ goto err_mdsmap;
spin_lock_init(&mdsc->dentry_list_lock);
INIT_LIST_HEAD(&mdsc->dentry_leases);
@@ -4389,6 +4415,12 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
strscpy(mdsc->nodename, utsname()->nodename,
sizeof(mdsc->nodename));
return 0;
+
+err_mdsmap:
+ kfree(mdsc->mdsmap);
+err_mdsc:
+ kfree(mdsc);
+ return err;
}
/*
@@ -4646,6 +4678,8 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
ceph_mdsc_stop(mdsc);
+ ceph_metric_destroy(&mdsc->metric);
+
fsc->mdsc = NULL;
kfree(mdsc);
dout("mdsc_destroy %p done\n", mdsc);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 903d9edfd4bf..5e0c4073a6be 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -10,12 +10,15 @@
#include <linux/spinlock.h>
#include <linux/refcount.h>
#include <linux/utsname.h>
+#include <linux/ktime.h>
#include <linux/ceph/types.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/mdsmap.h>
#include <linux/ceph/auth.h>
+#include "metric.h"
+
/* The first 8 bits are reserved for old ceph releases */
enum ceph_feature_type {
CEPHFS_FEATURE_MIMIC = 8,
@@ -196,8 +199,12 @@ struct ceph_mds_session {
struct list_head s_cap_releases; /* waiting cap_release messages */
struct work_struct s_cap_release_work;
- /* protected by mutex */
+ /* See ceph_inode_info->i_dirty_item. */
+ struct list_head s_cap_dirty; /* inodes w/ dirty caps */
+
+ /* See ceph_inode_info->i_flushing_item. */
struct list_head s_cap_flushing; /* inodes w/ flushing caps */
+
unsigned long s_renew_requested; /* last time we sent a renew req */
u64 s_renew_seq;
@@ -297,6 +304,8 @@ struct ceph_mds_request {
unsigned long r_timeout; /* optional. jiffies, 0 is "wait forever" */
unsigned long r_started; /* start time to measure timeout against */
+ unsigned long r_start_latency; /* start time to measure latency */
+ unsigned long r_end_latency; /* finish time to measure latency */
unsigned long r_request_started; /* start time for mds request only,
used to measure lease durations */
@@ -419,7 +428,6 @@ struct ceph_mds_client {
u64 last_cap_flush_tid;
struct list_head cap_flush_list;
- struct list_head cap_dirty; /* inodes with dirty caps */
struct list_head cap_dirty_migrating; /* ...that are migration... */
int num_cap_flushing; /* # caps we are flushing */
spinlock_t cap_dirty_lock; /* protects above items */
@@ -454,6 +462,8 @@ struct ceph_mds_client {
struct list_head dentry_leases; /* fifo list */
struct list_head dentry_dir_leases; /* lru list */
+ struct ceph_client_metric metric;
+
spinlock_t snapid_map_lock;
struct rb_root snapid_map_tree;
struct list_head snapid_map_lru;
@@ -497,6 +507,7 @@ extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
struct inode *dir,
struct ceph_mds_request *req);
extern void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req);
+extern void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req);
static inline void ceph_mdsc_get_request(struct ceph_mds_request *req)
{
kref_get(&req->r_kref);
diff --git a/fs/ceph/metric.c b/fs/ceph/metric.c
new file mode 100644
index 000000000000..9217f35bc2b9
--- /dev/null
+++ b/fs/ceph/metric.c
@@ -0,0 +1,148 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+
+#include <linux/types.h>
+#include <linux/percpu_counter.h>
+#include <linux/math64.h>
+
+#include "metric.h"
+
+int ceph_metric_init(struct ceph_client_metric *m)
+{
+ int ret;
+
+ if (!m)
+ return -EINVAL;
+
+ atomic64_set(&m->total_dentries, 0);
+ ret = percpu_counter_init(&m->d_lease_hit, 0, GFP_KERNEL);
+ if (ret)
+ return ret;
+
+ ret = percpu_counter_init(&m->d_lease_mis, 0, GFP_KERNEL);
+ if (ret)
+ goto err_d_lease_mis;
+
+ ret = percpu_counter_init(&m->i_caps_hit, 0, GFP_KERNEL);
+ if (ret)
+ goto err_i_caps_hit;
+
+ ret = percpu_counter_init(&m->i_caps_mis, 0, GFP_KERNEL);
+ if (ret)
+ goto err_i_caps_mis;
+
+ spin_lock_init(&m->read_latency_lock);
+ m->read_latency_sq_sum = 0;
+ m->read_latency_min = KTIME_MAX;
+ m->read_latency_max = 0;
+ m->total_reads = 0;
+ m->read_latency_sum = 0;
+
+ spin_lock_init(&m->write_latency_lock);
+ m->write_latency_sq_sum = 0;
+ m->write_latency_min = KTIME_MAX;
+ m->write_latency_max = 0;
+ m->total_writes = 0;
+ m->write_latency_sum = 0;
+
+ spin_lock_init(&m->metadata_latency_lock);
+ m->metadata_latency_sq_sum = 0;
+ m->metadata_latency_min = KTIME_MAX;
+ m->metadata_latency_max = 0;
+ m->total_metadatas = 0;
+ m->metadata_latency_sum = 0;
+
+ return 0;
+
+err_i_caps_mis:
+ percpu_counter_destroy(&m->i_caps_hit);
+err_i_caps_hit:
+ percpu_counter_destroy(&m->d_lease_mis);
+err_d_lease_mis:
+ percpu_counter_destroy(&m->d_lease_hit);
+
+ return ret;
+}
+
+void ceph_metric_destroy(struct ceph_client_metric *m)
+{
+ if (!m)
+ return;
+
+ percpu_counter_destroy(&m->i_caps_mis);
+ percpu_counter_destroy(&m->i_caps_hit);
+ percpu_counter_destroy(&m->d_lease_mis);
+ percpu_counter_destroy(&m->d_lease_hit);
+}
+
+static inline void __update_latency(ktime_t *totalp, ktime_t *lsump,
+ ktime_t *min, ktime_t *max,
+ ktime_t *sq_sump, ktime_t lat)
+{
+ ktime_t total, avg, sq, lsum;
+
+ total = ++(*totalp);
+ lsum = (*lsump += lat);
+
+ if (unlikely(lat < *min))
+ *min = lat;
+ if (unlikely(lat > *max))
+ *max = lat;
+
+ if (unlikely(total == 1))
+ return;
+
+ /* the sq is (lat - old_avg) * (lat - new_avg) */
+ avg = DIV64_U64_ROUND_CLOSEST((lsum - lat), (total - 1));
+ sq = lat - avg;
+ avg = DIV64_U64_ROUND_CLOSEST(lsum, total);
+ sq = sq * (lat - avg);
+ *sq_sump += sq;
+}
+
+void ceph_update_read_latency(struct ceph_client_metric *m,
+ ktime_t r_start, ktime_t r_end,
+ int rc)
+{
+ ktime_t lat = ktime_sub(r_end, r_start);
+
+ if (unlikely(rc < 0 && rc != -ENOENT && rc != -ETIMEDOUT))
+ return;
+
+ spin_lock(&m->read_latency_lock);
+ __update_latency(&m->total_reads, &m->read_latency_sum,
+ &m->read_latency_min, &m->read_latency_max,
+ &m->read_latency_sq_sum, lat);
+ spin_unlock(&m->read_latency_lock);
+}
+
+void ceph_update_write_latency(struct ceph_client_metric *m,
+ ktime_t r_start, ktime_t r_end,
+ int rc)
+{
+ ktime_t lat = ktime_sub(r_end, r_start);
+
+ if (unlikely(rc && rc != -ETIMEDOUT))
+ return;
+
+ spin_lock(&m->write_latency_lock);
+ __update_latency(&m->total_writes, &m->write_latency_sum,
+ &m->write_latency_min, &m->write_latency_max,
+ &m->write_latency_sq_sum, lat);
+ spin_unlock(&m->write_latency_lock);
+}
+
+void ceph_update_metadata_latency(struct ceph_client_metric *m,
+ ktime_t r_start, ktime_t r_end,
+ int rc)
+{
+ ktime_t lat = ktime_sub(r_end, r_start);
+
+ if (unlikely(rc && rc != -ENOENT))
+ return;
+
+ spin_lock(&m->metadata_latency_lock);
+ __update_latency(&m->total_metadatas, &m->metadata_latency_sum,
+ &m->metadata_latency_min, &m->metadata_latency_max,
+ &m->metadata_latency_sq_sum, lat);
+ spin_unlock(&m->metadata_latency_lock);
+}
diff --git a/fs/ceph/metric.h b/fs/ceph/metric.h
new file mode 100644
index 000000000000..ccd81285a450
--- /dev/null
+++ b/fs/ceph/metric.h
@@ -0,0 +1,62 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _FS_CEPH_MDS_METRIC_H
+#define _FS_CEPH_MDS_METRIC_H
+
+#include <linux/types.h>
+#include <linux/percpu_counter.h>
+#include <linux/ktime.h>
+
+/* This is the global metrics */
+struct ceph_client_metric {
+ atomic64_t total_dentries;
+ struct percpu_counter d_lease_hit;
+ struct percpu_counter d_lease_mis;
+
+ struct percpu_counter i_caps_hit;
+ struct percpu_counter i_caps_mis;
+
+ spinlock_t read_latency_lock;
+ u64 total_reads;
+ ktime_t read_latency_sum;
+ ktime_t read_latency_sq_sum;
+ ktime_t read_latency_min;
+ ktime_t read_latency_max;
+
+ spinlock_t write_latency_lock;
+ u64 total_writes;
+ ktime_t write_latency_sum;
+ ktime_t write_latency_sq_sum;
+ ktime_t write_latency_min;
+ ktime_t write_latency_max;
+
+ spinlock_t metadata_latency_lock;
+ u64 total_metadatas;
+ ktime_t metadata_latency_sum;
+ ktime_t metadata_latency_sq_sum;
+ ktime_t metadata_latency_min;
+ ktime_t metadata_latency_max;
+};
+
+extern int ceph_metric_init(struct ceph_client_metric *m);
+extern void ceph_metric_destroy(struct ceph_client_metric *m);
+
+static inline void ceph_update_cap_hit(struct ceph_client_metric *m)
+{
+ percpu_counter_inc(&m->i_caps_hit);
+}
+
+static inline void ceph_update_cap_mis(struct ceph_client_metric *m)
+{
+ percpu_counter_inc(&m->i_caps_mis);
+}
+
+extern void ceph_update_read_latency(struct ceph_client_metric *m,
+ ktime_t r_start, ktime_t r_end,
+ int rc);
+extern void ceph_update_write_latency(struct ceph_client_metric *m,
+ ktime_t r_start, ktime_t r_end,
+ int rc);
+extern void ceph_update_metadata_latency(struct ceph_client_metric *m,
+ ktime_t r_start, ktime_t r_end,
+ int rc);
+#endif /* _FS_CEPH_MDS_METRIC_H */
diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c
index 19507e2fdb57..198ddde5c1e6 100644
--- a/fs/ceph/quota.c
+++ b/fs/ceph/quota.c
@@ -264,7 +264,7 @@ restart:
return NULL;
}
-bool ceph_quota_is_same_realm(struct inode *old, struct inode *new)
+static bool ceph_quota_is_same_realm(struct inode *old, struct inode *new)
{
struct ceph_mds_client *mdsc = ceph_inode_to_client(old)->mdsc;
struct ceph_snap_realm *old_realm, *new_realm;
@@ -361,8 +361,6 @@ restart:
spin_unlock(&ci->i_ceph_lock);
switch (op) {
case QUOTA_CHECK_MAX_FILES_OP:
- exceeded = (max && (rvalue >= max));
- break;
case QUOTA_CHECK_MAX_BYTES_OP:
exceeded = (max && (rvalue + delta > max));
break;
@@ -417,7 +415,7 @@ bool ceph_quota_is_max_files_exceeded(struct inode *inode)
WARN_ON(!S_ISDIR(inode->i_mode));
- return check_quota_exceeded(inode, QUOTA_CHECK_MAX_FILES_OP, 0);
+ return check_quota_exceeded(inode, QUOTA_CHECK_MAX_FILES_OP, 1);
}
/*
@@ -518,3 +516,59 @@ bool ceph_quota_update_statfs(struct ceph_fs_client *fsc, struct kstatfs *buf)
return is_updated;
}
+/*
+ * ceph_quota_check_rename - check if a rename can be executed
+ * @mdsc: MDS client instance
+ * @old: inode to be copied
+ * @new: destination inode (directory)
+ *
+ * This function verifies if a rename (e.g. moving a file or directory) can be
+ * executed. It forces an rstat update in the @new target directory (and in the
+ * source @old as well, if it's a directory). The actual check is done both for
+ * max_files and max_bytes.
+ *
+ * This function returns 0 if it's OK to do the rename, or, if quotas are
+ * exceeded, -EXDEV (if @old is a directory) or -EDQUOT.
+ */
+int ceph_quota_check_rename(struct ceph_mds_client *mdsc,
+ struct inode *old, struct inode *new)
+{
+ struct ceph_inode_info *ci_old = ceph_inode(old);
+ int ret = 0;
+
+ if (ceph_quota_is_same_realm(old, new))
+ return 0;
+
+ /*
+ * Get the latest rstat for target directory (and for source, if a
+ * directory)
+ */
+ ret = ceph_do_getattr(new, CEPH_STAT_RSTAT, false);
+ if (ret)
+ return ret;
+
+ if (S_ISDIR(old->i_mode)) {
+ ret = ceph_do_getattr(old, CEPH_STAT_RSTAT, false);
+ if (ret)
+ return ret;
+ ret = check_quota_exceeded(new, QUOTA_CHECK_MAX_BYTES_OP,
+ ci_old->i_rbytes);
+ if (!ret)
+ ret = check_quota_exceeded(new,
+ QUOTA_CHECK_MAX_FILES_OP,
+ ci_old->i_rfiles +
+ ci_old->i_rsubdirs);
+ if (ret)
+ ret = -EXDEV;
+ } else {
+ ret = check_quota_exceeded(new, QUOTA_CHECK_MAX_BYTES_OP,
+ i_size_read(old));
+ if (!ret)
+ ret = check_quota_exceeded(new,
+ QUOTA_CHECK_MAX_FILES_OP, 1);
+ if (ret)
+ ret = -EDQUOT;
+ }
+
+ return ret;
+}
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 60aac3aee055..5a6cdd39bc10 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -128,6 +128,7 @@ struct ceph_fs_client {
struct dentry *debugfs_congestion_kb;
struct dentry *debugfs_bdi;
struct dentry *debugfs_mdsc, *debugfs_mdsmap;
+ struct dentry *debugfs_metric;
struct dentry *debugfs_mds_sessions;
#endif
@@ -350,7 +351,25 @@ struct ceph_inode_info {
struct rb_root i_caps; /* cap list */
struct ceph_cap *i_auth_cap; /* authoritative cap, if any */
unsigned i_dirty_caps, i_flushing_caps; /* mask of dirtied fields */
- struct list_head i_dirty_item, i_flushing_item;
+
+ /*
+ * Link to the the auth cap's session's s_cap_dirty list. s_cap_dirty
+ * is protected by the mdsc->cap_dirty_lock, but each individual item
+ * is also protected by the inode's i_ceph_lock. Walking s_cap_dirty
+ * requires the mdsc->cap_dirty_lock. List presence for an item can
+ * be tested under the i_ceph_lock. Changing anything requires both.
+ */
+ struct list_head i_dirty_item;
+
+ /*
+ * Link to session's s_cap_flushing list. Protected in a similar
+ * fashion to i_dirty_item, but also by the s_mutex for changes. The
+ * s_cap_flushing list can be walked while holding either the s_mutex
+ * or msdc->cap_dirty_lock. List presence can also be checked while
+ * holding the i_ceph_lock for this inode.
+ */
+ struct list_head i_flushing_item;
+
/* we need to track cap writeback on a per-cap-bit basis, to allow
* overlapping, pipelined cap flushes to the mds. we can probably
* reduce the tid to 8 bits if we're concerned about inode size. */
@@ -644,6 +663,8 @@ static inline bool __ceph_is_any_real_caps(struct ceph_inode_info *ci)
extern int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented);
extern int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int t);
+extern int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
+ int t);
extern int __ceph_caps_issued_other(struct ceph_inode_info *ci,
struct ceph_cap *cap);
@@ -656,12 +677,12 @@ static inline int ceph_caps_issued(struct ceph_inode_info *ci)
return issued;
}
-static inline int ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask,
- int touch)
+static inline int ceph_caps_issued_mask_metric(struct ceph_inode_info *ci,
+ int mask, int touch)
{
int r;
spin_lock(&ci->i_ceph_lock);
- r = __ceph_caps_issued_mask(ci, mask, touch);
+ r = __ceph_caps_issued_mask_metric(ci, mask, touch);
spin_unlock(&ci->i_ceph_lock);
return r;
}
@@ -1074,6 +1095,8 @@ extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int caps,
bool snap_rwsem_locked);
extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
+extern void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci,
+ int had);
extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
struct ceph_snap_context *snapc);
extern void ceph_flush_snaps(struct ceph_inode_info *ci,
@@ -1189,13 +1212,14 @@ extern void ceph_handle_quota(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
struct ceph_msg *msg);
extern bool ceph_quota_is_max_files_exceeded(struct inode *inode);
-extern bool ceph_quota_is_same_realm(struct inode *old, struct inode *new);
extern bool ceph_quota_is_max_bytes_exceeded(struct inode *inode,
loff_t newlen);
extern bool ceph_quota_is_max_bytes_approaching(struct inode *inode,
loff_t newlen);
extern bool ceph_quota_update_statfs(struct ceph_fs_client *fsc,
struct kstatfs *buf);
+extern int ceph_quota_check_rename(struct ceph_mds_client *mdsc,
+ struct inode *old, struct inode *new);
extern void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc);
#endif /* _FS_CEPH_SUPER_H */
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 7b8a070a782d..71ee34d160c3 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -856,7 +856,7 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
if (ci->i_xattrs.version == 0 ||
!((req_mask & CEPH_CAP_XATTR_SHARED) ||
- __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1))) {
+ __ceph_caps_issued_mask_metric(ci, CEPH_CAP_XATTR_SHARED, 1))) {
spin_unlock(&ci->i_ceph_lock);
/* security module gets xattr while filling trace */
@@ -914,7 +914,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
ci->i_xattrs.version, ci->i_xattrs.index_version);
if (ci->i_xattrs.version == 0 ||
- !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
+ !__ceph_caps_issued_mask_metric(ci, CEPH_CAP_XATTR_SHARED, 1)) {
spin_unlock(&ci->i_ceph_lock);
err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
if (err)