diff options
author | Mark Fasheh <mark.fasheh@oracle.com> | 2006-09-08 14:14:34 -0700 |
---|---|---|
committer | Mark Fasheh <mark.fasheh@oracle.com> | 2006-09-24 13:50:42 -0700 |
commit | d680efe9d8fe0eb99d9dd063a4def6b362cdb40d (patch) | |
tree | 51e8c081c673240434dce4b44bf66fbfd4dddf30 /fs/ocfs2/dlmglue.c | |
parent | f0681062b8e369d9fb6f3ce10f4e3fc8cea5f910 (diff) |
ocfs2: Add new cluster lock type
Replace the dentry vote mechanism with a cluster lock which covers a set
of dentries. This allows us to force d_delete() only on nodes which actually
care about an unlink.
Every node that does a ->lookup() gets a read only lock on the dentry, until
an unlink during which the unlinking node, will request an exclusive lock,
forcing the other nodes who care about that dentry to d_delete() it. The
effect is that we retain a very lightweight ->d_revalidate(), and at the
same time get to make large improvements to the average case performance of
the ocfs2 unlink and rename operations.
This patch adds the cluster lock type which OCFS2 can attach to
dentries. A small number of fs/ocfs2/dcache.c functions are stubbed
out so that this change can compile.
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
Diffstat (limited to 'fs/ocfs2/dlmglue.c')
-rw-r--r-- | fs/ocfs2/dlmglue.c | 475 |
1 files changed, 371 insertions, 104 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c index 20c6ca8ac7fd..764d15defd88 100644 --- a/fs/ocfs2/dlmglue.c +++ b/fs/ocfs2/dlmglue.c @@ -46,6 +46,7 @@ #include "ocfs2.h" #include "alloc.h" +#include "dcache.h" #include "dlmglue.h" #include "extent_map.h" #include "heartbeat.h" @@ -69,6 +70,9 @@ struct ocfs2_mask_waiter { static void ocfs2_inode_ast_func(void *opaque); static void ocfs2_inode_bast_func(void *opaque, int level); +static void ocfs2_dentry_ast_func(void *opaque); +static void ocfs2_dentry_bast_func(void *opaque, + int level); static void ocfs2_super_ast_func(void *opaque); static void ocfs2_super_bast_func(void *opaque, int level); @@ -76,32 +80,57 @@ static void ocfs2_rename_ast_func(void *opaque); static void ocfs2_rename_bast_func(void *opaque, int level); +/* + * Return value from ocfs2_convert_worker_t functions. + * + * These control the precise actions of ocfs2_generic_unblock_lock() + * and ocfs2_process_blocked_lock() + * + */ +enum ocfs2_unblock_action { + UNBLOCK_CONTINUE = 0, /* Continue downconvert */ + UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire + * ->post_unlock callback */ + UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire + * ->post_unlock() callback. */ +}; + +struct ocfs2_unblock_ctl { + int requeue; + enum ocfs2_unblock_action unblock_action; +}; + /* so far, all locks have gotten along with the same unlock ast */ static void ocfs2_unlock_ast_func(void *opaque, enum dlm_status status); -static int ocfs2_do_unblock_meta(struct inode *inode, - int *requeue); static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres, - int *requeue); + struct ocfs2_unblock_ctl *ctl); static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres, - int *requeue); + struct ocfs2_unblock_ctl *ctl); static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres, - int *requeue); + struct ocfs2_unblock_ctl *ctl); +static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres, + struct ocfs2_unblock_ctl *ctl); static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres, - int *requeue); -typedef void (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int); -static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb, - struct ocfs2_lock_res *lockres, - int *requeue, - ocfs2_convert_worker_t *worker); + struct ocfs2_unblock_ctl *ctl); + +static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, + struct ocfs2_lock_res *lockres); struct ocfs2_lock_res_ops { void (*ast)(void *); void (*bast)(void *, int); void (*unlock_ast)(void *, enum dlm_status); - int (*unblock)(struct ocfs2_lock_res *, int *); + int (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *); + void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); }; +typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int); +static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb, + struct ocfs2_lock_res *lockres, + struct ocfs2_unblock_ctl *ctl, + ocfs2_convert_worker_t *worker); + static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { .ast = ocfs2_inode_ast_func, .bast = ocfs2_inode_bast_func, @@ -116,9 +145,6 @@ static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = { .unblock = ocfs2_unblock_meta, }; -static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, - int blocking); - static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = { .ast = ocfs2_inode_ast_func, .bast = ocfs2_inode_bast_func, @@ -140,6 +166,14 @@ static struct ocfs2_lock_res_ops ocfs2_rename_lops = { .unblock = ocfs2_unblock_osb_lock, }; +static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { + .ast = ocfs2_dentry_ast_func, + .bast = ocfs2_dentry_bast_func, + .unlock_ast = ocfs2_unlock_ast_func, + .unblock = ocfs2_unblock_dentry_lock, + .post_unlock = ocfs2_dentry_post_unlock, +}; + static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) { return lockres->l_type == OCFS2_LOCK_TYPE_META || @@ -172,6 +206,13 @@ static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) return (struct inode *) lockres->l_priv; } +static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) +{ + BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); + + return (struct ocfs2_dentry_lock *)lockres->l_priv; +} + static int ocfs2_lock_create(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres, int level, @@ -204,22 +245,6 @@ static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode, struct ocfs2_lock_res *lockres, int new_level); -static char *ocfs2_lock_type_strings[] = { - [OCFS2_LOCK_TYPE_META] = "Meta", - [OCFS2_LOCK_TYPE_DATA] = "Data", - [OCFS2_LOCK_TYPE_SUPER] = "Super", - [OCFS2_LOCK_TYPE_RENAME] = "Rename", - /* Need to differntiate from [R]ename.. serializing writes is the - * important job it does, anyway. */ - [OCFS2_LOCK_TYPE_RW] = "Write/Read", -}; - -static char *ocfs2_lock_type_string(enum ocfs2_lock_type type) -{ - mlog_bug_on_msg(type >= OCFS2_NUM_LOCK_TYPES, "%d\n", type); - return ocfs2_lock_type_strings[type]; -} - static void ocfs2_build_lock_name(enum ocfs2_lock_type type, u64 blkno, u32 generation, @@ -265,13 +290,9 @@ static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, struct ocfs2_lock_res *res, enum ocfs2_lock_type type, - u64 blkno, - u32 generation, struct ocfs2_lock_res_ops *ops, void *priv) { - ocfs2_build_lock_name(type, blkno, generation, res->l_name); - res->l_type = type; res->l_ops = ops; res->l_priv = priv; @@ -319,9 +340,59 @@ void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, break; }; - ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, - OCFS2_I(inode)->ip_blkno, - inode->i_generation, ops, inode); + ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, + inode->i_generation, res->l_name); + ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); +} + +static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) +{ + __be64 inode_blkno_be; + + memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], + sizeof(__be64)); + + return be64_to_cpu(inode_blkno_be); +} + +void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, + u64 parent, struct inode *inode) +{ + int len; + u64 inode_blkno = OCFS2_I(inode)->ip_blkno; + __be64 inode_blkno_be = cpu_to_be64(inode_blkno); + struct ocfs2_lock_res *lockres = &dl->dl_lockres; + + ocfs2_lock_res_init_once(lockres); + + /* + * Unfortunately, the standard lock naming scheme won't work + * here because we have two 16 byte values to use. Instead, + * we'll stuff the inode number as a binary value. We still + * want error prints to show something without garbling the + * display, so drop a null byte in there before the inode + * number. A future version of OCFS2 will likely use all + * binary lock names. The stringified names have been a + * tremendous aid in debugging, but now that the debugfs + * interface exists, we can mangle things there if need be. + * + * NOTE: We also drop the standard "pad" value (the total lock + * name size stays the same though - the last part is all + * zeros due to the memset in ocfs2_lock_res_init_once() + */ + len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, + "%c%016llx", + ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), + (long long)parent); + + BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); + + memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, + sizeof(__be64)); + + ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, + OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, + dl); } static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, @@ -330,8 +401,9 @@ static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, /* Superblock lockres doesn't come from a slab so we call init * once on it manually. */ ocfs2_lock_res_init_once(res); + ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, + 0, res->l_name); ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, - OCFS2_SUPER_BLOCK_BLKNO, 0, &ocfs2_super_lops, osb); } @@ -341,7 +413,8 @@ static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, /* Rename lockres doesn't come from a slab so we call init * once on it manually. */ ocfs2_lock_res_init_once(res); - ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 0, 0, + ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); + ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, &ocfs2_rename_lops, osb); } @@ -627,9 +700,10 @@ static void ocfs2_generic_bast_func(struct ocfs2_super *osb, ocfs2_schedule_blocked_lock(osb, lockres); spin_unlock_irqrestore(&lockres->l_lock, flags); + wake_up(&lockres->l_event); + ocfs2_kick_vote_thread(osb); - wake_up(&lockres->l_event); mlog_exit_void(); } @@ -690,9 +764,9 @@ static void ocfs2_generic_ast_func(struct ocfs2_lock_res *lockres, /* set it to something invalid so if we get called again we * can catch it. */ lockres->l_action = OCFS2_AST_INVALID; - spin_unlock_irqrestore(&lockres->l_lock, flags); wake_up(&lockres->l_event); + spin_unlock_irqrestore(&lockres->l_lock, flags); } static void ocfs2_super_ast_func(void *opaque) @@ -757,6 +831,27 @@ static void ocfs2_rename_bast_func(void *opaque, mlog_exit_void(); } +static void ocfs2_dentry_ast_func(void *opaque) +{ + struct ocfs2_lock_res *lockres = opaque; + + BUG_ON(!lockres); + + ocfs2_generic_ast_func(lockres, 1); +} + +static void ocfs2_dentry_bast_func(void *opaque, int level) +{ + struct ocfs2_lock_res *lockres = opaque; + struct ocfs2_dentry_lock *dl = lockres->l_priv; + struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb); + + mlog(0, "Dentry bast: level: %d, name: %s\n", level, + lockres->l_name); + + ocfs2_generic_bast_func(osb, lockres, level); +} + static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, int convert) { @@ -1076,10 +1171,11 @@ static void ocfs2_cluster_unlock(struct ocfs2_super *osb, mlog_exit_void(); } -static int ocfs2_create_new_inode_lock(struct inode *inode, - struct ocfs2_lock_res *lockres) +int ocfs2_create_new_lock(struct ocfs2_super *osb, + struct ocfs2_lock_res *lockres, + int ex) { - struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); + int level = ex ? LKM_EXMODE : LKM_PRMODE; unsigned long flags; spin_lock_irqsave(&lockres->l_lock, flags); @@ -1087,7 +1183,7 @@ static int ocfs2_create_new_inode_lock(struct inode *inode, lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); spin_unlock_irqrestore(&lockres->l_lock, flags); - return ocfs2_lock_create(osb, lockres, LKM_EXMODE, LKM_LOCAL); + return ocfs2_lock_create(osb, lockres, level, LKM_LOCAL); } /* Grants us an EX lock on the data and metadata resources, skipping @@ -1099,6 +1195,7 @@ static int ocfs2_create_new_inode_lock(struct inode *inode, int ocfs2_create_new_inode_locks(struct inode *inode) { int ret; + struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); BUG_ON(!inode); BUG_ON(!ocfs2_inode_is_new(inode)); @@ -1115,22 +1212,19 @@ int ocfs2_create_new_inode_locks(struct inode *inode) * on a resource which has an invalid one -- we'll set it * valid when we release the EX. */ - ret = ocfs2_create_new_inode_lock(inode, - &OCFS2_I(inode)->ip_rw_lockres); + ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1); if (ret) { mlog_errno(ret); goto bail; } - ret = ocfs2_create_new_inode_lock(inode, - &OCFS2_I(inode)->ip_meta_lockres); + ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1); if (ret) { mlog_errno(ret); goto bail; } - ret = ocfs2_create_new_inode_lock(inode, - &OCFS2_I(inode)->ip_data_lockres); + ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1); if (ret) { mlog_errno(ret); goto bail; @@ -1809,6 +1903,34 @@ void ocfs2_rename_unlock(struct ocfs2_super *osb) ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE); } +int ocfs2_dentry_lock(struct dentry *dentry, int ex) +{ + int ret; + int level = ex ? LKM_EXMODE : LKM_PRMODE; + struct ocfs2_dentry_lock *dl = dentry->d_fsdata; + struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); + + BUG_ON(!dl); + + if (ocfs2_is_hard_readonly(osb)) + return -EROFS; + + ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); + if (ret < 0) + mlog_errno(ret); + + return ret; +} + +void ocfs2_dentry_unlock(struct dentry *dentry, int ex) +{ + int level = ex ? LKM_EXMODE : LKM_PRMODE; + struct ocfs2_dentry_lock *dl = dentry->d_fsdata; + struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); + + ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); +} + /* Reference counting of the dlm debug structure. We want this because * open references on the debug inodes can live on after a mount, so * we can't rely on the ocfs2_super to always exist. */ @@ -1939,9 +2061,16 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) if (!lockres) return -EINVAL; - seq_printf(m, "0x%x\t" - "%.*s\t" - "%d\t" + seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); + + if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) + seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, + lockres->l_name, + (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); + else + seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); + + seq_printf(m, "%d\t" "0x%lx\t" "0x%x\t" "0x%x\t" @@ -1949,8 +2078,6 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) "%u\t" "%d\t" "%d\t", - OCFS2_DLM_DEBUG_STR_VERSION, - OCFS2_LOCK_ID_MAX_LEN, lockres->l_name, lockres->l_level, lockres->l_flags, lockres->l_action, @@ -2311,25 +2438,21 @@ void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres) spin_unlock_irqrestore(&lockres->l_lock, flags); } -static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) +void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, + struct ocfs2_lock_res *lockres) { - int status; - - mlog_entry_void(); - - ocfs2_mark_lockres_freeing(&osb->osb_super_lockres); - - status = ocfs2_drop_lock(osb, &osb->osb_super_lockres, NULL); - if (status < 0) - mlog_errno(status); - - ocfs2_mark_lockres_freeing(&osb->osb_rename_lockres); + int ret; - status = ocfs2_drop_lock(osb, &osb->osb_rename_lockres, NULL); - if (status < 0) - mlog_errno(status); + ocfs2_mark_lockres_freeing(lockres); + ret = ocfs2_drop_lock(osb, lockres, NULL); + if (ret) + mlog_errno(ret); +} - mlog_exit(status); +static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) +{ + ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); + ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); } static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data) @@ -2599,7 +2722,7 @@ leave: static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres, - int *requeue, + struct ocfs2_unblock_ctl *ctl, ocfs2_convert_worker_t *worker) { unsigned long flags; @@ -2615,7 +2738,7 @@ static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb, recheck: if (lockres->l_flags & OCFS2_LOCK_BUSY) { - *requeue = 1; + ctl->requeue = 1; ret = ocfs2_prepare_cancel_convert(osb, lockres); spin_unlock_irqrestore(&lockres->l_lock, flags); if (ret) { @@ -2631,7 +2754,7 @@ recheck: if ((lockres->l_blocking == LKM_EXMODE) && (lockres->l_ex_holders || lockres->l_ro_holders)) { spin_unlock_irqrestore(&lockres->l_lock, flags); - *requeue = 1; + ctl->requeue = 1; ret = 0; goto leave; } @@ -2641,7 +2764,7 @@ recheck: if (lockres->l_blocking == LKM_PRMODE && lockres->l_ex_holders) { spin_unlock_irqrestore(&lockres->l_lock, flags); - *requeue = 1; + ctl->requeue = 1; ret = 0; goto leave; } @@ -2659,7 +2782,10 @@ recheck: blocking = lockres->l_blocking; spin_unlock_irqrestore(&lockres->l_lock, flags); - worker(lockres, blocking); + ctl->unblock_action = worker(lockres, blocking); + + if (ctl->unblock_action == UNBLOCK_STOP_POST) + goto leave; spin_lock_irqsave(&lockres->l_lock, flags); if (blocking != lockres->l_blocking) { @@ -2669,7 +2795,7 @@ recheck: } downconvert: - *requeue = 0; + ctl->requeue = 0; new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); ocfs2_prepare_downconvert(lockres, new_level); @@ -2680,14 +2806,12 @@ leave: return ret; } -static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, - int blocking) +static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, + int blocking) { struct inode *inode; struct address_space *mapping; - mlog_entry_void(); - inode = ocfs2_lock_res_inode(lockres); mapping = inode->i_mapping; @@ -2708,11 +2832,11 @@ static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, filemap_fdatawait(mapping); } - mlog_exit_void(); + return UNBLOCK_CONTINUE; } int ocfs2_unblock_data(struct ocfs2_lock_res *lockres, - int *requeue) + struct ocfs2_unblock_ctl *ctl) { int status; struct inode *inode; @@ -2726,22 +2850,20 @@ int ocfs2_unblock_data(struct ocfs2_lock_res *lockres, mlog(0, "unblock inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); - status = ocfs2_generic_unblock_lock(osb, - lockres, - requeue, + status = ocfs2_generic_unblock_lock(osb, lockres, ctl, ocfs2_data_convert_worker); if (status < 0) mlog_errno(status); mlog(0, "inode %llu, requeue = %d\n", - (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue); + (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue); mlog_exit(status); return status; } static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres, - int *requeue) + struct ocfs2_unblock_ctl *ctl) { int status; struct inode *inode; @@ -2753,9 +2875,7 @@ static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres, inode = ocfs2_lock_res_inode(lockres); status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb), - lockres, - requeue, - NULL); + lockres, ctl, NULL); if (status < 0) mlog_errno(status); @@ -2763,9 +2883,8 @@ static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres, return status; } - -int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres, - int *requeue) +static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres, + struct ocfs2_unblock_ctl *ctl) { int status; struct inode *inode; @@ -2777,21 +2896,165 @@ int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres, mlog(0, "unblock inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); - status = ocfs2_do_unblock_meta(inode, requeue); + status = ocfs2_do_unblock_meta(inode, &ctl->requeue); if (status < 0) mlog_errno(status); mlog(0, "inode %llu, requeue = %d\n", - (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue); + (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue); mlog_exit(status); return status; } +/* + * Does the final reference drop on our dentry lock. Right now this + * happens in the vote thread, but we could choose to simplify the + * dlmglue API and push these off to the ocfs2_wq in the future. + */ +static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, + struct ocfs2_lock_res *lockres) +{ + struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); + ocfs2_dentry_lock_put(osb, dl); +} + +/* + * d_delete() matching dentries before the lock downconvert. + * + * At this point, any process waiting to destroy the + * dentry_lock due to last ref count is stopped by the + * OCFS2_LOCK_QUEUED flag. + * + * We have two potential problems + * + * 1) If we do the last reference drop on our dentry_lock (via dput) + * we'll wind up in ocfs2_release_dentry_lock(), waiting on + * the downconvert to finish. Instead we take an elevated + * reference and push the drop until after we've completed our + * unblock processing. + * + * 2) There might be another process with a final reference, + * waiting on us to finish processing. If this is the case, we + * detect it and exit out - there's no more dentries anyway. + */ +static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, + int blocking) +{ + struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); + struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); + struct dentry *dentry; + unsigned long flags; + int extra_ref = 0; + + /* + * This node is blocking another node from getting a read + * lock. This happens when we've renamed within a + * directory. We've forced the other nodes to d_delete(), but + * we never actually dropped our lock because it's still + * valid. The downconvert code will retain a PR for this node, + * so there's no further work to do. + */ + if (blocking == LKM_PRMODE) + return UNBLOCK_CONTINUE; + + /* + * Mark this inode as potentially orphaned. The code in + * ocfs2_delete_inode() will figure out whether it actually + * needs to be freed or not. + */ + spin_lock(&oi->ip_lock); + oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; + spin_unlock(&oi->ip_lock); + + /* + * Yuck. We need to make sure however that the check of + * OCFS2_LOCK_FREEING and the extra reference are atomic with + * respect to a reference decrement or the setting of that + * flag. + */ + spin_lock_irqsave(&lockres->l_lock, flags); + spin_lock(&dentry_attach_lock); + if (!(lockres->l_flags & OCFS2_LOCK_FREEING) + && dl->dl_count) { + dl->dl_count++; + extra_ref = 1; + } + spin_unlock(&dentry_attach_lock); + spin_unlock_irqrestore(&lockres->l_lock, flags); + + mlog(0, "extra_ref = %d\n", extra_ref); + + /* + * We have a process waiting on us in ocfs2_dentry_iput(), + * which means we can't have any more outstanding + * aliases. There's no need to do any more work. + */ + if (!extra_ref) + return UNBLOCK_CONTINUE; + + spin_lock(&dentry_attach_lock); + while (1) { + dentry = ocfs2_find_local_alias(dl->dl_inode, + dl->dl_parent_blkno, 1); + if (!dentry) + break; + spin_unlock(&dentry_attach_lock); + + mlog(0, "d_delete(%.*s);\n", dentry->d_name.len, + dentry->d_name.name); + + /* + * The following dcache calls may do an + * iput(). Normally we don't want that from the + * downconverting thread, but in this case it's ok + * because the requesting node already has an + * exclusive lock on the inode, so it can't be queued + * for a downconvert. + */ + d_delete(dentry); + dput(dentry); + + spin_lock(&dentry_attach_lock); + } + spin_unlock(&dentry_attach_lock); + + /* + * If we are the last holder of this dentry lock, there is no + * reason to downconvert so skip straight to the unlock. + */ + if (dl->dl_count == 1) + return UNBLOCK_STOP_POST; + + return UNBLOCK_CONTINUE_POST; +} + +static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres, + struct ocfs2_unblock_ctl *ctl) +{ + int ret; + struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); + struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb); + + mlog(0, "unblock dentry lock: %llu\n", + (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno); + + ret = ocfs2_generic_unblock_lock(osb, + lockres, + ctl, + ocfs2_dentry_convert_worker); + if (ret < 0) + mlog_errno(ret); + + mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action); + + return ret; +} + /* Generic unblock function for any lockres whose private data is an * ocfs2_super pointer. */ static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres, - int *requeue) + struct ocfs2_unblock_ctl *ctl) { int status; struct ocfs2_super *osb; @@ -2804,7 +3067,7 @@ static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres, status = ocfs2_generic_unblock_lock(osb, lockres, - requeue, + ctl, NULL); if (status < 0) mlog_errno(status); @@ -2817,7 +3080,7 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres) { int status; - int requeue = 0; + struct ocfs2_unblock_ctl ctl = {0, 0,}; unsigned long flags; /* Our reference to the lockres in this function can be @@ -2842,21 +3105,25 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb, goto unqueue; spin_unlock_irqrestore(&lockres->l_lock, flags); - status = lockres->l_ops->unblock(lockres, &requeue); + status = lockres->l_ops->unblock(lockres, &ctl); if (status < 0) mlog_errno(status); spin_lock_irqsave(&lockres->l_lock, flags); unqueue: - if (lockres->l_flags & OCFS2_LOCK_FREEING || !requeue) { + if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); } else ocfs2_schedule_blocked_lock(osb, lockres); mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name, - requeue ? "yes" : "no"); + ctl.requeue ? "yes" : "no"); spin_unlock_irqrestore(&lockres->l_lock, flags); + if (ctl.unblock_action != UNBLOCK_CONTINUE + && lockres->l_ops->post_unlock) + lockres->l_ops->post_unlock(osb, lockres); + mlog_exit_void(); } |