aboutsummaryrefslogtreecommitdiffstats
path: root/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/staging/lustre/lustre/ldlm/ldlm_request.c')
-rw-r--r--drivers/staging/lustre/lustre/ldlm/ldlm_request.c380
1 files changed, 34 insertions, 346 deletions
diff --git a/drivers/staging/lustre/lustre/ldlm/ldlm_request.c b/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
index 6245a2c36a0f..fdf81b87aad7 100644
--- a/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
+++ b/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
@@ -87,7 +87,7 @@ struct ldlm_async_args {
struct lustre_handle lock_handle;
};
-int ldlm_expired_completion_wait(void *data)
+static int ldlm_expired_completion_wait(void *data)
{
struct lock_wait_data *lwd = data;
struct ldlm_lock *lock = lwd->lwd_lock;
@@ -97,15 +97,14 @@ int ldlm_expired_completion_wait(void *data)
if (lock->l_conn_export == NULL) {
static unsigned long next_dump, last_dump;
- LCONSOLE_WARN("lock timed out (enqueued at "CFS_TIME_T", "
- CFS_DURATION_T"s ago)\n",
- lock->l_last_activity,
- cfs_time_sub(get_seconds(),
- lock->l_last_activity));
- LDLM_DEBUG(lock, "lock timed out (enqueued at " CFS_TIME_T ", " CFS_DURATION_T "s ago); not entering recovery in server code, just going back to sleep",
- lock->l_last_activity,
- cfs_time_sub(get_seconds(),
- lock->l_last_activity));
+ LCONSOLE_WARN("lock timed out (enqueued at %lld, %llds ago)\n",
+ (s64)lock->l_last_activity,
+ (s64)(ktime_get_real_seconds() -
+ lock->l_last_activity));
+ LDLM_DEBUG(lock, "lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
+ (s64)lock->l_last_activity,
+ (s64)(ktime_get_real_seconds() -
+ lock->l_last_activity));
if (cfs_time_after(cfs_time_current(), next_dump)) {
last_dump = next_dump;
next_dump = cfs_time_shift(300);
@@ -120,19 +119,17 @@ int ldlm_expired_completion_wait(void *data)
obd = lock->l_conn_export->exp_obd;
imp = obd->u.cli.cl_import;
ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
- LDLM_ERROR(lock, "lock timed out (enqueued at "CFS_TIME_T", "
- CFS_DURATION_T"s ago), entering recovery for %s@%s",
- lock->l_last_activity,
- cfs_time_sub(get_seconds(), lock->l_last_activity),
- obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
+ LDLM_ERROR(lock, "lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
+ (s64)lock->l_last_activity,
+ (s64)(ktime_get_real_seconds() - lock->l_last_activity),
+ obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
return 0;
}
-EXPORT_SYMBOL(ldlm_expired_completion_wait);
/* We use the same basis for both server side and client side functions
from a single node. */
-int ldlm_get_enq_timeout(struct ldlm_lock *lock)
+static int ldlm_get_enq_timeout(struct ldlm_lock *lock)
{
int timeout = at_get(ldlm_lock_to_ns_at(lock));
@@ -144,7 +141,6 @@ int ldlm_get_enq_timeout(struct ldlm_lock *lock)
timeout = min_t(int, at_max, timeout + (timeout >> 1)); /* 150% */
return max(timeout, ldlm_enqueue_min);
}
-EXPORT_SYMBOL(ldlm_get_enq_timeout);
/**
* Helper function for ldlm_completion_ast(), updating timings when lock is
@@ -159,10 +155,9 @@ static int ldlm_completion_tail(struct ldlm_lock *lock)
LDLM_DEBUG(lock, "client-side enqueue: destroyed");
result = -EIO;
} else {
- delay = cfs_time_sub(get_seconds(),
- lock->l_last_activity);
- LDLM_DEBUG(lock, "client-side enqueue: granted after "
- CFS_DURATION_T"s", delay);
+ delay = ktime_get_real_seconds() - lock->l_last_activity;
+ LDLM_DEBUG(lock, "client-side enqueue: granted after %lds",
+ delay);
/* Update our time estimate */
at_measured(ldlm_lock_to_ns_at(lock),
@@ -191,7 +186,6 @@ int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
}
LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, going forward");
- ldlm_reprocess_all(lock->l_resource);
return 0;
}
EXPORT_SYMBOL(ldlm_completion_ast_async);
@@ -270,8 +264,7 @@ noreproc:
spin_unlock(&imp->imp_lock);
}
- if (ns_is_client(ldlm_lock_to_ns(lock)) &&
- OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST,
+ if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST,
OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE)) {
lock->l_flags |= LDLM_FL_FAIL_LOC;
rc = -EINTR;
@@ -291,172 +284,6 @@ noreproc:
}
EXPORT_SYMBOL(ldlm_completion_ast);
-/**
- * A helper to build a blocking AST function
- *
- * Perform a common operation for blocking ASTs:
- * deferred lock cancellation.
- *
- * \param lock the lock blocking or canceling AST was called on
- * \retval 0
- * \see mdt_blocking_ast
- * \see ldlm_blocking_ast
- */
-int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock)
-{
- int do_ast;
-
- lock->l_flags |= LDLM_FL_CBPENDING;
- do_ast = !lock->l_readers && !lock->l_writers;
- unlock_res_and_lock(lock);
-
- if (do_ast) {
- struct lustre_handle lockh;
- int rc;
-
- LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
- ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
- if (rc < 0)
- CERROR("ldlm_cli_cancel: %d\n", rc);
- } else {
- LDLM_DEBUG(lock, "Lock still has references, will be cancelled later");
- }
- return 0;
-}
-EXPORT_SYMBOL(ldlm_blocking_ast_nocheck);
-
-/**
- * Server blocking AST
- *
- * ->l_blocking_ast() callback for LDLM locks acquired by server-side
- * OBDs.
- *
- * \param lock the lock which blocks a request or cancelling lock
- * \param desc unused
- * \param data unused
- * \param flag indicates whether this cancelling or blocking callback
- * \retval 0
- * \see ldlm_blocking_ast_nocheck
- */
-int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
- void *data, int flag)
-{
- if (flag == LDLM_CB_CANCELING) {
- /* Don't need to do anything here. */
- return 0;
- }
-
- lock_res_and_lock(lock);
- /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
- * that ldlm_blocking_ast is called just before intent_policy method
- * takes the lr_lock, then by the time we get the lock, we might not
- * be the correct blocking function anymore. So check, and return
- * early, if so. */
- if (lock->l_blocking_ast != ldlm_blocking_ast) {
- unlock_res_and_lock(lock);
- return 0;
- }
- return ldlm_blocking_ast_nocheck(lock);
-}
-EXPORT_SYMBOL(ldlm_blocking_ast);
-
-/**
- * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See
- * comment in filter_intent_policy() on why you may need this.
- */
-int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
-{
- /*
- * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for
- * that is rather subtle: with OST-side locking, it may so happen that
- * _all_ extent locks are held by the OST. If client wants to obtain
- * current file size it calls ll{,u}_glimpse_size(), and (as locks are
- * on the server), dummy glimpse callback fires and does
- * nothing. Client still receives correct file size due to the
- * following fragment in filter_intent_policy():
- *
- * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB
- * if (rc != 0 && res->lr_namespace->ns_lvbo &&
- * res->lr_namespace->ns_lvbo->lvbo_update) {
- * res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
- * }
- *
- * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and
- * returns correct file size to the client.
- */
- return -ELDLM_NO_LOCK_DATA;
-}
-EXPORT_SYMBOL(ldlm_glimpse_ast);
-
-/**
- * Enqueue a local lock (typically on a server).
- */
-int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
- const struct ldlm_res_id *res_id,
- ldlm_type_t type, ldlm_policy_data_t *policy,
- ldlm_mode_t mode, __u64 *flags,
- ldlm_blocking_callback blocking,
- ldlm_completion_callback completion,
- ldlm_glimpse_callback glimpse,
- void *data, __u32 lvb_len, enum lvb_type lvb_type,
- const __u64 *client_cookie,
- struct lustre_handle *lockh)
-{
- struct ldlm_lock *lock;
- int err;
- const struct ldlm_callback_suite cbs = { .lcs_completion = completion,
- .lcs_blocking = blocking,
- .lcs_glimpse = glimpse,
- };
-
- LASSERT(!(*flags & LDLM_FL_REPLAY));
- if (unlikely(ns_is_client(ns))) {
- CERROR("Trying to enqueue local lock in a shadow namespace\n");
- LBUG();
- }
-
- lock = ldlm_lock_create(ns, res_id, type, mode, &cbs, data, lvb_len,
- lvb_type);
- if (unlikely(!lock)) {
- err = -ENOMEM;
- goto out_nolock;
- }
-
- ldlm_lock2handle(lock, lockh);
-
- /* NB: we don't have any lock now (lock_res_and_lock)
- * because it's a new lock */
- ldlm_lock_addref_internal_nolock(lock, mode);
- lock->l_flags |= LDLM_FL_LOCAL;
- if (*flags & LDLM_FL_ATOMIC_CB)
- lock->l_flags |= LDLM_FL_ATOMIC_CB;
-
- if (policy != NULL)
- lock->l_policy_data = *policy;
- if (client_cookie != NULL)
- lock->l_client_cookie = *client_cookie;
- if (type == LDLM_EXTENT)
- lock->l_req_extent = policy->l_extent;
-
- err = ldlm_lock_enqueue(ns, &lock, policy, flags);
- if (unlikely(err != ELDLM_OK))
- goto out;
-
- if (policy != NULL)
- *policy = lock->l_policy_data;
-
- if (lock->l_completion_ast)
- lock->l_completion_ast(lock, *flags, NULL);
-
- LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
- out:
- LDLM_LOCK_RELEASE(lock);
- out_nolock:
- return err;
-}
-EXPORT_SYMBOL(ldlm_cli_enqueue_local);
-
static void failed_lock_cleanup(struct ldlm_namespace *ns,
struct ldlm_lock *lock, int mode)
{
@@ -813,27 +640,6 @@ int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
}
EXPORT_SYMBOL(ldlm_prep_enqueue_req);
-struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len)
-{
- struct ptlrpc_request *req;
- int rc;
-
- req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
- if (req == NULL)
- return ERR_PTR(-ENOMEM);
-
- rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
- if (rc) {
- ptlrpc_request_free(req);
- return ERR_PTR(rc);
- }
-
- req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
- ptlrpc_request_set_replen(req);
- return req;
-}
-EXPORT_SYMBOL(ldlm_enqueue_pack);
-
/**
* Client-side lock enqueue.
*
@@ -977,107 +783,6 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
}
EXPORT_SYMBOL(ldlm_cli_enqueue);
-static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
- __u32 *flags)
-{
- struct ldlm_resource *res;
- int rc;
-
- if (ns_is_client(ldlm_lock_to_ns(lock))) {
- CERROR("Trying to cancel local lock\n");
- LBUG();
- }
- LDLM_DEBUG(lock, "client-side local convert");
-
- res = ldlm_lock_convert(lock, new_mode, flags);
- if (res) {
- ldlm_reprocess_all(res);
- rc = 0;
- } else {
- rc = LUSTRE_EDEADLK;
- }
- LDLM_DEBUG(lock, "client-side local convert handler END");
- LDLM_LOCK_PUT(lock);
- return rc;
-}
-
-/* FIXME: one of ldlm_cli_convert or the server side should reject attempted
- * conversion of locks which are on the waiting or converting queue */
-/* Caller of this code is supposed to take care of lock readers/writers
- accounting */
-int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
-{
- struct ldlm_request *body;
- struct ldlm_reply *reply;
- struct ldlm_lock *lock;
- struct ldlm_resource *res;
- struct ptlrpc_request *req;
- int rc;
-
- lock = ldlm_handle2lock(lockh);
- if (!lock) {
- LBUG();
- return -EINVAL;
- }
- *flags = 0;
-
- if (lock->l_conn_export == NULL)
- return ldlm_cli_convert_local(lock, new_mode, flags);
-
- LDLM_DEBUG(lock, "client-side convert");
-
- req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export),
- &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
- LDLM_CONVERT);
- if (req == NULL) {
- LDLM_LOCK_PUT(lock);
- return -ENOMEM;
- }
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
- body->lock_handle[0] = lock->l_remote_handle;
-
- body->lock_desc.l_req_mode = new_mode;
- body->lock_flags = ldlm_flags_to_wire(*flags);
-
-
- ptlrpc_request_set_replen(req);
- rc = ptlrpc_queue_wait(req);
- if (rc != ELDLM_OK)
- goto out;
-
- reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
- if (reply == NULL) {
- rc = -EPROTO;
- goto out;
- }
-
- if (req->rq_status) {
- rc = req->rq_status;
- goto out;
- }
-
- res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
- if (res != NULL) {
- ldlm_reprocess_all(res);
- /* Go to sleep until the lock is granted. */
- /* FIXME: or cancelled. */
- if (lock->l_completion_ast) {
- rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
- NULL);
- if (rc)
- goto out;
- }
- } else {
- rc = LUSTRE_EDEADLK;
- }
- out:
- LDLM_LOCK_PUT(lock);
- ptlrpc_req_finished(req);
- return rc;
-}
-EXPORT_SYMBOL(ldlm_cli_convert);
-
/**
* Cancel locks locally.
* Returns:
@@ -1109,13 +814,8 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
}
ldlm_lock_cancel(lock);
} else {
- if (ns_is_client(ldlm_lock_to_ns(lock))) {
- LDLM_ERROR(lock, "Trying to cancel local lock");
- LBUG();
- }
- LDLM_DEBUG(lock, "server-side local cancel");
- ldlm_lock_cancel(lock);
- ldlm_reprocess_all(lock->l_resource);
+ LDLM_ERROR(lock, "Trying to cancel local lock");
+ LBUG();
}
return rc;
@@ -1159,8 +859,9 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req,
/**
* Prepare and send a batched cancel RPC. It will include \a count lock
* handles of locks given in \a cancels list. */
-int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
- int count, ldlm_cancel_flags_t flags)
+static int ldlm_cli_cancel_req(struct obd_export *exp,
+ struct list_head *cancels,
+ int count, ldlm_cancel_flags_t flags)
{
struct ptlrpc_request *req = NULL;
struct obd_import *imp;
@@ -1212,12 +913,12 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
ptlrpc_request_set_replen(req);
if (flags & LCF_ASYNC) {
- ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+ ptlrpcd_add_req(req);
sent = count;
goto out;
- } else {
- rc = ptlrpc_queue_wait(req);
}
+
+ rc = ptlrpc_queue_wait(req);
if (rc == LUSTRE_ESTALE) {
CDEBUG(D_DLMTRACE, "client/server (nid %s) out of sync -- not fatal\n",
libcfs_nid2str(req->rq_import->
@@ -1242,7 +943,6 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
out:
return sent ? sent : rc;
}
-EXPORT_SYMBOL(ldlm_cli_cancel_req);
static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
{
@@ -1723,9 +1423,9 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
return added;
}
-int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
- int count, int max, ldlm_cancel_flags_t cancel_flags,
- int flags)
+int ldlm_cancel_lru_local(struct ldlm_namespace *ns,
+ struct list_head *cancels, int count, int max,
+ ldlm_cancel_flags_t cancel_flags, int flags)
{
int added;
@@ -1962,8 +1662,8 @@ EXPORT_SYMBOL(ldlm_cli_cancel_unused);
/* Lock iterators. */
-int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
- void *closure)
+static int ldlm_resource_foreach(struct ldlm_resource *res,
+ ldlm_iterator_t iter, void *closure)
{
struct list_head *tmp, *next;
struct ldlm_lock *lock;
@@ -1982,15 +1682,6 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
}
}
- list_for_each_safe(tmp, next, &res->lr_converting) {
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-
- if (iter(lock, closure) == LDLM_ITER_STOP) {
- rc = LDLM_ITER_STOP;
- goto out;
- }
- }
-
list_for_each_safe(tmp, next, &res->lr_waiting) {
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
@@ -2003,7 +1694,6 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
unlock_res(res);
return rc;
}
-EXPORT_SYMBOL(ldlm_resource_foreach);
struct iter_helper_data {
ldlm_iterator_t iter;
@@ -2027,8 +1717,8 @@ static int ldlm_res_iter_helper(struct cfs_hash *hs, struct cfs_hash_bd *bd,
LDLM_ITER_STOP;
}
-void ldlm_namespace_foreach(struct ldlm_namespace *ns,
- ldlm_iterator_t iter, void *closure)
+static void ldlm_namespace_foreach(struct ldlm_namespace *ns,
+ ldlm_iterator_t iter, void *closure)
{
struct iter_helper_data helper = {
@@ -2040,7 +1730,6 @@ void ldlm_namespace_foreach(struct ldlm_namespace *ns,
ldlm_res_iter_helper, &helper);
}
-EXPORT_SYMBOL(ldlm_namespace_foreach);
/* non-blocking function to manipulate a lock whose cb_data is being put away.
* return 0: find no resource
@@ -2106,7 +1795,6 @@ static int replay_lock_interpret(const struct lu_env *env,
if (rc != ELDLM_OK)
goto out;
-
reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
if (reply == NULL) {
rc = -EPROTO;
@@ -2223,7 +1911,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
aa = ptlrpc_req_async_args(req);
aa->lock_handle = body->lock_handle[0];
req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
- ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+ ptlrpcd_add_req(req);
return 0;
}