diff options
author | Alexander Aring <aahringo@redhat.com> | 2024-06-10 15:26:03 -0500 |
---|---|---|
committer | David Teigland <teigland@redhat.com> | 2024-06-10 15:26:03 -0500 |
commit | 01fdeca1cc2dd705b1391f31a2594214c8bd7886 (patch) | |
tree | 31da92c905bf972bbbbf71f15c095a5773ff0b17 /fs/dlm | |
parent | c217adfc8caad240ec7bed446a6a1a801d5acc6d (diff) |
dlm: use rcu to avoid an extra rsb struct lookup
Use rcu to free rsb structs, and hold the rcu read lock
while looking up rsb structs. This allows us to avoid an
extra hash table lookup for an rsb. A new rsb flag HASHED
is added which is set while the rsb is in the hash table.
This flag is checked in place of repeating the hash table
lookup.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm')
-rw-r--r-- | fs/dlm/dlm_internal.h | 2 | ||||
-rw-r--r-- | fs/dlm/lock.c | 102 | ||||
-rw-r--r-- | fs/dlm/memory.c | 8 |
3 files changed, 96 insertions, 16 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 818484315906..e06fa17c5603 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -333,6 +333,7 @@ struct dlm_rsb { struct list_head res_masters_list; /* used for recovery */ struct list_head res_recover_list; /* used for recovery */ int res_recover_locks_count; + struct rcu_head rcu; char *res_lvbptr; char res_name[DLM_RESNAME_MAXLEN+1]; @@ -366,6 +367,7 @@ enum rsb_flags { RSB_RECOVER_GRANT, RSB_RECOVER_LVB_INVAL, RSB_INACTIVE, + RSB_HASHED, /* set while rsb is on ls_rsbtbl */ }; static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 5ca3f29bef7d..8bee4f444afd 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -563,6 +563,7 @@ void dlm_rsb_scan(struct timer_list *timer) list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); + rsb_clear_flag(r, RSB_HASHED); /* ls_rsbtbl_lock is not needed when calling send_remove() */ write_unlock(&ls->ls_rsbtbl_lock); @@ -636,8 +637,14 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len, static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash) { - return rhashtable_insert_fast(rhash, &rsb->res_node, - dlm_rhash_rsb_params); + int rv; + + rv = rhashtable_insert_fast(rhash, &rsb->res_node, + dlm_rhash_rsb_params); + if (!rv) + rsb_set_flag(rsb, RSB_HASHED); + + return rv; } /* @@ -752,11 +759,23 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, do_inactive: write_lock_bh(&ls->ls_rsbtbl_lock); - /* retry lookup under write lock to see if its still in inactive state - * if not it's in active state and we relookup - unlikely path. + /* + * The expectation here is that the rsb will have HASHED and + * INACTIVE flags set, and that the rsb can be moved from + * inactive back to active again. However, between releasing + * the read lock and acquiring the write lock, this rsb could + * have been removed from rsbtbl, and had HASHED cleared, to + * be freed. To deal with this case, we would normally need + * to repeat dlm_search_rsb_tree while holding the write lock, + * but rcu allows us to simply check the HASHED flag, because + * the rcu read lock means the rsb will not be freed yet. + * If the HASHED flag is not set, then the rsb is being freed, + * so we add a new rsb struct. If the HASHED flag is set, + * and INACTIVE is not set, it means another thread has + * made the rsb active, as we're expecting to do here, and + * we just repeat the lookup (this will be very unlikely.) */ - error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); - if (!error) { + if (rsb_flag(r, RSB_HASHED)) { if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); goto retry; @@ -926,11 +945,8 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, do_inactive: write_lock_bh(&ls->ls_rsbtbl_lock); - /* retry lookup under write lock to see if its still inactive. - * if it's active, repeat lookup - unlikely path. - */ - error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); - if (!error) { + /* See comment in find_rsb_dir. */ + if (rsb_flag(r, RSB_HASHED)) { if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); goto retry; @@ -1012,12 +1028,54 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, return error; } +/* + * rsb rcu usage + * + * While rcu read lock is held, the rsb cannot be freed, + * which allows a lookup optimization. + * + * Two threads are accessing the same rsb concurrently, + * the first (A) is trying to use the rsb, the second (B) + * is trying to free the rsb. + * + * thread A thread B + * (trying to use rsb) (trying to free rsb) + * + * A1. rcu read lock + * A2. rsbtbl read lock + * A3. look up rsb in rsbtbl + * A4. rsbtbl read unlock + * B1. rsbtbl write lock + * B2. look up rsb in rsbtbl + * B3. remove rsb from rsbtbl + * B4. clear rsb HASHED flag + * B5. rsbtbl write unlock + * B6. begin freeing rsb using rcu... + * + * (rsb is inactive, so try to make it active again) + * A5. read rsb HASHED flag (safe because rsb is not freed yet) + * A6. the rsb HASHED flag is not set, which it means the rsb + * is being removed from rsbtbl and freed, so don't use it. + * A7. rcu read unlock + * + * B7. ...finish freeing rsb using rcu + * A8. create a new rsb + * + * Without the rcu optimization, steps A5-8 would need to do + * an extra rsbtbl lookup: + * A5. rsbtbl write lock + * A6. look up rsb in rsbtbl, not found + * A7. rsbtbl write unlock + * A8. create a new rsb + */ + static int find_rsb(struct dlm_ls *ls, const void *name, int len, int from_nodeid, unsigned int flags, struct dlm_rsb **r_ret) { int dir_nodeid; uint32_t hash; + int rv; if (len > DLM_RESNAME_MAXLEN) return -EINVAL; @@ -1025,12 +1083,15 @@ static int find_rsb(struct dlm_ls *ls, const void *name, int len, hash = jhash(name, len, 0); dir_nodeid = dlm_hash2nodeid(ls, hash); + rcu_read_lock(); if (dlm_no_directory(ls)) - return find_rsb_nodir(ls, name, len, hash, dir_nodeid, + rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid, from_nodeid, flags, r_ret); else - return find_rsb_dir(ls, name, len, hash, dir_nodeid, + rv = find_rsb_dir(ls, name, len, hash, dir_nodeid, from_nodeid, flags, r_ret); + rcu_read_unlock(); + return rv; } /* we have received a request and found that res_master_nodeid != our_nodeid, @@ -1187,8 +1248,8 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) */ -int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, - int len, unsigned int flags, int *r_nodeid, int *result) +static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) { struct dlm_rsb *r = NULL; uint32_t hash; @@ -1315,6 +1376,16 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, return error; } +int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) +{ + int rv; + rcu_read_lock(); + rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result); + rcu_read_unlock(); + return rv; +} + static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) { struct dlm_rsb *r; @@ -4293,6 +4364,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); + rsb_clear_flag(r, RSB_HASHED); write_unlock_bh(&ls->ls_rsbtbl_lock); free_inactive_rsb(r); diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c index 15a8b1cee433..105a79978706 100644 --- a/fs/dlm/memory.c +++ b/fs/dlm/memory.c @@ -101,13 +101,19 @@ struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls) return r; } -void dlm_free_rsb(struct dlm_rsb *r) +static void __free_rsb_rcu(struct rcu_head *rcu) { + struct dlm_rsb *r = container_of(rcu, struct dlm_rsb, rcu); if (r->res_lvbptr) dlm_free_lvb(r->res_lvbptr); kmem_cache_free(rsb_cache, r); } +void dlm_free_rsb(struct dlm_rsb *r) +{ + call_rcu(&r->rcu, __free_rsb_rcu); +} + struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls) { struct dlm_lkb *lkb; |