aboutsummaryrefslogtreecommitdiffstats
path: root/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/staging/lustre/lustre/ldlm/ldlm_request.c')
-rw-r--r--drivers/staging/lustre/lustre/ldlm/ldlm_request.c119
1 files changed, 58 insertions, 61 deletions
diff --git a/drivers/staging/lustre/lustre/ldlm/ldlm_request.c b/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
index af487f9937f4..35ba6f14d95f 100644
--- a/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
+++ b/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
@@ -63,8 +63,8 @@
#include "ldlm_internal.h"
-int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
-module_param(ldlm_enqueue_min, int, 0644);
+unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
+module_param(ldlm_enqueue_min, uint, 0644);
MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
/* in client side, whether the cached locks will be canceled before replay */
@@ -123,44 +123,56 @@ static int ldlm_expired_completion_wait(void *data)
return 0;
}
+/**
+ * Calculate the Completion timeout (covering enqueue, BL AST, data flush,
+ * lock cancel, and their replies). Used for lock completion timeout on the
+ * client side.
+ *
+ * \param[in] lock lock which is waiting the completion callback
+ *
+ * \retval timeout in seconds to wait for the server reply
+ */
/* We use the same basis for both server side and client side functions
* from a single node.
*/
-static int ldlm_get_enq_timeout(struct ldlm_lock *lock)
+static unsigned int ldlm_cp_timeout(struct ldlm_lock *lock)
{
- int timeout = at_get(ldlm_lock_to_ns_at(lock));
+ unsigned int timeout;
if (AT_OFF)
- return obd_timeout / 2;
- /* Since these are non-updating timeouts, we should be conservative.
- * It would be nice to have some kind of "early reply" mechanism for
- * lock callbacks too...
+ return obd_timeout;
+
+ /*
+ * Wait a long time for enqueue - server may have to callback a
+ * lock from another client. Server will evict the other client if it
+ * doesn't respond reasonably, and then give us the lock.
*/
- timeout = min_t(int, at_max, timeout + (timeout >> 1)); /* 150% */
- return max(timeout, ldlm_enqueue_min);
+ timeout = at_get(ldlm_lock_to_ns_at(lock));
+ return max(3 * timeout, ldlm_enqueue_min);
}
/**
* Helper function for ldlm_completion_ast(), updating timings when lock is
* actually granted.
*/
-static int ldlm_completion_tail(struct ldlm_lock *lock)
+static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
{
long delay;
- int result;
+ int result = 0;
if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
LDLM_DEBUG(lock, "client-side enqueue: destroyed");
result = -EIO;
+ } else if (!data) {
+ LDLM_DEBUG(lock, "client-side enqueue: granted");
} else {
+ /* Take into AT only CP RPC, not immediately granted locks */
delay = ktime_get_real_seconds() - lock->l_last_activity;
LDLM_DEBUG(lock, "client-side enqueue: granted after %lds",
delay);
/* Update our time estimate */
- at_measured(ldlm_lock_to_ns_at(lock),
- delay);
- result = 0;
+ at_measured(ldlm_lock_to_ns_at(lock), delay);
}
return result;
}
@@ -177,10 +189,9 @@ int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
return 0;
}
- if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
- LDLM_FL_BLOCK_CONV))) {
+ if (!(flags & LDLM_FL_BLOCKED_MASK)) {
wake_up(&lock->l_waitq);
- return ldlm_completion_tail(lock);
+ return ldlm_completion_tail(lock, data);
}
LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, going forward");
@@ -224,8 +235,7 @@ int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
goto noreproc;
}
- if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
- LDLM_FL_BLOCK_CONV))) {
+ if (!(flags & LDLM_FL_BLOCKED_MASK)) {
wake_up(&lock->l_waitq);
return 0;
}
@@ -240,13 +250,10 @@ noreproc:
if (obd)
imp = obd->u.cli.cl_import;
- /* Wait a long time for enqueue - server may have to callback a
- * lock from another client. Server will evict the other client if it
- * doesn't respond reasonably, and then give us the lock.
- */
- timeout = ldlm_get_enq_timeout(lock) * 2;
+ timeout = ldlm_cp_timeout(lock);
lwd.lwd_lock = lock;
+ lock->l_last_activity = ktime_get_real_seconds();
if (ldlm_is_no_timeout(lock)) {
LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
@@ -279,7 +286,7 @@ noreproc:
return rc;
}
- return ldlm_completion_tail(lock);
+ return ldlm_completion_tail(lock, data);
}
EXPORT_SYMBOL(ldlm_completion_ast);
@@ -309,8 +316,6 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
else
LDLM_DEBUG(lock, "lock was granted or failed in race");
- ldlm_lock_decref_internal(lock, mode);
-
/* XXX - HACK because we shouldn't call ldlm_lock_destroy()
* from llite/file.c/ll_file_flock().
*/
@@ -321,9 +326,14 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
*/
if (lock->l_resource->lr_type == LDLM_FLOCK) {
lock_res_and_lock(lock);
- ldlm_resource_unlink_lock(lock);
- ldlm_lock_destroy_nolock(lock);
+ if (!ldlm_is_destroyed(lock)) {
+ ldlm_resource_unlink_lock(lock);
+ ldlm_lock_decref_internal_nolock(lock, mode);
+ ldlm_lock_destroy_nolock(lock);
+ }
unlock_res_and_lock(lock);
+ } else {
+ ldlm_lock_decref_internal(lock, mode);
}
}
@@ -418,11 +428,6 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
*flags = ldlm_flags_from_wire(reply->lock_flags);
lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
LDLM_FL_INHERIT_MASK);
- /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
- * to wait with no timeout as well
- */
- lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
- LDLM_FL_NO_TIMEOUT);
unlock_res_and_lock(lock);
CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: 0x%llx\n",
@@ -556,7 +561,7 @@ static inline int ldlm_capsule_handles_avail(struct req_capsule *pill,
enum req_location loc,
int off)
{
- int size = req_capsule_msg_size(pill, loc);
+ u32 size = req_capsule_msg_size(pill, loc);
return ldlm_req_handles_avail(size, off);
}
@@ -565,7 +570,7 @@ static inline int ldlm_format_handles_avail(struct obd_import *imp,
const struct req_format *fmt,
enum req_location loc, int off)
{
- int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
+ u32 size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
return ldlm_req_handles_avail(size, off);
}
@@ -696,8 +701,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
einfo->ei_mode, &cbs, einfo->ei_cbdata,
lvb_len, lvb_type);
- if (!lock)
- return -ENOMEM;
+ if (IS_ERR(lock))
+ return PTR_ERR(lock);
/* for the local lock, add the reference */
ldlm_lock_addref_internal(lock, einfo->ei_mode);
ldlm_lock2handle(lock, lockh);
@@ -719,6 +724,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
lock->l_export = NULL;
lock->l_blocking_ast = einfo->ei_cb_bl;
lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
+ lock->l_last_activity = ktime_get_real_seconds();
/* lock not sent to server yet */
@@ -819,7 +825,7 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
lock_res_and_lock(lock);
ldlm_set_cbpending(lock);
local_only = !!(lock->l_flags &
- (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
+ (LDLM_FL_LOCAL_ONLY | LDLM_FL_CANCEL_ON_BLOCK));
ldlm_cancel_callback(lock);
rc = ldlm_is_bl_ast(lock) ? LDLM_FL_BL_AST : LDLM_FL_CANCELING;
unlock_res_and_lock(lock);
@@ -1180,8 +1186,7 @@ static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
slv = ldlm_pool_get_slv(pl);
lvf = ldlm_pool_get_lvf(pl);
- la = cfs_duration_sec(cfs_time_sub(cur,
- lock->l_last_used));
+ la = cfs_duration_sec(cfs_time_sub(cur, lock->l_last_used));
lv = lvf * la * unused;
/* Inform pool about current CLV to see it via debugfs. */
@@ -1193,9 +1198,6 @@ static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
if (slv == 0 || lv < slv)
return LDLM_POLICY_KEEP_LOCK;
- if (ns->ns_cancel && ns->ns_cancel(lock) == 0)
- return LDLM_POLICY_KEEP_LOCK;
-
return LDLM_POLICY_CANCEL_LOCK;
}
@@ -1239,9 +1241,6 @@ static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
cfs_time_add(lock->l_last_used, ns->ns_max_age)))
return LDLM_POLICY_KEEP_LOCK;
- if (ns->ns_cancel && ns->ns_cancel(lock) == 0)
- return LDLM_POLICY_KEEP_LOCK;
-
return LDLM_POLICY_CANCEL_LOCK;
}
@@ -1374,7 +1373,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
break;
list_for_each_entry_safe(lock, next, &ns->ns_unused_list,
- l_lru) {
+ l_lru) {
/* No locks which got blocking requests. */
LASSERT(!ldlm_is_bl_ast(lock));
@@ -1413,7 +1412,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
* That is, for shrinker policy we drop only
* old locks, but additionally choose them by
* their weight. Big extent locks will stay in
- * the cache. */
+ * the cache.
+ */
result = pf(ns, lock, unused, added, count);
if (result == LDLM_POLICY_KEEP_LOCK) {
lu_ref_del(&lock->l_reference,
@@ -1610,8 +1610,7 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
*/
while (count > 0) {
LASSERT(!list_empty(cancels));
- lock = list_entry(cancels->next, struct ldlm_lock,
- l_bl_ast);
+ lock = list_entry(cancels->next, struct ldlm_lock, l_bl_ast);
LASSERT(lock->l_conn_export);
if (exp_connect_cancelset(lock->l_conn_export)) {
@@ -1660,7 +1659,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
int rc;
res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
- if (!res) {
+ if (IS_ERR(res)) {
/* This is not a problem. */
CDEBUG(D_INFO, "No resource %llu\n", res_id->name[0]);
return 0;
@@ -1704,7 +1703,8 @@ static int ldlm_cli_hash_cancel_unused(struct cfs_hash *hs,
* that have 0 readers/writers.
*
* If flags & LCF_LOCAL, throw the locks away without trying
- * to notify the server. */
+ * to notify the server.
+ */
int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
const struct ldlm_res_id *res_id,
enum ldlm_cancel_flags flags, void *opaque)
@@ -1811,13 +1811,10 @@ int ldlm_resource_iterate(struct ldlm_namespace *ns,
struct ldlm_resource *res;
int rc;
- if (!ns) {
- CERROR("must pass in namespace\n");
- LBUG();
- }
+ LASSERTF(ns, "must pass in namespace\n");
res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
- if (!res)
+ if (IS_ERR(res))
return 0;
LDLM_RESOURCE_ADDREF(res);
@@ -1843,7 +1840,7 @@ static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
* bug 17614: locks being actively cancelled. Get a reference
* on a lock so that it does not disappear under us (e.g. due to cancel)
*/
- if (!(lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_CANCELING))) {
+ if (!(lock->l_flags & (LDLM_FL_FAILED | LDLM_FL_CANCELING))) {
list_add(&lock->l_pending_chain, list);
LDLM_LOCK_GET(lock);
}
@@ -2013,7 +2010,7 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
LCF_LOCAL, LDLM_CANCEL_NO_WAIT);
CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n",
- canceled, ldlm_ns_name(ns));
+ canceled, ldlm_ns_name(ns));
}
int ldlm_replay_locks(struct obd_import *imp)