GIT 6a7c9f8f929d338ba49e84e21ac43a2e0cc63de0 git+ssh://master.kernel.org/pub/scm/linux/kernel/git/mfasheh/ocfs2.git#ALL commit c23804eaae9b563ff062a53df9826e961be046c4 Author: Kurt Hackel Date: Fri Jan 5 15:04:49 2007 -0800 ocfs2_dlm: Fix migrate lockres handler queue scanning The migrate lockres handler was only searching for its lock on migrated lockres on the expected queue. This could be problematic as the new master could have also issued a convert request during the migration and thus moved the lock to the convert queue. We now search for the lock on all three queues. Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh commit db039bf0755d4152b2bcb7a90b3d2abf2ad61b92 Author: Kurt Hackel Date: Fri Jan 5 15:02:30 2007 -0800 ocfs2_dlm: Make dlmunlock() wait for migration to complete dlmunlock() was not waiting for migration to complete before releasing locks on locally mastered locks. Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh commit 2e17f7d3db6c8db8dc4a69d105c8a34baf711c25 Author: Kurt Hackel Date: Fri Jan 5 15:00:17 2007 -0800 ocfs2_dlm: Fixes race between migrate and dirty dlmthread was removing lockres' from the dirty list and resetting the dirty flag before shuffling the list. This patch retains the dirty state flag until the lists are shuffled. Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh commit ba5f1ff8f78c52ea86a4cdddae4c7d837c566197 Author: Mark Fasheh Date: Wed Jan 3 17:25:40 2007 -0800 ocfs2: Don't print errors when following symlinks We shouldn't print errors returned from vfs_follow_link(). This was causing spurious errors to show up in the logs. Signed-off-by: Mark Fasheh commit 2811c739ea334601706fc6d588c44f9c16d71198 Author: Mark Fasheh Date: Wed Jan 3 17:06:59 2007 -0800 ocfs2: cleanup ocfs2_iget() errors Get rid of some error prints in the ocfs2_iget() path from ocfs2_get_dentry(). NFSD can easily cause us to read stale inodes. Signed-off-by: Mark Fasheh commit 6b1a5e00af99747e5d430193300bbf7d6474d79d Author: Mark Fasheh Date: Tue Jan 2 17:59:40 2007 -0800 ocfs2: Directory c/mtime update fixes ocfs2 wasn't updating c/mtime on directories during dirent creation/deletion. Fix ocfs2_unlink(), ocfs2_rename() and __ocfs2_add_entry() by adding the proper code to update the struct inode and push the change out to disk. This helps rename/unlink on nfs exported file systems in particular as those clients compare directory time values to avoid a full re-reading a directory which hasn't changed. ocfs2_rename() loses some superfluous error handling as a result of this patch. Signed-off-by: Mark Fasheh commit c436841d732853f60495e9727e72b309e15e9dcd Author: Adrian Bunk Date: Thu Dec 14 00:17:32 2006 +0100 [PATCH] fs/ocfs2/dlm/: make functions static This patch makes some needlessly global functions static. Signed-off-by: Adrian Bunk Signed-off-by: Mark Fasheh commit d65fa9be91c6e0ac24851c69a07805666b48e595 Author: Kurt Hackel Date: Fri Dec 1 14:47:20 2006 -0800 ocfs2: fix cluster-wide refcounting of lock resources This was previously broken and migration of some locks had to be temporarily disabled. We use a new (and backward-incompatible) set of network messages to account for all references to a lock resources held across the cluster. once these are all freed, the master node may then free the lock resource memory once its local references are dropped. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh commit e47575895914402e1e72f00b3db539f468b849e3 Author: Joel Becker Date: Fri Oct 6 17:33:23 2006 -0700 configfs: accessing item hierarchy during rmdir(2) Add a notification callback, ops->disconnect_notify(). It has the same prototype as ->drop_item(), but it will be called just before the item linkage is broken. This way, configfs users who want to do work while the object is still in the heirarchy have a chance. Client drivers will still need to config_item_put() in their ->drop_item(), if they implement it. They need do nothing in ->disconnect_notify(). They don't have to provide it if they don't care. But someone who wants to be notified before ci_parent is set to NULL can now be notified. Signed-off-by: Joel Becker Documentation/filesystems/configfs/configfs.txt | 12 + fs/configfs/dir.c | 29 ++ fs/ocfs2/cluster/tcp_internal.h | 5 fs/ocfs2/dlm/dlmcommon.h | 78 ++++ fs/ocfs2/dlm/dlmdebug.c | 18 + fs/ocfs2/dlm/dlmdomain.c | 117 +++++- fs/ocfs2/dlm/dlmlock.c | 4 fs/ocfs2/dlm/dlmmaster.c | 418 ++++++++++++++++++++--- fs/ocfs2/dlm/dlmrecovery.c | 150 ++++++++ fs/ocfs2/dlm/dlmthread.c | 196 +++++------ fs/ocfs2/dlm/dlmunlock.c | 4 fs/ocfs2/export.c | 5 fs/ocfs2/inode.c | 11 - fs/ocfs2/namei.c | 69 ++-- fs/ocfs2/symlink.c | 3 include/linux/configfs.h | 1 16 files changed, 873 insertions(+), 247 deletions(-) diff --git a/Documentation/filesystems/configfs/configfs.txt b/Documentation/filesystems/configfs/configfs.txt index b34cdb5..c199484 100644 --- a/Documentation/filesystems/configfs/configfs.txt +++ b/Documentation/filesystems/configfs/configfs.txt @@ -238,6 +238,8 @@ config_item_type. struct config_group *(*make_group)(struct config_group *group, const char *name); int (*commit_item)(struct config_item *item); + void (*disconnect_notify)(struct config_group *group, + struct config_item *item); void (*drop_item)(struct config_group *group, struct config_item *item); }; @@ -268,6 +270,16 @@ the item in other threads, the memory is for the item to actually disappear from the subsystem's usage. But it is gone from configfs. +When drop_item() is called, the item's linkage has already been torn +down. It no longer has a reference on its parent and has no place in +the item hierarchy. If a client needs to do some cleanup before this +teardown happens, the subsystem can implement the +ct_group_ops->disconnect_notify() method. The method is called after +configfs has removed the item from the filesystem view but before the +item is removed from its parent group. Like drop_item(), +disconnect_notify() is void and cannot fail. Client subsystems should +not drop any references here, as they still must do it in drop_item(). + A config_group cannot be removed while it still has child items. This is implemented in the configfs rmdir(2) code. ->drop_item() will not be called, as the item has not been dropped. rmdir(2) will fail, as the diff --git a/fs/configfs/dir.c b/fs/configfs/dir.c index 1814ba4..c8d234d 100644 --- a/fs/configfs/dir.c +++ b/fs/configfs/dir.c @@ -715,6 +715,28 @@ static void configfs_detach_group(struct } /* + * After the item has been detached from the filesystem view, we are + * ready to tear it out of the hierarchy. Notify the client before + * we do that so they can perform any cleanup that requires + * navigating the hierarchy. A client does not need to provide this + * callback. The subsystem semaphore MUST be held by the caller, and + * references must be valid for both items. It also assumes the + * caller has validated ci_type. + */ +static void client_disconnect_notify(struct config_item *parent_item, + struct config_item *item) +{ + struct config_item_type *type; + + type = parent_item->ci_type; + BUG_ON(!type); + + if (type->ct_group_ops && type->ct_group_ops->disconnect_notify) + type->ct_group_ops->disconnect_notify(to_config_group(parent_item), + item); +} + +/* * Drop the initial reference from make_item()/make_group() * This function assumes that reference is held on item * and that item holds a valid reference to the parent. Also, it @@ -734,7 +756,7 @@ static void client_drop_item(struct conf */ if (type->ct_group_ops && type->ct_group_ops->drop_item) type->ct_group_ops->drop_item(to_config_group(parent_item), - item); + item); else config_item_put(item); } @@ -843,11 +865,14 @@ out_unlink: if (ret) { /* Tear down everything we built up */ down(&subsys->su_sem); + + client_disconnect_notify(parent_item, item); if (group) unlink_group(group); else unlink_obj(item); client_drop_item(parent_item, item); + up(&subsys->su_sem); if (module_got) @@ -912,11 +937,13 @@ static int configfs_rmdir(struct inode * configfs_detach_group(item); down(&subsys->su_sem); + client_disconnect_notify(parent_item, item); unlink_group(to_config_group(item)); } else { configfs_detach_item(item); down(&subsys->su_sem); + client_disconnect_notify(parent_item, item); unlink_obj(item); } diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h index b700dc9..775c911 100644 --- a/fs/ocfs2/cluster/tcp_internal.h +++ b/fs/ocfs2/cluster/tcp_internal.h @@ -38,6 +38,9 @@ #define O2NET_QUORUM_DELAY_MS ((o2hb_dea * locking semantics of the file system using the protocol. It should * be somewhere else, I'm sure, but right now it isn't. * + * New in version 6: + * - DLM lockres remote refcount fixes. + * * New in version 5: * - Network timeout checking protocol * @@ -51,7 +54,7 @@ #define O2NET_QUORUM_DELAY_MS ((o2hb_dea * - full 64 bit i_size in the metadata lock lvbs * - introduction of "rw" lock and pushing meta/data locking down */ -#define O2NET_PROTOCOL_VERSION 5ULL +#define O2NET_PROTOCOL_VERSION 6ULL struct o2net_handshake { __be64 protocol_version; __be64 connector_id; diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 6b6ff76..e95ecb2 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -222,6 +222,8 @@ #define DLM_LOCK_RES_READY #define DLM_LOCK_RES_DIRTY 0x00000008 #define DLM_LOCK_RES_IN_PROGRESS 0x00000010 #define DLM_LOCK_RES_MIGRATING 0x00000020 +#define DLM_LOCK_RES_DROPPING_REF 0x00000040 +#define DLM_LOCK_RES_BLOCK_DIRTY 0x00001000 /* max milliseconds to wait to sync up a network failure with a node death */ #define DLM_NODE_DEATH_WAIT_MAX (5 * 1000) @@ -265,6 +267,8 @@ struct dlm_lock_resource u8 owner; //node which owns the lock resource, or unknown u16 state; char lvb[DLM_LVB_LEN]; + unsigned int inflight_locks; + unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; }; struct dlm_migratable_lock @@ -367,7 +371,7 @@ enum { DLM_CONVERT_LOCK_MSG, /* 504 */ DLM_PROXY_AST_MSG, /* 505 */ DLM_UNLOCK_LOCK_MSG, /* 506 */ - DLM_UNUSED_MSG2, /* 507 */ + DLM_DEREF_LOCKRES_MSG, /* 507 */ DLM_MIGRATE_REQUEST_MSG, /* 508 */ DLM_MIG_LOCKRES_MSG, /* 509 */ DLM_QUERY_JOIN_MSG, /* 510 */ @@ -417,6 +421,9 @@ struct dlm_master_request u8 name[O2NM_MAX_NAME_LEN]; }; +#define DLM_ASSERT_RESPONSE_REASSERT 0x00000001 +#define DLM_ASSERT_RESPONSE_MASTERY_REF 0x00000002 + #define DLM_ASSERT_MASTER_MLE_CLEANUP 0x00000001 #define DLM_ASSERT_MASTER_REQUERY 0x00000002 #define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004 @@ -430,6 +437,8 @@ struct dlm_assert_master u8 name[O2NM_MAX_NAME_LEN]; }; +#define DLM_MIGRATE_RESPONSE_MASTERY_REF 0x00000001 + struct dlm_migrate_request { u8 master; @@ -648,6 +657,16 @@ struct dlm_finalize_reco __be32 pad2; }; +struct dlm_deref_lockres +{ + u32 pad1; + u16 pad2; + u8 node_idx; + u8 namelen; + + u8 name[O2NM_MAX_NAME_LEN]; +}; + static inline enum dlm_status __dlm_lockres_state_to_status(struct dlm_lock_resource *res) { @@ -721,8 +740,6 @@ void __dlm_lockres_calc_usage(struct dlm struct dlm_lock_resource *res); void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); -void dlm_purge_lockres(struct dlm_ctxt *dlm, - struct dlm_lock_resource *lockres); static inline void dlm_lockres_get(struct dlm_lock_resource *res) { /* This is called on every lookup, so it might be worth @@ -733,6 +750,10 @@ void dlm_lockres_put(struct dlm_lock_res void __dlm_unhash_lockres(struct dlm_lock_resource *res); void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); +struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, + const char *name, + unsigned int len, + unsigned int hash); struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, const char *name, unsigned int len, @@ -753,6 +774,47 @@ struct dlm_lock_resource *dlm_new_lockre const char *name, unsigned int namelen); +#define dlm_lockres_set_refmap_bit(bit,res) \ + __dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__) +#define dlm_lockres_clear_refmap_bit(bit,res) \ + __dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__) + +static inline void __dlm_lockres_set_refmap_bit(int bit, + struct dlm_lock_resource *res, + const char *file, + int line) +{ + //printk("%s:%d:%.*s: setting bit %d\n", file, line, + // res->lockname.len, res->lockname.name, bit); + set_bit(bit, res->refmap); +} + +static inline void __dlm_lockres_clear_refmap_bit(int bit, + struct dlm_lock_resource *res, + const char *file, + int line) +{ + //printk("%s:%d:%.*s: clearing bit %d\n", file, line, + // res->lockname.len, res->lockname.name, bit); + clear_bit(bit, res->refmap); +} + +void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + const char *file, + int line); +void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + int new_lockres, + const char *file, + int line); +#define dlm_lockres_drop_inflight_ref(d,r) \ + __dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__) +#define dlm_lockres_grab_inflight_ref(d,r) \ + __dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__) +#define dlm_lockres_grab_inflight_ref_new(d,r) \ + __dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__) + void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void dlm_do_local_ast(struct dlm_ctxt *dlm, @@ -801,10 +863,7 @@ int dlm_heartbeat_init(struct dlm_ctxt * void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data); void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data); -int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); -int dlm_migrate_lockres(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - u8 target); +int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, u8 old_master); @@ -814,6 +873,7 @@ void __dlm_lockres_reserve_ast(struct dl int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data); int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data); +int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data); int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data); int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data); int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data); @@ -856,10 +916,12 @@ static inline void __dlm_wait_on_lockres int dlm_init_mle_cache(void); void dlm_destroy_mle_cache(void); void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up); +int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res); void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node); int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock); - +int __dlm_lockres_has_locks(struct dlm_lock_resource *res); int __dlm_lockres_unused(struct dlm_lock_resource *res); static inline const char * dlm_lock_mode_name(int mode) diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index 3f6c8d8..1015cc7 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -53,6 +53,23 @@ void dlm_print_one_lock_resource(struct spin_unlock(&res->spinlock); } +static void dlm_print_lockres_refmap(struct dlm_lock_resource *res) +{ + int bit; + assert_spin_locked(&res->spinlock); + + mlog(ML_NOTICE, " refmap nodes: [ "); + bit = 0; + while (1) { + bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit); + if (bit >= O2NM_MAX_NODES) + break; + printk("%u ", bit); + bit++; + } + printk("], inflight=%u\n", res->inflight_locks); +} + void __dlm_print_one_lock_resource(struct dlm_lock_resource *res) { struct list_head *iter2; @@ -65,6 +82,7 @@ void __dlm_print_one_lock_resource(struc res->owner, res->state); mlog(ML_NOTICE, " last used: %lu, on purge list: %s\n", res->last_used, list_empty(&res->purge) ? "no" : "yes"); + dlm_print_lockres_refmap(res); mlog(ML_NOTICE, " granted queue: \n"); list_for_each(iter2, &res->granted) { lock = list_entry(iter2, struct dlm_lock, list); diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index f0b25f2..3995de3 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -125,10 +125,10 @@ void __dlm_insert_lockres(struct dlm_ctx hlist_add_head(&res->hash_node, bucket); } -struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, - const char *name, - unsigned int len, - unsigned int hash) +struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, + const char *name, + unsigned int len, + unsigned int hash) { struct hlist_head *bucket; struct hlist_node *list; @@ -154,6 +154,37 @@ struct dlm_lock_resource * __dlm_lookup_ return NULL; } +/* intended to be called by functions which do not care about lock + * resources which are being purged (most net _handler functions). + * this will return NULL for any lock resource which is found but + * currently in the process of dropping its mastery reference. + * use __dlm_lookup_lockres_full when you need the lock resource + * regardless (e.g. dlm_get_lock_resource) */ +struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, + const char *name, + unsigned int len, + unsigned int hash) +{ + struct dlm_lock_resource *res = NULL; + + mlog_entry("%.*s\n", len, name); + + assert_spin_locked(&dlm->spinlock); + + res = __dlm_lookup_lockres_full(dlm, name, len, hash); + if (res) { + spin_lock(&res->spinlock); + if (res->state & DLM_LOCK_RES_DROPPING_REF) { + spin_unlock(&res->spinlock); + dlm_lockres_put(res); + return NULL; + } + spin_unlock(&res->spinlock); + } + + return res; +} + struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, const char *name, unsigned int len) @@ -330,43 +361,60 @@ static void dlm_complete_dlm_shutdown(st wake_up(&dlm_domain_events); } -static void dlm_migrate_all_locks(struct dlm_ctxt *dlm) +static int dlm_migrate_all_locks(struct dlm_ctxt *dlm) { - int i; + int i, num, n, ret = 0; struct dlm_lock_resource *res; + struct hlist_node *iter; + struct hlist_head *bucket; + int dropped; mlog(0, "Migrating locks from domain %s\n", dlm->name); -restart: + + num = 0; spin_lock(&dlm->spinlock); for (i = 0; i < DLM_HASH_BUCKETS; i++) { - while (!hlist_empty(dlm_lockres_hash(dlm, i))) { - res = hlist_entry(dlm_lockres_hash(dlm, i)->first, - struct dlm_lock_resource, hash_node); - /* need reference when manually grabbing lockres */ +redo_bucket: + n = 0; + bucket = dlm_lockres_hash(dlm, i); + iter = bucket->first; + while (iter) { + n++; + res = hlist_entry(iter, struct dlm_lock_resource, + hash_node); dlm_lockres_get(res); - /* this should unhash the lockres - * and exit with dlm->spinlock */ - mlog(0, "purging res=%p\n", res); - if (dlm_lockres_is_dirty(dlm, res)) { - /* HACK! this should absolutely go. - * need to figure out why some empty - * lockreses are still marked dirty */ - mlog(ML_ERROR, "lockres %.*s dirty!\n", - res->lockname.len, res->lockname.name); - - spin_unlock(&dlm->spinlock); - dlm_kick_thread(dlm, res); - wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res)); - dlm_lockres_put(res); - goto restart; - } - dlm_purge_lockres(dlm, res); + /* migrate, if necessary. this will drop the dlm + * spinlock and retake it if it does migration. */ + dropped = dlm_empty_lockres(dlm, res); + + spin_lock(&res->spinlock); + __dlm_lockres_calc_usage(dlm, res); + iter = res->hash_node.next; + spin_unlock(&res->spinlock); + dlm_lockres_put(res); + + cond_resched_lock(&dlm->spinlock); + + if (dropped) + goto redo_bucket; } + num += n; + mlog(0, "%s: touched %d lockreses in bucket %d " + "(tot=%d)\n", dlm->name, n, i, num); } spin_unlock(&dlm->spinlock); - + wake_up(&dlm->dlm_thread_wq); + + /* let the dlm thread take care of purging, keep scanning until + * nothing remains in the hash */ + if (num) { + mlog(0, "%s: %d lock resources in hash last pass\n", + dlm->name, num); + ret = -EAGAIN; + } mlog(0, "DONE Migrating locks from domain %s\n", dlm->name); + return ret; } static int dlm_no_joining_node(struct dlm_ctxt *dlm) @@ -571,7 +619,9 @@ void dlm_unregister_domain(struct dlm_ct /* We changed dlm state, notify the thread */ dlm_kick_thread(dlm, NULL); - dlm_migrate_all_locks(dlm); + while (dlm_migrate_all_locks(dlm)) { + mlog(0, "%s: more migration to do\n", dlm->name); + } dlm_mark_domain_leaving(dlm); dlm_leave_domain(dlm); dlm_complete_dlm_shutdown(dlm); @@ -1082,6 +1132,13 @@ static int dlm_register_domain_handlers( if (status) goto bail; + status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key, + sizeof(struct dlm_deref_lockres), + dlm_deref_lockres_handler, + dlm, &dlm->dlm_domain_handlers); + if (status) + goto bail; + status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key, sizeof(struct dlm_migrate_request), dlm_migrate_request_handler, diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index e5ca3db..ac91a76 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -163,6 +163,10 @@ static enum dlm_status dlmlock_master(st kick_thread = 1; } } + /* reduce the inflight count, this may result in the lockres + * being purged below during calc_usage */ + if (lock->ml.node == dlm->node_num) + dlm_lockres_drop_inflight_ref(dlm, res); spin_unlock(&res->spinlock); wake_up(&res->wq); diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 0ad8720..a65a877 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -99,9 +99,9 @@ static void dlm_mle_node_up(struct dlm_c int idx); static void dlm_assert_master_worker(struct dlm_work_item *item, void *data); -static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, - unsigned int namelen, void *nodemap, - u32 flags); +static int dlm_do_assert_master(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + void *nodemap, u32 flags); static inline int dlm_mle_equal(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle, @@ -237,7 +237,8 @@ static int dlm_find_mle(struct dlm_ctxt struct dlm_master_list_entry **mle, char *name, unsigned int namelen); -static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to); +static int dlm_do_master_request(struct dlm_lock_resource *res, + struct dlm_master_list_entry *mle, int to); static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm, @@ -687,6 +688,7 @@ static void dlm_init_lockres(struct dlm_ INIT_LIST_HEAD(&res->purge); atomic_set(&res->asts_reserved, 0); res->migration_pending = 0; + res->inflight_locks = 0; kref_init(&res->refs); @@ -700,6 +702,7 @@ static void dlm_init_lockres(struct dlm_ res->last_used = 0; memset(res->lvb, 0, DLM_LVB_LEN); + memset(res->refmap, 0, sizeof(res->refmap)); } struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, @@ -722,6 +725,42 @@ struct dlm_lock_resource *dlm_new_lockre return res; } +void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + int new_lockres, + const char *file, + int line) +{ + if (!new_lockres) + assert_spin_locked(&res->spinlock); + + if (!test_bit(dlm->node_num, res->refmap)) { + BUG_ON(res->inflight_locks != 0); + dlm_lockres_set_refmap_bit(dlm->node_num, res); + } + res->inflight_locks++; + mlog(0, "%s:%.*s: inflight++: now %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->inflight_locks); +} + +void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + const char *file, + int line) +{ + assert_spin_locked(&res->spinlock); + + BUG_ON(res->inflight_locks == 0); + res->inflight_locks--; + mlog(0, "%s:%.*s: inflight--: now %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->inflight_locks); + if (res->inflight_locks == 0) + dlm_lockres_clear_refmap_bit(dlm->node_num, res); + wake_up(&res->wq); +} + /* * lookup a lock resource by name. * may already exist in the hashtable. @@ -752,6 +791,7 @@ struct dlm_lock_resource * dlm_get_lock_ unsigned int hash; int tries = 0; int bit, wait_on_recovery = 0; + int drop_inflight_if_nonlocal = 0; BUG_ON(!lockid); @@ -761,9 +801,30 @@ struct dlm_lock_resource * dlm_get_lock_ lookup: spin_lock(&dlm->spinlock); - tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash); + tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash); if (tmpres) { + int dropping_ref = 0; + + spin_lock(&tmpres->spinlock); + if (tmpres->owner == dlm->node_num) { + BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF); + dlm_lockres_grab_inflight_ref(dlm, tmpres); + } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) + dropping_ref = 1; + spin_unlock(&tmpres->spinlock); spin_unlock(&dlm->spinlock); + + /* wait until done messaging the master, drop our ref to allow + * the lockres to be purged, start over. */ + if (dropping_ref) { + spin_lock(&tmpres->spinlock); + __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF); + spin_unlock(&tmpres->spinlock); + dlm_lockres_put(tmpres); + tmpres = NULL; + goto lookup; + } + mlog(0, "found in hash!\n"); if (res) dlm_lockres_put(res); @@ -793,6 +854,7 @@ lookup: spin_lock(&res->spinlock); dlm_change_lockres_owner(dlm, res, dlm->node_num); __dlm_insert_lockres(dlm, res); + dlm_lockres_grab_inflight_ref(dlm, res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); /* lockres still marked IN_PROGRESS */ @@ -805,29 +867,40 @@ lookup: /* if we found a block, wait for lock to be mastered by another node */ blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen); if (blocked) { + int mig; if (mle->type == DLM_MLE_MASTER) { mlog(ML_ERROR, "master entry for nonexistent lock!\n"); BUG(); - } else if (mle->type == DLM_MLE_MIGRATION) { - /* migration is in progress! */ - /* the good news is that we now know the - * "current" master (mle->master). */ - + } + mig = (mle->type == DLM_MLE_MIGRATION); + /* if there is a migration in progress, let the migration + * finish before continuing. we can wait for the absence + * of the MIGRATION mle: either the migrate finished or + * one of the nodes died and the mle was cleaned up. + * if there is a BLOCK here, but it already has a master + * set, we are too late. the master does not have a ref + * for us in the refmap. detach the mle and drop it. + * either way, go back to the top and start over. */ + if (mig || mle->master != O2NM_MAX_NODES) { + BUG_ON(mig && mle->master == dlm->node_num); + /* we arrived too late. the master does not + * have a ref for us. retry. */ + mlog(0, "%s:%.*s: late on %s\n", + dlm->name, namelen, lockid, + mig ? "MIGRATION" : "BLOCK"); spin_unlock(&dlm->master_lock); - assert_spin_locked(&dlm->spinlock); - - /* set the lockres owner and hash it */ - spin_lock(&res->spinlock); - dlm_set_lockres_owner(dlm, res, mle->master); - __dlm_insert_lockres(dlm, res); - spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); /* master is known, detach */ - dlm_mle_detach_hb_events(dlm, mle); + if (!mig) + dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); mle = NULL; - goto wake_waiters; + /* this is lame, but we cant wait on either + * the mle or lockres waitqueue here */ + if (mig) + msleep(100); + goto lookup; } } else { /* go ahead and try to master lock on this node */ @@ -858,6 +931,13 @@ lookup: /* finally add the lockres to its hash bucket */ __dlm_insert_lockres(dlm, res); + /* since this lockres is new it doesnt not require the spinlock */ + dlm_lockres_grab_inflight_ref_new(dlm, res); + + /* if this node does not become the master make sure to drop + * this inflight reference below */ + drop_inflight_if_nonlocal = 1; + /* get an extra ref on the mle in case this is a BLOCK * if so, the creator of the BLOCK may try to put the last * ref at this time in the assert master handler, so we @@ -910,7 +990,7 @@ redo_request: ret = -EINVAL; dlm_node_iter_init(mle->vote_map, &iter); while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { - ret = dlm_do_master_request(mle, nodenum); + ret = dlm_do_master_request(res, mle, nodenum); if (ret < 0) mlog_errno(ret); if (mle->master != O2NM_MAX_NODES) { @@ -960,6 +1040,8 @@ wait: wake_waiters: spin_lock(&res->spinlock); + if (res->owner != dlm->node_num && drop_inflight_if_nonlocal) + dlm_lockres_drop_inflight_ref(dlm, res); res->state &= ~DLM_LOCK_RES_IN_PROGRESS; spin_unlock(&res->spinlock); wake_up(&res->wq); @@ -998,7 +1080,7 @@ recheck: /* this will cause the master to re-assert across * the whole cluster, freeing up mles */ if (res->owner != dlm->node_num) { - ret = dlm_do_master_request(mle, res->owner); + ret = dlm_do_master_request(res, mle, res->owner); if (ret < 0) { /* give recovery a chance to run */ mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); @@ -1062,6 +1144,8 @@ recheck: * now tell other nodes that I am * mastering this. */ mle->master = dlm->node_num; + /* ref was grabbed in get_lock_resource + * will be dropped in dlmlock_master */ assert = 1; sleep = 0; } @@ -1087,7 +1171,8 @@ recheck: (atomic_read(&mle->woken) == 1), timeo); if (res->owner == O2NM_MAX_NODES) { - mlog(0, "waiting again\n"); + mlog(0, "%s:%.*s: waiting again\n", dlm->name, + res->lockname.len, res->lockname.name); goto recheck; } mlog(0, "done waiting, master is %u\n", res->owner); @@ -1100,8 +1185,7 @@ recheck: m = dlm->node_num; mlog(0, "about to master %.*s here, this=%u\n", res->lockname.len, res->lockname.name, m); - ret = dlm_do_assert_master(dlm, res->lockname.name, - res->lockname.len, mle->vote_map, 0); + ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0); if (ret) { /* This is a failure in the network path, * not in the response to the assert_master @@ -1117,6 +1201,8 @@ recheck: /* set the lockres owner */ spin_lock(&res->spinlock); + /* mastery reference obtained either during + * assert_master_handler or in get_lock_resource */ dlm_change_lockres_owner(dlm, res, m); spin_unlock(&res->spinlock); @@ -1283,7 +1369,8 @@ static int dlm_restart_lock_mastery(stru * */ -static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to) +static int dlm_do_master_request(struct dlm_lock_resource *res, + struct dlm_master_list_entry *mle, int to) { struct dlm_ctxt *dlm = mle->dlm; struct dlm_master_request request; @@ -1339,6 +1426,9 @@ again: case DLM_MASTER_RESP_YES: set_bit(to, mle->response_map); mlog(0, "node %u is the master, response=YES\n", to); + mlog(0, "%s:%.*s: master node %u now knows I have a " + "reference\n", dlm->name, res->lockname.len, + res->lockname.name, to); mle->master = to; break; case DLM_MASTER_RESP_NO: @@ -1428,8 +1518,10 @@ way_up_top: } if (res->owner == dlm->node_num) { + mlog(0, "%s:%.*s: setting bit %u in refmap\n", + dlm->name, namelen, name, request->node_idx); + dlm_lockres_set_refmap_bit(request->node_idx, res); spin_unlock(&res->spinlock); - // mlog(0, "this node is the master\n"); response = DLM_MASTER_RESP_YES; if (mle) kmem_cache_free(dlm_mle_cache, mle); @@ -1477,7 +1569,6 @@ way_up_top: mlog(0, "node %u is master, but trying to migrate to " "node %u.\n", tmpmle->master, tmpmle->new_master); if (tmpmle->master == dlm->node_num) { - response = DLM_MASTER_RESP_YES; mlog(ML_ERROR, "no owner on lockres, but this " "node is trying to migrate it to %u?!\n", tmpmle->new_master); @@ -1494,6 +1585,10 @@ way_up_top: * go back and clean the mles on any * other nodes */ dispatch_assert = 1; + dlm_lockres_set_refmap_bit(request->node_idx, res); + mlog(0, "%s:%.*s: setting bit %u in refmap\n", + dlm->name, namelen, name, + request->node_idx); } else response = DLM_MASTER_RESP_NO; } else { @@ -1607,15 +1702,17 @@ send_response: * can periodically run all locks owned by this node * and re-assert across the cluster... */ -static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, - unsigned int namelen, void *nodemap, - u32 flags) +int dlm_do_assert_master(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + void *nodemap, u32 flags) { struct dlm_assert_master assert; int to, tmpret; struct dlm_node_iter iter; int ret = 0; int reassert; + const char *lockname = res->lockname.name; + unsigned int namelen = res->lockname.len; BUG_ON(namelen > O2NM_MAX_NAME_LEN); again: @@ -1647,6 +1744,7 @@ again: mlog(0, "link to %d went down!\n", to); /* any nonzero status return will do */ ret = tmpret; + r = 0; } else if (r < 0) { /* ok, something horribly messed. kill thyself. */ mlog(ML_ERROR,"during assert master of %.*s to %u, " @@ -1661,12 +1759,29 @@ again: spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); BUG(); - } else if (r == EAGAIN) { + } + + if (r & DLM_ASSERT_RESPONSE_REASSERT && + !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) { + mlog(ML_ERROR, "%.*s: very strange, " + "master MLE but no lockres on %u\n", + namelen, lockname, to); + } + + if (r & DLM_ASSERT_RESPONSE_REASSERT) { mlog(0, "%.*s: node %u create mles on other " "nodes and requests a re-assert\n", namelen, lockname, to); reassert = 1; } + if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) { + mlog(0, "%.*s: node %u has a reference to this " + "lockres, set the bit in the refmap\n", + namelen, lockname, to); + spin_lock(&res->spinlock); + dlm_lockres_set_refmap_bit(to, res); + spin_unlock(&res->spinlock); + } } if (reassert) @@ -1693,7 +1808,7 @@ int dlm_assert_master_handler(struct o2n char *name; unsigned int namelen, hash; u32 flags; - int master_request = 0; + int master_request = 0, have_lockres_ref = 0; int ret = 0; if (!dlm_grab(dlm)) @@ -1864,6 +1979,7 @@ ok: dlm_change_lockres_owner(dlm, res, mle->master); } spin_unlock(&res->spinlock); + have_lockres_ref = 1; } /* master is known, detach if not already detached. @@ -1918,7 +2034,19 @@ done: dlm_put(dlm); if (master_request) { mlog(0, "need to tell master to reassert\n"); - ret = EAGAIN; // positive. negative would shoot down the node. + /* positive. negative would shoot down the node. */ + ret |= DLM_ASSERT_RESPONSE_REASSERT; + if (!have_lockres_ref) { + mlog(ML_ERROR, "strange, got assert from %u, MASTER " + "mle present here for %s:%.*s, but no lockres!\n", + assert->node_idx, dlm->name, namelen, name); + } + } + if (have_lockres_ref) { + /* let the master know we have a reference to the lockres */ + ret |= DLM_ASSERT_RESPONSE_MASTERY_REF; + mlog(0, "%s:%.*s: got assert from %u, need a ref\n", + dlm->name, namelen, name, assert->node_idx); } return ret; @@ -2023,9 +2151,7 @@ static void dlm_assert_master_worker(str * even if one or more nodes die */ mlog(0, "worker about to master %.*s here, this=%u\n", res->lockname.len, res->lockname.name, dlm->node_num); - ret = dlm_do_assert_master(dlm, res->lockname.name, - res->lockname.len, - nodemap, flags); + ret = dlm_do_assert_master(dlm, res, nodemap, flags); if (ret < 0) { /* no need to restart, we are done */ if (!dlm_is_host_down(ret)) @@ -2097,14 +2223,113 @@ static int dlm_pre_master_reco_lockres(s return ret; } +/* + * DLM_DEREF_LOCKRES_MSG + */ + +int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) +{ + struct dlm_deref_lockres deref; + int ret = 0, r; + const char *lockname; + unsigned int namelen; + + lockname = res->lockname.name; + namelen = res->lockname.len; + BUG_ON(namelen > O2NM_MAX_NAME_LEN); + + mlog(0, "%s:%.*s: sending deref to %d\n", + dlm->name, namelen, lockname, res->owner); + memset(&deref, 0, sizeof(deref)); + deref.node_idx = dlm->node_num; + deref.namelen = namelen; + memcpy(deref.name, lockname, namelen); + + ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key, + &deref, sizeof(deref), res->owner, &r); + if (ret < 0) + mlog_errno(ret); + else if (r < 0) { + /* BAD. other node says I did not have a ref. */ + mlog(ML_ERROR,"while dropping ref on %s:%.*s " + "(master=%u) got %d.\n", dlm->name, namelen, + lockname, res->owner, r); + dlm_print_one_lock_resource(res); + BUG(); + } + return ret; +} + +int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data) +{ + struct dlm_ctxt *dlm = data; + struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf; + struct dlm_lock_resource *res = NULL; + char *name; + unsigned int namelen; + int ret = -EINVAL; + u8 node; + unsigned int hash; + + if (!dlm_grab(dlm)) + return 0; + + name = deref->name; + namelen = deref->namelen; + node = deref->node_idx; + + if (namelen > DLM_LOCKID_NAME_MAX) { + mlog(ML_ERROR, "Invalid name length!"); + goto done; + } + if (deref->node_idx >= O2NM_MAX_NODES) { + mlog(ML_ERROR, "Invalid node number: %u\n", node); + goto done; + } + + hash = dlm_lockid_hash(name, namelen); + + spin_lock(&dlm->spinlock); + res = __dlm_lookup_lockres_full(dlm, name, namelen, hash); + if (!res) { + spin_unlock(&dlm->spinlock); + mlog(ML_ERROR, "%s:%.*s: bad lockres name\n", + dlm->name, namelen, name); + goto done; + } + spin_unlock(&dlm->spinlock); + + spin_lock(&res->spinlock); + BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); + if (test_bit(node, res->refmap)) { + ret = 0; + dlm_lockres_clear_refmap_bit(node, res); + } else { + mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " + "but it is already dropped!\n", dlm->name, namelen, + name, node); + __dlm_print_one_lock_resource(res); + } + spin_unlock(&res->spinlock); + + if (!ret) + dlm_lockres_calc_usage(dlm, res); +done: + if (res) + dlm_lockres_put(res); + dlm_put(dlm); + return ret; +} + /* * DLM_MIGRATE_LOCKRES */ -int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, - u8 target) +static int dlm_migrate_lockres(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + u8 target) { struct dlm_master_list_entry *mle = NULL; struct dlm_master_list_entry *oldmle = NULL; @@ -2376,6 +2601,53 @@ leave: return ret; } +#define DLM_MIGRATION_RETRY_MS 100 + +/* Should be called only after beginning the domain leave process. + * There should not be any remaining locks on nonlocal lock resources, + * and there should be no local locks left on locally mastered resources. + * + * Called with the dlm spinlock held, may drop it to do migration, but + * will re-acquire before exit. + * + * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */ +int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) +{ + int ret; + int lock_dropped = 0; + + if (res->owner != dlm->node_num) { + if (!__dlm_lockres_unused(res)) { + mlog(ML_ERROR, "%s:%.*s: this node is not master, " + "trying to free this but locks remain\n", + dlm->name, res->lockname.len, res->lockname.name); + } + goto leave; + } + + /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */ + spin_unlock(&dlm->spinlock); + lock_dropped = 1; + while (1) { + ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES); + if (ret >= 0) + break; + if (ret == -ENOTEMPTY) { + mlog(ML_ERROR, "lockres %.*s still has local locks!\n", + res->lockname.len, res->lockname.name); + BUG(); + } + + mlog(0, "lockres %.*s: migrate failed, " + "retrying\n", res->lockname.len, + res->lockname.name); + msleep(DLM_MIGRATION_RETRY_MS); + } + spin_lock(&dlm->spinlock); +leave: + return lock_dropped; +} + int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock) { int ret; @@ -2405,7 +2677,8 @@ static int dlm_migration_can_proceed(str return can_proceed; } -int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) +static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) { int ret; spin_lock(&res->spinlock); @@ -2434,8 +2707,15 @@ static int dlm_mark_lockres_migrating(st __dlm_lockres_reserve_ast(res); spin_unlock(&res->spinlock); - /* now flush all the pending asts.. hang out for a bit */ + /* now flush all the pending asts */ dlm_kick_thread(dlm, res); + /* before waiting on DIRTY, block processes which may + * try to dirty the lockres before MIGRATING is set */ + spin_lock(&res->spinlock); + BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY); + res->state |= DLM_LOCK_RES_BLOCK_DIRTY; + spin_unlock(&res->spinlock); + /* now wait on any pending asts and the DIRTY state */ wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res)); dlm_lockres_release_ast(dlm, res); @@ -2461,6 +2741,13 @@ again: mlog(0, "trying again...\n"); goto again; } + /* now that we are sure the MIGRATING state is there, drop + * the unneded state which blocked threads trying to DIRTY */ + spin_lock(&res->spinlock); + BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY)); + BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING)); + res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY; + spin_unlock(&res->spinlock); /* did the target go down or die? */ spin_lock(&dlm->spinlock); @@ -2490,7 +2777,7 @@ static void dlm_remove_nonlocal_locks(st { struct list_head *iter, *iter2; struct list_head *queue = &res->granted; - int i; + int i, bit; struct dlm_lock *lock; assert_spin_locked(&res->spinlock); @@ -2508,12 +2795,28 @@ static void dlm_remove_nonlocal_locks(st BUG_ON(!list_empty(&lock->bast_list)); BUG_ON(lock->ast_pending); BUG_ON(lock->bast_pending); + dlm_lockres_clear_refmap_bit(lock->ml.node, res); list_del_init(&lock->list); dlm_lock_put(lock); } } queue++; } + bit = 0; + while (1) { + bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit); + if (bit >= O2NM_MAX_NODES) + break; + /* do not clear the local node reference, if there is a + * process holding this, let it drop the ref itself */ + if (bit != dlm->node_num) { + mlog(0, "%s:%.*s: node %u had a ref to this " + "migrating lockres, clearing\n", dlm->name, + res->lockname.len, res->lockname.name, bit); + dlm_lockres_clear_refmap_bit(bit, res); + } + bit++; + } } /* for now this is not too intelligent. we will @@ -2601,6 +2904,16 @@ static int dlm_do_migrate_request(struct mlog(0, "migrate request (node %u) returned %d!\n", nodenum, status); ret = status; + } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) { + /* during the migration request we short-circuited + * the mastery of the lockres. make sure we have + * a mastery ref for nodenum */ + mlog(0, "%s:%.*s: need ref for node %u\n", + dlm->name, res->lockname.len, res->lockname.name, + nodenum); + spin_lock(&res->spinlock); + dlm_lockres_set_refmap_bit(nodenum, res); + spin_unlock(&res->spinlock); } } @@ -2745,7 +3058,13 @@ static int dlm_add_migration_mle(struct /* remove it from the list so that only one * mle will be found */ list_del_init(&tmp->list); - __dlm_mle_detach_hb_events(dlm, mle); + /* this was obviously WRONG. mle is uninited here. should be tmp. */ + __dlm_mle_detach_hb_events(dlm, tmp); + ret = DLM_MIGRATE_RESPONSE_MASTERY_REF; + mlog(0, "%s:%.*s: master=%u, newmaster=%u, " + "telling master to get ref for cleared out mle " + "during migration\n", dlm->name, namelen, name, + master, new_master); } spin_unlock(&tmp->spinlock); } @@ -2753,6 +3072,8 @@ static int dlm_add_migration_mle(struct /* now add a migration mle to the tail of the list */ dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen); mle->new_master = new_master; + /* the new master will be sending an assert master for this. + * at that point we will get the refmap reference */ mle->master = master; /* do this for consistency with other mle types */ set_bit(new_master, mle->maybe_map); @@ -2902,6 +3223,13 @@ int dlm_finish_migration(struct dlm_ctxt clear_bit(dlm->node_num, iter.node_map); spin_unlock(&dlm->spinlock); + /* ownership of the lockres is changing. account for the + * mastery reference here since old_master will briefly have + * a reference after the migration completes */ + spin_lock(&res->spinlock); + dlm_lockres_set_refmap_bit(old_master, res); + spin_unlock(&res->spinlock); + mlog(0, "now time to do a migrate request to other nodes\n"); ret = dlm_do_migrate_request(dlm, res, old_master, dlm->node_num, &iter); @@ -2914,8 +3242,7 @@ int dlm_finish_migration(struct dlm_ctxt res->lockname.len, res->lockname.name); /* this call now finishes out the nodemap * even if one or more nodes die */ - ret = dlm_do_assert_master(dlm, res->lockname.name, - res->lockname.len, iter.node_map, + ret = dlm_do_assert_master(dlm, res, iter.node_map, DLM_ASSERT_MASTER_FINISH_MIGRATION); if (ret < 0) { /* no longer need to retry. all living nodes contacted. */ @@ -2927,8 +3254,7 @@ int dlm_finish_migration(struct dlm_ctxt set_bit(old_master, iter.node_map); mlog(0, "doing assert master of %.*s back to %u\n", res->lockname.len, res->lockname.name, old_master); - ret = dlm_do_assert_master(dlm, res->lockname.name, - res->lockname.len, iter.node_map, + ret = dlm_do_assert_master(dlm, res, iter.node_map, DLM_ASSERT_MASTER_FINISH_MIGRATION); if (ret < 0) { mlog(0, "assert master to original master failed " diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 367a11e..f93315c 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1129,6 +1129,11 @@ static int dlm_send_mig_lockres_msg(stru if (total_locks == mres_total_locks) mres->flags |= DLM_MRES_ALL_DONE; + mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n", + dlm->name, res->lockname.len, res->lockname.name, + orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery", + send_to); + /* send it */ ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres, sz, send_to, &status); @@ -1213,6 +1218,34 @@ static int dlm_add_lock_to_array(struct return 0; } +static void dlm_add_dummy_lock(struct dlm_ctxt *dlm, + struct dlm_migratable_lockres *mres) +{ + struct dlm_lock dummy; + memset(&dummy, 0, sizeof(dummy)); + dummy.ml.cookie = 0; + dummy.ml.type = LKM_IVMODE; + dummy.ml.convert_type = LKM_IVMODE; + dummy.ml.highest_blocked = LKM_IVMODE; + dummy.lksb = NULL; + dummy.ml.node = dlm->node_num; + dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST); +} + +static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm, + struct dlm_migratable_lock *ml, + u8 *nodenum) +{ + if (unlikely(ml->cookie == 0 && + ml->type == LKM_IVMODE && + ml->convert_type == LKM_IVMODE && + ml->highest_blocked == LKM_IVMODE && + ml->list == DLM_BLOCKED_LIST)) { + *nodenum = ml->node; + return 1; + } + return 0; +} int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct dlm_migratable_lockres *mres, @@ -1260,6 +1293,14 @@ int dlm_send_one_lockres(struct dlm_ctxt goto error; } } + if (total_locks == 0) { + /* send a dummy lock to indicate a mastery reference only */ + mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n", + dlm->name, res->lockname.len, res->lockname.name, + send_to, flags & DLM_MRES_RECOVERY ? "recovery" : + "migration"); + dlm_add_dummy_lock(dlm, mres); + } /* flush any remaining locks */ ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); if (ret < 0) @@ -1386,13 +1427,16 @@ int dlm_mig_lockres_handler(struct o2net /* add an extra ref for just-allocated lockres * otherwise the lockres will be purged immediately */ dlm_lockres_get(res); - } /* at this point we have allocated everything we need, * and we have a hashed lockres with an extra ref and * the proper res->state flags. */ ret = 0; + spin_lock(&res->spinlock); + /* drop this either when master requery finds a different master + * or when a lock is added by the recovery worker */ + dlm_lockres_grab_inflight_ref(dlm, res); if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) { /* migration cannot have an unknown master */ BUG_ON(!(mres->flags & DLM_MRES_RECOVERY)); @@ -1400,10 +1444,11 @@ int dlm_mig_lockres_handler(struct o2net "unknown owner.. will need to requery: " "%.*s\n", mres->lockname_len, mres->lockname); } else { - spin_lock(&res->spinlock); + /* take a reference now to pin the lockres, drop it + * when locks are added in the worker */ dlm_change_lockres_owner(dlm, res, dlm->node_num); - spin_unlock(&res->spinlock); } + spin_unlock(&res->spinlock); /* queue up work for dlm_mig_lockres_worker */ dlm_grab(dlm); /* get an extra ref for the work item */ @@ -1459,6 +1504,9 @@ again: "this node will take it.\n", res->lockname.len, res->lockname.name); } else { + spin_lock(&res->spinlock); + dlm_lockres_drop_inflight_ref(dlm, res); + spin_unlock(&res->spinlock); mlog(0, "master needs to respond to sender " "that node %u still owns %.*s\n", real_master, res->lockname.len, @@ -1660,21 +1708,38 @@ static int dlm_process_recovery_data(str { struct dlm_migratable_lock *ml; struct list_head *queue; + struct list_head *tmpq = NULL; struct dlm_lock *newlock = NULL; struct dlm_lockstatus *lksb = NULL; int ret = 0; - int i, bad; + int i, j, bad; struct list_head *iter; struct dlm_lock *lock = NULL; + u8 from = O2NM_MAX_NODES; + unsigned int added = 0; mlog(0, "running %d locks for this lockres\n", mres->num_locks); for (i=0; inum_locks; i++) { ml = &(mres->ml[i]); + + if (dlm_is_dummy_lock(dlm, ml, &from)) { + /* placeholder, just need to set the refmap bit */ + BUG_ON(mres->num_locks != 1); + mlog(0, "%s:%.*s: dummy lock for %u\n", + dlm->name, mres->lockname_len, mres->lockname, + from); + spin_lock(&res->spinlock); + dlm_lockres_set_refmap_bit(from, res); + spin_unlock(&res->spinlock); + added++; + break; + } BUG_ON(ml->highest_blocked != LKM_IVMODE); newlock = NULL; lksb = NULL; queue = dlm_list_num_to_pointer(res, ml->list); + tmpq = NULL; /* if the lock is for the local node it needs to * be moved to the proper location within the queue. @@ -1684,11 +1749,16 @@ static int dlm_process_recovery_data(str BUG_ON(!(mres->flags & DLM_MRES_MIGRATION)); spin_lock(&res->spinlock); - list_for_each(iter, queue) { - lock = list_entry (iter, struct dlm_lock, list); - if (lock->ml.cookie != ml->cookie) - lock = NULL; - else + for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) { + tmpq = dlm_list_idx_to_ptr(res, j); + list_for_each(iter, tmpq) { + lock = list_entry (iter, struct dlm_lock, list); + if (lock->ml.cookie != ml->cookie) + lock = NULL; + else + break; + } + if (lock) break; } @@ -1700,10 +1770,18 @@ static int dlm_process_recovery_data(str "with cookie %u:%llu!\n", dlm_get_lock_cookie_node(c), dlm_get_lock_cookie_seq(c)); + __dlm_print_one_lock_resource(res); BUG(); } BUG_ON(lock->ml.node != ml->node); + if (tmpq != queue) { + mlog(0, "lock was on %u instead of %u for %.*s\n", + j, ml->list, res->lockname.len, res->lockname.name); + spin_unlock(&res->spinlock); + continue; + } + /* see NOTE above about why we do not update * to match the master here */ @@ -1711,6 +1789,7 @@ static int dlm_process_recovery_data(str /* do not alter lock refcount. switching lists. */ list_move_tail(&lock->list, queue); spin_unlock(&res->spinlock); + added++; mlog(0, "just reordered a local lock!\n"); continue; @@ -1817,12 +1896,24 @@ skip_lvb: if (!bad) { dlm_lock_get(newlock); list_add_tail(&newlock->list, queue); + mlog(0, "%s:%.*s: added lock for node %u, " + "setting refmap bit\n", dlm->name, + res->lockname.len, res->lockname.name, ml->node); + dlm_lockres_set_refmap_bit(ml->node, res); + added++; } spin_unlock(&res->spinlock); } mlog(0, "done running all the locks\n"); leave: + /* balance the ref taken when the work was queued */ + if (added > 0) { + spin_lock(&res->spinlock); + dlm_lockres_drop_inflight_ref(dlm, res); + spin_unlock(&res->spinlock); + } + if (ret < 0) { mlog_errno(ret); if (newlock) @@ -1935,9 +2026,11 @@ static void dlm_finish_local_lockres_rec if (res->owner == dead_node) { list_del_init(&res->recovering); spin_lock(&res->spinlock); + /* new_master has our reference from + * the lock state sent during recovery */ dlm_change_lockres_owner(dlm, res, new_master); res->state &= ~DLM_LOCK_RES_RECOVERING; - if (!__dlm_lockres_unused(res)) + if (__dlm_lockres_has_locks(res)) __dlm_dirty_lockres(dlm, res); spin_unlock(&res->spinlock); wake_up(&res->wq); @@ -1977,9 +2070,11 @@ static void dlm_finish_local_lockres_rec dlm_lockres_put(res); } spin_lock(&res->spinlock); + /* new_master has our reference from + * the lock state sent during recovery */ dlm_change_lockres_owner(dlm, res, new_master); res->state &= ~DLM_LOCK_RES_RECOVERING; - if (!__dlm_lockres_unused(res)) + if (__dlm_lockres_has_locks(res)) __dlm_dirty_lockres(dlm, res); spin_unlock(&res->spinlock); wake_up(&res->wq); @@ -2048,6 +2143,7 @@ static void dlm_free_dead_locks(struct d { struct list_head *iter, *tmpiter; struct dlm_lock *lock; + unsigned int freed = 0; /* this node is the lockres master: * 1) remove any stale locks for the dead node @@ -2062,6 +2158,7 @@ static void dlm_free_dead_locks(struct d if (lock->ml.node == dead_node) { list_del_init(&lock->list); dlm_lock_put(lock); + freed++; } } list_for_each_safe(iter, tmpiter, &res->converting) { @@ -2069,6 +2166,7 @@ static void dlm_free_dead_locks(struct d if (lock->ml.node == dead_node) { list_del_init(&lock->list); dlm_lock_put(lock); + freed++; } } list_for_each_safe(iter, tmpiter, &res->blocked) { @@ -2076,9 +2174,23 @@ static void dlm_free_dead_locks(struct d if (lock->ml.node == dead_node) { list_del_init(&lock->list); dlm_lock_put(lock); + freed++; } } + if (freed) { + mlog(0, "%s:%.*s: freed %u locks for dead node %u, " + "dropping ref from lockres\n", dlm->name, + res->lockname.len, res->lockname.name, freed, dead_node); + BUG_ON(!test_bit(dead_node, res->refmap)); + dlm_lockres_clear_refmap_bit(dead_node, res); + } else if (test_bit(dead_node, res->refmap)) { + mlog(0, "%s:%.*s: dead node %u had a ref, but had " + "no locks and had not purged before dying\n", dlm->name, + res->lockname.len, res->lockname.name, dead_node); + dlm_lockres_clear_refmap_bit(dead_node, res); + } + /* do not kick thread yet */ __dlm_dirty_lockres(dlm, res); } @@ -2141,9 +2253,21 @@ static void dlm_do_local_recovery_cleanu spin_lock(&res->spinlock); /* zero the lvb if necessary */ dlm_revalidate_lvb(dlm, res, dead_node); - if (res->owner == dead_node) + if (res->owner == dead_node) { + if (res->state & DLM_LOCK_RES_DROPPING_REF) + mlog(0, "%s:%.*s: owned by " + "dead node %u, this node was " + "dropping its ref when it died. " + "continue, dropping the flag.\n", + dlm->name, res->lockname.len, + res->lockname.name, dead_node); + + /* the wake_up for this will happen when the + * RECOVERING flag is dropped later */ + res->state &= ~DLM_LOCK_RES_DROPPING_REF; + dlm_move_lockres_to_recovery_list(dlm, res); - else if (res->owner == dlm->node_num) { + } else if (res->owner == dlm->node_num) { dlm_free_dead_locks(dlm, res, dead_node); __dlm_lockres_calc_usage(dlm, res); } diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 0c822f3..3b94e4d 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -54,9 +54,6 @@ #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_ #include "cluster/masklog.h" static int dlm_thread(void *data); -static void dlm_purge_lockres_now(struct dlm_ctxt *dlm, - struct dlm_lock_resource *lockres); - static void dlm_flush_asts(struct dlm_ctxt *dlm); #define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num) @@ -82,14 +79,33 @@ repeat: current->state = TASK_RUNNING; } - -int __dlm_lockres_unused(struct dlm_lock_resource *res) +int __dlm_lockres_has_locks(struct dlm_lock_resource *res) { if (list_empty(&res->granted) && list_empty(&res->converting) && - list_empty(&res->blocked) && - list_empty(&res->dirty)) - return 1; + list_empty(&res->blocked)) + return 0; + return 1; +} + +/* "unused": the lockres has no locks, is not on the dirty list, + * has no inflight locks (in the gap between mastery and acquiring + * the first lock), and has no bits in its refmap. + * truly ready to be freed. */ +int __dlm_lockres_unused(struct dlm_lock_resource *res) +{ + if (!__dlm_lockres_has_locks(res) && + (list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) { + /* try not to scan the bitmap unless the first two + * conditions are already true */ + int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); + if (bit >= O2NM_MAX_NODES) { + /* since the bit for dlm->node_num is not + * set, inflight_locks better be zero */ + BUG_ON(res->inflight_locks != 0); + return 1; + } + } return 0; } @@ -106,46 +122,21 @@ void __dlm_lockres_calc_usage(struct dlm assert_spin_locked(&res->spinlock); if (__dlm_lockres_unused(res)){ - /* For now, just keep any resource we master */ - if (res->owner == dlm->node_num) - { - if (!list_empty(&res->purge)) { - mlog(0, "we master %s:%.*s, but it is on " - "the purge list. Removing\n", - dlm->name, res->lockname.len, - res->lockname.name); - list_del_init(&res->purge); - dlm->purge_count--; - } - return; - } - if (list_empty(&res->purge)) { - mlog(0, "putting lockres %.*s from purge list\n", - res->lockname.len, res->lockname.name); + mlog(0, "putting lockres %.*s:%p onto purge list\n", + res->lockname.len, res->lockname.name, res); res->last_used = jiffies; + dlm_lockres_get(res); list_add_tail(&res->purge, &dlm->purge_list); dlm->purge_count++; - - /* if this node is not the owner, there is - * no way to keep track of who the owner could be. - * unhash it to avoid serious problems. */ - if (res->owner != dlm->node_num) { - mlog(0, "%s:%.*s: doing immediate " - "purge of lockres owned by %u\n", - dlm->name, res->lockname.len, - res->lockname.name, res->owner); - - dlm_purge_lockres_now(dlm, res); - } } } else if (!list_empty(&res->purge)) { - mlog(0, "removing lockres %.*s from purge list, " - "owner=%u\n", res->lockname.len, res->lockname.name, - res->owner); + mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n", + res->lockname.len, res->lockname.name, res, res->owner); list_del_init(&res->purge); + dlm_lockres_put(res); dlm->purge_count--; } } @@ -163,68 +154,61 @@ void dlm_lockres_calc_usage(struct dlm_c spin_unlock(&dlm->spinlock); } -/* TODO: Eventual API: Called with the dlm spinlock held, may drop it - * to do migration, but will re-acquire before exit. */ -void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres) +static int dlm_purge_lockres(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) { int master; - int ret; + int ret = 0; - spin_lock(&lockres->spinlock); - master = lockres->owner == dlm->node_num; - spin_unlock(&lockres->spinlock); - - mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len, - lockres->lockname.name, master); - - /* Non master is the easy case -- no migration required, just - * quit. */ + spin_lock(&res->spinlock); + if (!__dlm_lockres_unused(res)) { + spin_unlock(&res->spinlock); + mlog(0, "%s:%.*s: tried to purge but not unused\n", + dlm->name, res->lockname.len, res->lockname.name); + return -ENOTEMPTY; + } + master = (res->owner == dlm->node_num); if (!master) - goto finish; - - /* Wheee! Migrate lockres here! */ - spin_unlock(&dlm->spinlock); -again: + res->state |= DLM_LOCK_RES_DROPPING_REF; + spin_unlock(&res->spinlock); - ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES); - if (ret == -ENOTEMPTY) { - mlog(ML_ERROR, "lockres %.*s still has local locks!\n", - lockres->lockname.len, lockres->lockname.name); + mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len, + res->lockname.name, master); - BUG(); - } else if (ret < 0) { - mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n", - lockres->lockname.len, lockres->lockname.name); - msleep(100); - goto again; + if (!master) { + /* drop spinlock to do messaging, retake below */ + spin_unlock(&dlm->spinlock); + /* clear our bit from the master's refmap, ignore errors */ + ret = dlm_drop_lockres_ref(dlm, res); + if (ret < 0) { + mlog_errno(ret); + if (!dlm_is_host_down(ret)) + BUG(); + } + mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n", + dlm->name, res->lockname.len, res->lockname.name, ret); + spin_lock(&dlm->spinlock); } - spin_lock(&dlm->spinlock); - -finish: - if (!list_empty(&lockres->purge)) { - list_del_init(&lockres->purge); + if (!list_empty(&res->purge)) { + mlog(0, "removing lockres %.*s:%p from purgelist, " + "master = %d\n", res->lockname.len, res->lockname.name, + res, master); + list_del_init(&res->purge); + dlm_lockres_put(res); dlm->purge_count--; } - __dlm_unhash_lockres(lockres); -} + __dlm_unhash_lockres(res); -/* make an unused lockres go away immediately. - * as soon as the dlm spinlock is dropped, this lockres - * will not be found. kfree still happens on last put. */ -static void dlm_purge_lockres_now(struct dlm_ctxt *dlm, - struct dlm_lock_resource *lockres) -{ - assert_spin_locked(&dlm->spinlock); - assert_spin_locked(&lockres->spinlock); - - BUG_ON(!__dlm_lockres_unused(lockres)); - - if (!list_empty(&lockres->purge)) { - list_del_init(&lockres->purge); - dlm->purge_count--; + /* lockres is not in the hash now. drop the flag and wake up + * any processes waiting in dlm_get_lock_resource. */ + if (!master) { + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_DROPPING_REF; + spin_unlock(&res->spinlock); + wake_up(&res->wq); } - __dlm_unhash_lockres(lockres); + return 0; } static void dlm_run_purge_list(struct dlm_ctxt *dlm, @@ -268,13 +252,17 @@ static void dlm_run_purge_list(struct dl break; } + mlog(0, "removing lockres %.*s:%p from purgelist\n", + lockres->lockname.len, lockres->lockname.name, lockres); list_del_init(&lockres->purge); + dlm_lockres_put(lockres); dlm->purge_count--; /* This may drop and reacquire the dlm spinlock if it * has to do migration. */ mlog(0, "calling dlm_purge_lockres!\n"); - dlm_purge_lockres(dlm, lockres); + if (dlm_purge_lockres(dlm, lockres)) + BUG(); mlog(0, "DONE calling dlm_purge_lockres!\n"); /* Avoid adding any scheduling latencies */ @@ -467,12 +455,17 @@ void __dlm_dirty_lockres(struct dlm_ctxt assert_spin_locked(&res->spinlock); /* don't shuffle secondary queues */ - if ((res->owner == dlm->node_num) && - !(res->state & DLM_LOCK_RES_DIRTY)) { - /* ref for dirty_list */ - dlm_lockres_get(res); - list_add_tail(&res->dirty, &dlm->dirty_list); - res->state |= DLM_LOCK_RES_DIRTY; + if ((res->owner == dlm->node_num)) { + if (res->state & (DLM_LOCK_RES_MIGRATING | + DLM_LOCK_RES_BLOCK_DIRTY)) + return; + + if (list_empty(&res->dirty)) { + /* ref for dirty_list */ + dlm_lockres_get(res); + list_add_tail(&res->dirty, &dlm->dirty_list); + res->state |= DLM_LOCK_RES_DIRTY; + } } } @@ -651,7 +644,7 @@ static int dlm_thread(void *data) dlm_lockres_get(res); spin_lock(&res->spinlock); - res->state &= ~DLM_LOCK_RES_DIRTY; + /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */ list_del_init(&res->dirty); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); @@ -675,10 +668,11 @@ static int dlm_thread(void *data) /* it is now ok to move lockreses in these states * to the dirty list, assuming that they will only be * dirty for a short while. */ + BUG_ON(res->state & DLM_LOCK_RES_MIGRATING); if (res->state & (DLM_LOCK_RES_IN_PROGRESS | - DLM_LOCK_RES_MIGRATING | DLM_LOCK_RES_RECOVERING)) { /* move it to the tail and keep going */ + res->state &= ~DLM_LOCK_RES_DIRTY; spin_unlock(&res->spinlock); mlog(0, "delaying list shuffling for in-" "progress lockres %.*s, state=%d\n", @@ -699,6 +693,7 @@ static int dlm_thread(void *data) /* called while holding lockres lock */ dlm_shuffle_lists(dlm, res); + res->state &= ~DLM_LOCK_RES_DIRTY; spin_unlock(&res->spinlock); dlm_lockres_calc_usage(dlm, res); @@ -709,11 +704,8 @@ in_progress: /* if the lock was in-progress, stick * it on the back of the list */ if (delay) { - /* ref for dirty_list */ - dlm_lockres_get(res); spin_lock(&res->spinlock); - list_add_tail(&res->dirty, &dlm->dirty_list); - res->state |= DLM_LOCK_RES_DIRTY; + __dlm_dirty_lockres(dlm, res); spin_unlock(&res->spinlock); } dlm_lockres_put(res); diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c index 37be4b2..3c8a250 100644 --- a/fs/ocfs2/dlm/dlmunlock.c +++ b/fs/ocfs2/dlm/dlmunlock.c @@ -147,6 +147,10 @@ static enum dlm_status dlmunlock_common( goto leave; } + if (res->state & DLM_LOCK_RES_MIGRATING) { + status = DLM_MIGRATING; + goto leave; + } /* see above for what the spec says about * LKM_CANCEL and the lock queue state */ diff --git a/fs/ocfs2/export.c b/fs/ocfs2/export.c index 06be6e7..56e1fef 100644 --- a/fs/ocfs2/export.c +++ b/fs/ocfs2/export.c @@ -60,14 +60,11 @@ static struct dentry *ocfs2_get_dentry(s inode = ocfs2_iget(OCFS2_SB(sb), handle->ih_blkno, 0); - if (IS_ERR(inode)) { - mlog_errno(PTR_ERR(inode)); + if (IS_ERR(inode)) return (void *)inode; - } if (handle->ih_generation != inode->i_generation) { iput(inode); - mlog_errno(-ESTALE); return ERR_PTR(-ESTALE); } diff --git a/fs/ocfs2/inode.c b/fs/ocfs2/inode.c index e4d9149..28ab56f 100644 --- a/fs/ocfs2/inode.c +++ b/fs/ocfs2/inode.c @@ -146,7 +146,6 @@ struct inode *ocfs2_iget(struct ocfs2_su if (is_bad_inode(inode)) { iput(inode); inode = ERR_PTR(-ESTALE); - mlog_errno(PTR_ERR(inode)); goto bail; } @@ -155,8 +154,7 @@ bail: mlog(0, "returning inode with number %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); mlog_exit_ptr(inode); - } else - mlog_errno(PTR_ERR(inode)); + } return inode; } @@ -247,7 +245,7 @@ int ocfs2_populate_inode(struct inode *i * today. change if needed. */ if (!OCFS2_IS_VALID_DINODE(fe) || !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL))) { - mlog(ML_ERROR, "Invalid dinode: i_ino=%lu, i_blkno=%llu, " + mlog(0, "Invalid dinode: i_ino=%lu, i_blkno=%llu, " "signature = %.*s, flags = 0x%x\n", inode->i_ino, (unsigned long long)le64_to_cpu(fe->i_blkno), 7, @@ -478,11 +476,8 @@ static int ocfs2_read_locked_inode(struc S_ISBLK(le16_to_cpu(fe->i_mode))) inode->i_rdev = huge_decode_dev(le64_to_cpu(fe->id1.dev1.i_rdev)); - if (ocfs2_populate_inode(inode, fe, 0) < 0) { - mlog(ML_ERROR, "populate failed! i_blkno=%llu, i_ino=%lu\n", - (unsigned long long)fe->i_blkno, inode->i_ino); + if (ocfs2_populate_inode(inode, fe, 0) < 0) goto bail; - } BUG_ON(args->fi_blkno != le64_to_cpu(fe->i_blkno)); diff --git a/fs/ocfs2/namei.c b/fs/ocfs2/namei.c index 9637039..f3d7803 100644 --- a/fs/ocfs2/namei.c +++ b/fs/ocfs2/namei.c @@ -932,14 +932,15 @@ static int ocfs2_unlink(struct inode *di goto leave; } - if (S_ISDIR(inode->i_mode)) { + dir->i_ctime = dir->i_mtime = CURRENT_TIME; + if (S_ISDIR(inode->i_mode)) drop_nlink(dir); - status = ocfs2_mark_inode_dirty(handle, dir, - parent_node_bh); - if (status < 0) { - mlog_errno(status); + + status = ocfs2_mark_inode_dirty(handle, dir, parent_node_bh); + if (status < 0) { + mlog_errno(status); + if (S_ISDIR(inode->i_mode)) inc_nlink(dir); - } } leave: @@ -1068,6 +1069,7 @@ static int ocfs2_rename(struct inode *ol char orphan_name[OCFS2_ORPHAN_NAMELEN + 1]; struct buffer_head *orphan_entry_bh = NULL; struct buffer_head *newfe_bh = NULL; + struct buffer_head *old_inode_bh = NULL; struct buffer_head *insert_entry_bh = NULL; struct ocfs2_super *osb = NULL; u64 newfe_blkno; @@ -1079,7 +1081,7 @@ static int ocfs2_rename(struct inode *ol struct buffer_head *new_de_bh = NULL, *old_de_bh = NULL; // bhs for above struct buffer_head *old_inode_de_bh = NULL; // if old_dentry is a dir, // this is the 1st dirent bh - nlink_t old_dir_nlink = old_dir->i_nlink, new_dir_nlink = new_dir->i_nlink; + nlink_t old_dir_nlink = old_dir->i_nlink; /* At some point it might be nice to break this function up a * bit. */ @@ -1139,12 +1141,11 @@ static int ocfs2_rename(struct inode *ol } /* - * Though we don't require an inode meta data update if - * old_inode is not a directory, we lock anyway here to ensure - * the vote thread on other nodes won't have to concurrently - * downconvert the inode and the dentry locks. + * Aside from allowing a meta data update, the locking here + * also ensures that the vote thread on other nodes won't have + * to concurrently downconvert the inode and the dentry locks. */ - status = ocfs2_meta_lock(old_inode, NULL, 1); + status = ocfs2_meta_lock(old_inode, &old_inode_bh, 1); if (status < 0) { if (status != -ENOENT) mlog_errno(status); @@ -1355,6 +1356,7 @@ static int ocfs2_rename(struct inode *ol old_inode->i_ctime = CURRENT_TIME; mark_inode_dirty(old_inode); + ocfs2_mark_inode_dirty(handle, old_inode, old_inode_bh); /* now that the name has been added to new_dir, remove the old name */ status = ocfs2_delete_entry(handle, old_dir, old_de, old_de_bh); @@ -1384,27 +1386,22 @@ static int ocfs2_rename(struct inode *ol } } mark_inode_dirty(old_dir); - if (new_inode) + ocfs2_mark_inode_dirty(handle, old_dir, old_dir_bh); + if (new_inode) { mark_inode_dirty(new_inode); + ocfs2_mark_inode_dirty(handle, new_inode, newfe_bh); + } - if (old_dir != new_dir) - if (new_dir_nlink != new_dir->i_nlink) { - if (!new_dir_bh) { - mlog(ML_ERROR, "need to change nlink for new " - "dir %llu from %d to %d but bh is NULL\n", - (unsigned long long)OCFS2_I(new_dir)->ip_blkno, - (int)new_dir_nlink, new_dir->i_nlink); - } else { - struct ocfs2_dinode *fe; - status = ocfs2_journal_access(handle, - new_dir, - new_dir_bh, - OCFS2_JOURNAL_ACCESS_WRITE); - fe = (struct ocfs2_dinode *) new_dir_bh->b_data; - fe->i_links_count = cpu_to_le16(new_dir->i_nlink); - status = ocfs2_journal_dirty(handle, new_dir_bh); - } - } + if (old_dir != new_dir) { + /* Keep the same times on both directories.*/ + new_dir->i_ctime = new_dir->i_mtime = old_dir->i_ctime; + + /* + * This will also pick up the i_nlink change from the + * block above. + */ + ocfs2_mark_inode_dirty(handle, new_dir, new_dir_bh); + } if (old_dir_nlink != old_dir->i_nlink) { if (!old_dir_bh) { @@ -1455,6 +1452,8 @@ bail: iput(new_inode); if (newfe_bh) brelse(newfe_bh); + if (old_inode_bh) + brelse(old_inode_bh); if (old_dir_bh) brelse(old_dir_bh); if (new_dir_bh) @@ -1826,6 +1825,13 @@ static int __ocfs2_add_entry(handle_t *h (le16_to_cpu(de->rec_len) >= rec_len)) || (le16_to_cpu(de->rec_len) >= (OCFS2_DIR_REC_LEN(de->name_len) + rec_len))) { + dir->i_mtime = dir->i_ctime = CURRENT_TIME; + retval = ocfs2_mark_inode_dirty(handle, dir, parent_fe_bh); + if (retval < 0) { + mlog_errno(retval); + goto bail; + } + status = ocfs2_journal_access(handle, dir, insert_bh, OCFS2_JOURNAL_ACCESS_WRITE); /* By now the buffer is marked for journaling */ @@ -1848,7 +1854,6 @@ static int __ocfs2_add_entry(handle_t *h de->name_len = namelen; memcpy(de->name, name, namelen); - dir->i_mtime = dir->i_ctime = CURRENT_TIME; dir->i_version++; status = ocfs2_journal_dirty(handle, insert_bh); retval = 0; diff --git a/fs/ocfs2/symlink.c b/fs/ocfs2/symlink.c index 957d687..03b0191 100644 --- a/fs/ocfs2/symlink.c +++ b/fs/ocfs2/symlink.c @@ -158,8 +158,7 @@ static void *ocfs2_follow_link(struct de } status = vfs_follow_link(nd, link); - if (status && status != -ENOENT) - mlog_errno(status); + bail: if (page) { kunmap(page); diff --git a/include/linux/configfs.h b/include/linux/configfs.h index fef6f3d..62f3763 100644 --- a/include/linux/configfs.h +++ b/include/linux/configfs.h @@ -157,6 +157,7 @@ struct configfs_group_operations { struct config_item *(*make_item)(struct config_group *group, const char *name); struct config_group *(*make_group)(struct config_group *group, const char *name); int (*commit_item)(struct config_item *item); + void (*disconnect_notify)(struct config_group *group, struct config_item *item); void (*drop_item)(struct config_group *group, struct config_item *item); };