about summary refs log tree commit diff stats
path: root/drivers/staging/lustre/lustre/mdc/mdc_locks.c
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/staging/lustre/lustre/mdc/mdc_locks.c')
-rw-r--r-- drivers/staging/lustre/lustre/mdc/mdc_locks.c 1202
1 files changed, 0 insertions, 1202 deletions
diff --git a/drivers/staging/lustre/lustre/mdc/mdc_locks.c b/drivers/staging/lustre/lustre/mdc/mdc_locks.c
deleted file mode 100644
index 695ef44532cf..000000000000
--- a/drivers/staging/lustre/lustre/mdc/mdc_locks.c
+++ /dev/null
@@ -1,1202 +0,0 @@
-// SPDX-License-Identifier: GPL-2.0
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.gnu.org/licenses/gpl-2.0.html
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2011, 2015, Intel Corporation.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- */
-
-#define DEBUG_SUBSYSTEM S_MDC
-
-#include <linux/module.h>
-
-#include <lustre_intent.h>
-#include <obd.h>
-#include <obd_class.h>
-#include <lustre_dlm.h>
-#include <lustre_fid.h>
-#include <lustre_mdc.h>
-#include <lustre_net.h>
-#include <lustre_req_layout.h>
-#include <lustre_swab.h>
-
-#include "mdc_internal.h"
-
-/*
- * Per-request context for the asynchronous getattr intent enqueue.
- * mdc_intent_getattr_async() stores it in the request's async-args area and
- * mdc_intent_getattr_async_interpret() reads it back when the reply arrives.
- */
-struct mdc_getattr_args {
- /* export the enqueue was issued on */
- struct obd_export *ga_exp;
- /* caller's enqueue info: intent, lock handle, op data and callback */
- struct md_enqueue_info *ga_minfo;
-};
-
-/*
- * Map the disposition recorded on an intent to the error for a given phase.
- *
- * The dispositions are tested in the fixed order DISP_OPEN_LEASE,
- * DISP_OPEN_OPEN, DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD. For the
- * first one that is set on @it, the function returns it->it_status if
- * @phase >= that disposition and 0 otherwise; later dispositions are not
- * examined.
- *
- * If none of these dispositions is set, the disposition and status are logged
- * with CERROR() and LBUG() is invoked.
- */
-int it_open_error(int phase, struct lookup_intent *it)
-{
- if (it_disposition(it, DISP_OPEN_LEASE)) {
- if (phase >= DISP_OPEN_LEASE)
- return it->it_status;
- else
- return 0;
- }
- if (it_disposition(it, DISP_OPEN_OPEN)) {
- if (phase >= DISP_OPEN_OPEN)
- return it->it_status;
- else
- return 0;
- }
-
- if (it_disposition(it, DISP_OPEN_CREATE)) {
- if (phase >= DISP_OPEN_CREATE)
- return it->it_status;
- else
- return 0;
- }
-
- if (it_disposition(it, DISP_LOOKUP_EXECD)) {
- if (phase >= DISP_LOOKUP_EXECD)
- return it->it_status;
- else
- return 0;
- }
-
- if (it_disposition(it, DISP_IT_EXECD)) {
- if (phase >= DISP_IT_EXECD)
- return it->it_status;
- else
- return 0;
- }
- /* No known disposition bit is set: treated as a fatal inconsistency. */
- CERROR("it disp: %X, status: %d\n", it->it_disposition,
- it->it_status);
- LBUG();
- return 0;
-}
-EXPORT_SYMBOL(it_open_error);
-
-/*
- * Attach an inode to the resource behind a lock handle.
- *
- * @exp:   not referenced in the body
- * @lockh: handle of the lock whose resource is updated
- * @data:  inode pointer stored into the resource's lr_lvb_inode
- * @bits:  optional; zeroed on entry and, when the handle is in use, filled
- *         with the lock's inodebits
- *
- * If the resource already points at a different inode, that inode is asserted
- * to be in I_FREEING state before it is replaced. The update is done under
- * lock_res_and_lock(). Always returns 0, including when @lockh is unused.
- */
-/* this must be called on a lockh that is known to have a referenced lock */
-int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
- void *data, __u64 *bits)
-{
- struct ldlm_lock *lock;
- struct inode *new_inode = data;
-
- if (bits)
- *bits = 0;
-
- if (!lustre_handle_is_used(lockh))
- return 0;
-
- lock = ldlm_handle2lock(lockh);
-
- LASSERT(lock);
- lock_res_and_lock(lock);
- if (lock->l_resource->lr_lvb_inode &&
- lock->l_resource->lr_lvb_inode != data) {
- struct inode *old_inode = lock->l_resource->lr_lvb_inode;
-
- LASSERTF(old_inode->i_state & I_FREEING,
- "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
- old_inode, old_inode->i_ino, old_inode->i_generation,
- old_inode->i_state, new_inode, new_inode->i_ino,
- new_inode->i_generation);
- }
- lock->l_resource->lr_lvb_inode = new_inode;
- if (bits)
- *bits = lock->l_policy_data.l_inodebits.bits;
-
- unlock_res_and_lock(lock);
- /* drop the reference taken by ldlm_handle2lock() above */
- LDLM_LOCK_PUT(lock);
-
- return 0;
-}
-
-/*
- * Look for an existing lock on the resource named by @fid.
- *
- * Builds the resource id from @fid, masks @policy's inodebits with the bits
- * reported by exp_connect_ibits(@exp) and forwards to ldlm_lock_match() on
- * the export's namespace.
- *
- * Note that @policy is modified in place by the masking.
- * Returns whatever ldlm_lock_match() returns.
- */
-enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
- const struct lu_fid *fid, enum ldlm_type type,
- union ldlm_policy_data *policy,
- enum ldlm_mode mode,
- struct lustre_handle *lockh)
-{
- struct ldlm_res_id res_id;
- enum ldlm_mode rc;
-
- fid_build_reg_res_name(fid, &res_id);
- /* LU-4405: Clear bits not supported by server */
- policy->l_inodebits.bits &= exp_connect_ibits(exp);
- rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
- &res_id, type, policy, mode, lockh, 0);
- return rc;
-}
-
-/*
- * Cancel unused locks on the resource named by @fid.
- *
- * Builds the resource id from @fid and passes it, together with @policy,
- * @mode, @flags and @opaque unchanged, to ldlm_cli_cancel_unused_resource()
- * on the export's namespace. Returns that function's result.
- */
-int mdc_cancel_unused(struct obd_export *exp,
- const struct lu_fid *fid,
- union ldlm_policy_data *policy,
- enum ldlm_mode mode,
- enum ldlm_cancel_flags flags,
- void *opaque)
-{
- struct ldlm_res_id res_id;
- struct obd_device *obd = class_exp2obd(exp);
- int rc;
-
- fid_build_reg_res_name(fid, &res_id);
- rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
- policy, mode, flags, opaque);
- return rc;
-}
-
-/*
- * Clear the inode pointer stored on the resource named by @fid.
- *
- * Looks the resource up in the export's namespace; if the lookup yields an
- * error pointer nothing is done. Otherwise lr_lvb_inode is set to NULL under
- * lock_res() and the resource reference is dropped.
- *
- * Always returns 0, whether or not a resource was found.
- */
-int mdc_null_inode(struct obd_export *exp,
- const struct lu_fid *fid)
-{
- struct ldlm_res_id res_id;
- struct ldlm_resource *res;
- struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
-
- LASSERTF(ns, "no namespace passed\n");
-
- fid_build_reg_res_name(fid, &res_id);
-
- res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
- if (IS_ERR(res))
- return 0;
-
- lock_res(res);
- res->lr_lvb_inode = NULL;
- unlock_res(res);
-
- ldlm_resource_putref(res);
- return 0;
-}
-
-/*
- * Clear the replay flag on @req so it is not kept for replay.
- *
- * rq_replay is reset under rq_lock only if it is currently set. In addition,
- * a non-zero @rc combined with a non-zero rq_transno is treated as a fatal
- * inconsistency: it is logged and LBUG() is invoked.
- */
-static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
-{
- /* Don't hold error requests for replay. */
- if (req->rq_replay) {
- spin_lock(&req->rq_lock);
- req->rq_replay = 0;
- spin_unlock(&req->rq_lock);
- }
- if (rc && req->rq_transno != 0) {
- DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
- LBUG();
- }
-}
-
-/* Save a large LOV EA into the request buffer so that it is available
- * for replay. We don't do this in the initial request because the
- * original request doesn't need this buffer (at most it sends just the
- * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
- * buffer and may also be difficult to allocate and save a very large
- * request buffer for each open. (bug 5707)
- *
- * OOM here may cause recovery failure if lmm is needed (only for the
- * original open if the MDS crashed just when this client also OOM'd)
- * but this is incredibly unlikely, and questionable whether the client
- * could do MDS recovery under OOM anyways...
- *
- * The segment at index DLM_INTENT_REC_OFF + 4 of @req is enlarged to
- * body->mbo_eadatasize. If enlarging fails, the failure is logged and
- * @body is edited so that it no longer advertises an EA: OBD_MD_FLEASIZE
- * is cleared from mbo_valid and mbo_eadatasize is set to 0. No error is
- * returned to the caller.
- */
-static void mdc_realloc_openmsg(struct ptlrpc_request *req,
- struct mdt_body *body)
-{
- int rc;
-
- /* FIXME: remove this explicit offset. */
- rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
- body->mbo_eadatasize);
- if (rc) {
- CERROR("Can't enlarge segment %d size to %d\n",
- DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
- body->mbo_valid &= ~OBD_MD_FLEASIZE;
- body->mbo_eadatasize = 0;
- }
-}
-
-/*
- * Build an RQF_LDLM_INTENT_OPEN enqueue request for an open intent.
- *
- * Steps, in order:
- *  - force the file-type bits of it->it_create_mode to S_IFREG;
- *  - if op_fid2 is sane, collect unused OPEN-bit locks on it for cancel,
- *    with a mode derived from the lease/write/trunc/exec flags;
- *  - collect unused UPDATE-bit locks on op_fid1 (LCK_EX when IT_CREAT is
- *    set, LCK_CR otherwise);
- *  - allocate the request, size the name and EA client buffers, and let
- *    ldlm_prep_enqueue_req() pack the collected cancels;
- *  - set rq_replay from the import's imp_replayable;
- *  - pack the intent opcode and the open body, then size the server-side
- *    RMF_MDT_MD buffer to cl_max_mds_easize.
- *
- * Returns the request, ERR_PTR(-ENOMEM) if allocation fails (the collected
- * cancel list is released first), or ERR_PTR(rc) if ldlm_prep_enqueue_req()
- * returns a negative value (the request is freed first).
- */
-static struct ptlrpc_request *
-mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
- struct md_op_data *op_data)
-{
- struct ptlrpc_request *req;
- struct obd_device *obddev = class_exp2obd(exp);
- struct ldlm_intent *lit;
- const void *lmm = op_data->op_data;
- u32 lmmsize = op_data->op_data_size;
- LIST_HEAD(cancels);
- int count = 0;
- int mode;
- int rc;
-
- it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
-
- /* XXX: openlock is not cancelled for cross-refs. */
- /* If inode is known, cancel conflicting OPEN locks. */
- if (fid_is_sane(&op_data->op_fid2)) {
- if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
- if (it->it_flags & FMODE_WRITE)
- mode = LCK_EX;
- else
- mode = LCK_PR;
- } else {
- if (it->it_flags & (FMODE_WRITE | MDS_OPEN_TRUNC))
- mode = LCK_CW;
- else if (it->it_flags & __FMODE_EXEC)
- mode = LCK_PR;
- else
- mode = LCK_CR;
- }
- count = mdc_resource_get_unused(exp, &op_data->op_fid2,
- &cancels, mode,
- MDS_INODELOCK_OPEN);
- }
-
- /* If CREATE, cancel parent's UPDATE lock. */
- if (it->it_op & IT_CREAT)
- mode = LCK_EX;
- else
- mode = LCK_CR;
- count += mdc_resource_get_unused(exp, &op_data->op_fid1,
- &cancels, mode,
- MDS_INODELOCK_UPDATE);
-
- req = ptlrpc_request_alloc(class_exp2cliimp(exp),
- &RQF_LDLM_INTENT_OPEN);
- if (!req) {
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
- return ERR_PTR(-ENOMEM);
- }
-
- req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
- op_data->op_namelen + 1);
- /* reserve room for the larger of the supplied EA and the default size */
- req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
- max(lmmsize, obddev->u.cli.cl_default_mds_easize));
-
- rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
- if (rc < 0) {
- ptlrpc_request_free(req);
- return ERR_PTR(rc);
- }
-
- spin_lock(&req->rq_lock);
- req->rq_replay = req->rq_import->imp_replayable;
- spin_unlock(&req->rq_lock);
-
- /* pack the intent */
- lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
- lit->opc = (__u64)it->it_op;
-
- /* pack the intended request */
- mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
- lmmsize);
-
- req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
- obddev->u.cli.cl_max_mds_easize);
-
- ptlrpc_request_set_replen(req);
- return req;
-}
-
-/*
- * Build an RQF_LDLM_INTENT_GETXATTR enqueue request.
- *
- * @it is not read: the intent opcode is always IT_GETXATTR. No locks are
- * collected for cancel (the cancel list is empty and count is 0).
- *
- * The body is packed for op_fid1 with op_valid, and the server-side
- * RMF_EADATA, RMF_EAVALS and RMF_EAVALS_LENS buffers are all sized to the
- * import's ocd_max_easize.
- *
- * Returns the request, ERR_PTR(-ENOMEM) on allocation failure, or
- * ERR_PTR(rc) if ldlm_prep_enqueue_req() fails (the request is freed).
- */
-static struct ptlrpc_request *
-mdc_intent_getxattr_pack(struct obd_export *exp,
- struct lookup_intent *it,
- struct md_op_data *op_data)
-{
- struct ptlrpc_request *req;
- struct ldlm_intent *lit;
- int rc, count = 0;
- u32 maxdata;
- LIST_HEAD(cancels);
-
- req = ptlrpc_request_alloc(class_exp2cliimp(exp),
- &RQF_LDLM_INTENT_GETXATTR);
- if (!req)
- return ERR_PTR(-ENOMEM);
-
- rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
- if (rc) {
- ptlrpc_request_free(req);
- return ERR_PTR(rc);
- }
-
- /* pack the intent */
- lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
- lit->opc = IT_GETXATTR;
-
- maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
-
- /* pack the intended request */
- mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
- 0);
-
- req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, maxdata);
-
- req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, maxdata);
-
- req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
- RCL_SERVER, maxdata);
-
- ptlrpc_request_set_replen(req);
-
- return req;
-}
-
-/*
- * Build an RQF_LDLM_INTENT_UNLINK enqueue request.
- *
- * Sizes the client name buffer to op_namelen + 1, packs the intent opcode
- * from it->it_op and the unlink body from @op_data, and sizes the
- * server-side RMF_MDT_MD buffer to cl_default_mds_easize.
- *
- * Returns the request, ERR_PTR(-ENOMEM) on allocation failure, or
- * ERR_PTR(rc) if ldlm_prep_enqueue_req() fails (the request is freed).
- */
-static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
- struct lookup_intent *it,
- struct md_op_data *op_data)
-{
- struct ptlrpc_request *req;
- struct obd_device *obddev = class_exp2obd(exp);
- struct ldlm_intent *lit;
- int rc;
-
- req = ptlrpc_request_alloc(class_exp2cliimp(exp),
- &RQF_LDLM_INTENT_UNLINK);
- if (!req)
- return ERR_PTR(-ENOMEM);
-
- req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
- op_data->op_namelen + 1);
-
- rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
- if (rc) {
- ptlrpc_request_free(req);
- return ERR_PTR(rc);
- }
-
- /* pack the intent */
- lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
- lit->opc = (__u64)it->it_op;
-
- /* pack the intended request */
- mdc_unlink_pack(req, op_data);
-
- req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
- obddev->u.cli.cl_default_mds_easize);
- ptlrpc_request_set_replen(req);
- return req;
-}
-
-/*
- * Build an RQF_LDLM_INTENT_GETATTR enqueue request.
- *
- * It is used for both getattr and lookup intents by mdc_enqueue(), and by
- * mdc_intent_getattr_async().
- *
- * The requested attribute mask is fixed (getattr, EA size, mode EA size,
- * directory EA, MEA and ACL flags). The EA size used both in the packed body
- * and for the server-side RMF_MDT_MD buffer is cl_default_mds_easize when
- * that is non-zero, otherwise cl_max_mds_easize.
- *
- * Returns the request, ERR_PTR(-ENOMEM) on allocation failure, or
- * ERR_PTR(rc) if ldlm_prep_enqueue_req() fails (the request is freed).
- */
-static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
- struct lookup_intent *it,
- struct md_op_data *op_data)
-{
- struct ptlrpc_request *req;
- struct obd_device *obddev = class_exp2obd(exp);
- u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
- OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
- OBD_MD_MEA | OBD_MD_FLACL;
- struct ldlm_intent *lit;
- int rc;
- u32 easize;
-
- req = ptlrpc_request_alloc(class_exp2cliimp(exp),
- &RQF_LDLM_INTENT_GETATTR);
- if (!req)
- return ERR_PTR(-ENOMEM);
-
- req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
- op_data->op_namelen + 1);
-
- rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
- if (rc) {
- ptlrpc_request_free(req);
- return ERR_PTR(rc);
- }
-
- /* pack the intent */
- lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
- lit->opc = (__u64)it->it_op;
-
- if (obddev->u.cli.cl_default_mds_easize > 0)
- easize = obddev->u.cli.cl_default_mds_easize;
- else
- easize = obddev->u.cli.cl_max_mds_easize;
-
- /* pack the intended request */
- mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
-
- req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
- ptlrpc_request_set_replen(req);
- return req;
-}
-
-/*
- * Build an RQF_LDLM_INTENT_LAYOUT enqueue request.
- *
- * The third parameter is not used. The client RMF_EADATA buffer is sized to
- * 0, the intent opcode is taken from it->it_op, the layout intent opcode is
- * always LAYOUT_INTENT_ACCESS, and the server-side RMF_DLM_LVB buffer is
- * sized to cl_default_mds_easize.
- *
- * Returns the request, ERR_PTR(-ENOMEM) on allocation failure, or
- * ERR_PTR(rc) if ldlm_prep_enqueue_req() fails (the request is freed).
- */
-static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
- struct lookup_intent *it,
- struct md_op_data *unused)
-{
- struct obd_device *obd = class_exp2obd(exp);
- struct ptlrpc_request *req;
- struct ldlm_intent *lit;
- struct layout_intent *layout;
- int rc;
-
- req = ptlrpc_request_alloc(class_exp2cliimp(exp),
- &RQF_LDLM_INTENT_LAYOUT);
- if (!req)
- return ERR_PTR(-ENOMEM);
-
- req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
- rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
- if (rc) {
- ptlrpc_request_free(req);
- return ERR_PTR(rc);
- }
-
- /* pack the intent */
- lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
- lit->opc = (__u64)it->it_op;
-
- /* pack the layout intent request */
- layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
- /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
- * set for replication
- */
- layout->li_opc = LAYOUT_INTENT_ACCESS;
-
- req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
- obd->u.cli.cl_default_mds_easize);
- ptlrpc_request_set_replen(req);
- return req;
-}
-
-/*
- * Build a plain RQF_LDLM_ENQUEUE request carrying no intent payload.
- *
- * @lvb_len is the size reserved for the server-side RMF_DLM_LVB buffer; the
- * only caller in this file (the IT_READDIR branch of mdc_enqueue()) passes 0.
- *
- * Returns the request, ERR_PTR(-ENOMEM) on allocation failure, or
- * ERR_PTR(rc) if ldlm_prep_enqueue_req() fails (the request is freed).
- */
-static struct ptlrpc_request *
-mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
-{
- struct ptlrpc_request *req;
- int rc;
-
- req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
- if (!req)
- return ERR_PTR(-ENOMEM);
-
- rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
- if (rc) {
- ptlrpc_request_free(req);
- return ERR_PTR(rc);
- }
-
- req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
- ptlrpc_request_set_replen(req);
- return req;
-}
-
-/*
- * Post-process the reply of an intent enqueue.
- *
- * @exp:   export the request was sent on
- * @req:   the completed enqueue request
- * @einfo: enqueue info; ei_mode is rewritten to 0 when the lock was aborted,
- *         or to the lock's l_req_mode when that differs from the requested
- *         mode
- * @it:    intent; its disposition, status, lock mode, lock handle and request
- *         fields are filled in from the reply
- * @lockh: lock handle; zeroed when @rc is ELDLM_LOCK_ABORTED
- * @rc:    result of the enqueue so far, asserted to be >= 0
- *
- * Main steps, in order:
- *  - mark the request's lock flags LDLM_FL_INTENT_ONLY if it has a transno
- *    or is flagged for replay;
- *  - reconcile the lock mode, or handle the aborted-lock case;
- *  - copy disposition and status from the DLM reply into @it;
- *  - drop the replay flag for requests that must not be replayed;
- *  - for open/unlink/lookup/getattr intents, validate the mdt_body and its
- *    EA, and for a replayable open copy the returned EA into the request's
- *    RMF_EADATA buffer;
- *  - for layout intents, fetch the LVB from the reply;
- *  - if the resulting lock is a layout lock and data is available, install a
- *    private copy of that data as the lock's LVB unless one is already set.
- *
- * Returns @rc (with ELDLM_LOCK_ABORTED mapped to 0) on success, -EPROTO if
- * an expected reply buffer is missing, or -ENOMEM if the layout copy cannot
- * be allocated.
- */
-static int mdc_finish_enqueue(struct obd_export *exp,
- struct ptlrpc_request *req,
- struct ldlm_enqueue_info *einfo,
- struct lookup_intent *it,
- struct lustre_handle *lockh,
- int rc)
-{
- struct req_capsule *pill = &req->rq_pill;
- struct ldlm_request *lockreq;
- struct ldlm_reply *lockrep;
- struct ldlm_lock *lock;
- void *lvb_data = NULL;
- u32 lvb_len = 0;
-
- LASSERT(rc >= 0);
- /* Similarly, if we're going to replay this request, we don't want to
- * actually get a lock, just perform the intent.
- */
- if (req->rq_transno || req->rq_replay) {
- lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
- lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
- }
-
- if (rc == ELDLM_LOCK_ABORTED) {
- einfo->ei_mode = 0;
- memset(lockh, 0, sizeof(*lockh));
- rc = 0;
- } else { /* rc = 0 */
- /* NOTE(review): the result is dereferenced without a NULL check
- * here, whereas the later ldlm_handle2lock() call in this
- * function does test for NULL - confirm the handle is always
- * valid on this path.
- */
- lock = ldlm_handle2lock(lockh);
-
- /* If the server gave us back a different lock mode, we should
- * fix up our variables.
- */
- if (lock->l_req_mode != einfo->ei_mode) {
- ldlm_lock_addref(lockh, lock->l_req_mode);
- ldlm_lock_decref(lockh, einfo->ei_mode);
- einfo->ei_mode = lock->l_req_mode;
- }
- LDLM_LOCK_PUT(lock);
- }
-
- /* NOTE(review): lockrep is used unchecked, unlike the mdt_body fetched
- * below - presumably the callers have already validated RMF_DLM_REP;
- * confirm.
- */
- lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
-
- it->it_disposition = (int)lockrep->lock_policy_res1;
- it->it_status = (int)lockrep->lock_policy_res2;
- it->it_lock_mode = einfo->ei_mode;
- it->it_lock_handle = lockh->cookie;
- it->it_request = req;
-
- /* Technically speaking rq_transno must already be zero if
- * it_status is in error, so the check is a bit redundant
- */
- if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
- mdc_clear_replay_flag(req, it->it_status);
-
- /* If we're doing an IT_OPEN which did not result in an actual
- * successful open, then we need to remove the bit which saves
- * this request for unconditional replay.
- *
- * It's important that we do this first! Otherwise we might exit the
- * function without doing so, and try to replay a failed create
- * (bug 3440)
- */
- if (it->it_op & IT_OPEN && req->rq_replay &&
- (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
- mdc_clear_replay_flag(req, it->it_status);
-
- DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
- it->it_op, it->it_disposition, it->it_status);
-
- /* We know what to expect, so we do any byte flipping required here */
- if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
- struct mdt_body *body;
-
- body = req_capsule_server_get(pill, &RMF_MDT_BODY);
- if (!body) {
- CERROR("Can't swab mdt_body\n");
- return -EPROTO;
- }
-
- if (it_disposition(it, DISP_OPEN_OPEN) &&
- !it_open_error(DISP_OPEN_OPEN, it)) {
- /*
- * If this is a successful OPEN request, we need to set
- * replay handler and data early, so that if replay
- * happens immediately after swabbing below, new reply
- * is swabbed by that handler correctly.
- */
- mdc_set_open_replay_data(NULL, NULL, it);
- }
-
- if ((body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
- void *eadata;
-
- mdc_update_max_ea_from_body(exp, body);
-
- /*
- * The eadata is opaque; just check that it is there.
- * Eventually, obd_unpackmd() will check the contents.
- */
- eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
- body->mbo_eadatasize);
- if (!eadata)
- return -EPROTO;
-
- /* save lvb data and length in case this is for layout
- * lock
- */
- lvb_data = eadata;
- lvb_len = body->mbo_eadatasize;
-
- /*
- * We save the reply LOV EA in case we have to replay a
- * create for recovery. If we didn't allocate a large
- * enough request buffer above we need to reallocate it
- * here to hold the actual LOV EA.
- *
- * To not save LOV EA if request is not going to replay
- * (for example error one).
- */
- if ((it->it_op & IT_OPEN) && req->rq_replay) {
- void *lmm;
-
- /* grow the request buffer if it is too small for
- * the EA, otherwise shrink it to the EA size
- */
- if (req_capsule_get_size(pill, &RMF_EADATA,
- RCL_CLIENT) <
- body->mbo_eadatasize)
- mdc_realloc_openmsg(req, body);
- else
- req_capsule_shrink(pill, &RMF_EADATA,
- body->mbo_eadatasize,
- RCL_CLIENT);
-
- req_capsule_set_size(pill, &RMF_EADATA,
- RCL_CLIENT,
- body->mbo_eadatasize);
-
- lmm = req_capsule_client_get(pill, &RMF_EADATA);
- if (lmm)
- memcpy(lmm, eadata, body->mbo_eadatasize);
- }
- }
- } else if (it->it_op & IT_LAYOUT) {
- /* maybe the lock was granted right away and layout
- * is packed into RMF_DLM_LVB of req
- */
- lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
- if (lvb_len > 0) {
- lvb_data = req_capsule_server_sized_get(pill,
- &RMF_DLM_LVB,
- lvb_len);
- if (!lvb_data)
- return -EPROTO;
- }
- }
-
- /* fill in stripe data for layout lock */
- lock = ldlm_handle2lock(lockh);
- if (lock && ldlm_has_layout(lock) && lvb_data) {
- void *lmm;
-
- LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
- ldlm_it2str(it->it_op), lvb_len);
-
- lmm = kvzalloc(lvb_len, GFP_NOFS);
- if (!lmm) {
- LDLM_LOCK_PUT(lock);
- return -ENOMEM;
- }
- memcpy(lmm, lvb_data, lvb_len);
-
- /* install lvb_data */
- lock_res_and_lock(lock);
- if (!lock->l_lvb_data) {
- lock->l_lvb_type = LVB_T_LAYOUT;
- lock->l_lvb_data = lmm;
- lock->l_lvb_len = lvb_len;
- /* ownership moved to the lock; skip the free below */
- lmm = NULL;
- }
- unlock_res_and_lock(lock);
- if (lmm)
- kvfree(lmm);
- }
- if (lock)
- LDLM_LOCK_PUT(lock);
-
- return rc;
-}
-
-/* We always reserve enough space in the reply packet for a stripe MD, because
- * we don't know in advance the file type.
- *
- * Enqueue a lock on the resource named by op_data->op_fid1, optionally
- * carrying an intent.
- *
- * With an intent (@it non-NULL): einfo->ei_type must be LDLM_IBITS and
- * @policy must be NULL; the policy is chosen from it->it_op and
- * LDLM_FL_HAS_INTENT is added to the flags. The request is built by the
- * matching mdc_*_pack() helper, a modify-RPC slot and a request slot are
- * taken around the enqueue, and the reply is post-processed by
- * mdc_finish_enqueue().
- *
- * Without an intent: only LDLM_FLOCK is accepted. No request is built here
- * (req stays NULL when handed to ldlm_cli_enqueue()) and the enqueue result
- * is returned directly, except that -EINTR/-ETIMEDOUT on an LCK_NL flock
- * request is retried.
- *
- * Returns a negative errno on failure, including:
- *   -EOPNOTSUPP for a layout intent when the import reports no LVB type;
- *   -EIO when the import generation changed while retrying -EINPROGRESS.
- * Otherwise returns the result of mdc_finish_enqueue(). If that result is
- * negative, the lock reference is dropped, @lockh is zeroed, the request is
- * released and the lock/request fields of @it are cleared.
- */
-int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
- const union ldlm_policy_data *policy,
- struct lookup_intent *it, struct md_op_data *op_data,
- struct lustre_handle *lockh, u64 extra_lock_flags)
-{
- static const union ldlm_policy_data lookup_policy = {
- .l_inodebits = { MDS_INODELOCK_LOOKUP }
- };
- static const union ldlm_policy_data update_policy = {
- .l_inodebits = { MDS_INODELOCK_UPDATE }
- };
- static const union ldlm_policy_data layout_policy = {
- .l_inodebits = { MDS_INODELOCK_LAYOUT }
- };
- static const union ldlm_policy_data getxattr_policy = {
- .l_inodebits = { MDS_INODELOCK_XATTR }
- };
- struct obd_device *obddev = class_exp2obd(exp);
- struct ptlrpc_request *req = NULL;
- u64 flags, saved_flags = extra_lock_flags;
- struct ldlm_res_id res_id;
- int generation, resends = 0;
- struct ldlm_reply *lockrep;
- enum lvb_type lvb_type = LVB_T_NONE;
- int rc;
-
- LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
- einfo->ei_type);
- fid_build_reg_res_name(&op_data->op_fid1, &res_id);
-
- if (it) {
- LASSERT(!policy);
-
- saved_flags |= LDLM_FL_HAS_INTENT;
- if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
- policy = &update_policy;
- else if (it->it_op & IT_LAYOUT)
- policy = &layout_policy;
- else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
- policy = &getxattr_policy;
- else
- policy = &lookup_policy;
- }
-
- /* remembered so a resend can detect that the import changed meanwhile */
- generation = obddev->u.cli.cl_import->imp_generation;
-resend:
- /* start every attempt from the original flag set */
- flags = saved_flags;
- if (!it) {
- /* The only way right now is FLOCK. */
- LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
- einfo->ei_type);
- res_id.name[3] = LDLM_FLOCK;
- } else if (it->it_op & IT_OPEN) {
- req = mdc_intent_open_pack(exp, it, op_data);
- } else if (it->it_op & IT_UNLINK) {
- req = mdc_intent_unlink_pack(exp, it, op_data);
- } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
- req = mdc_intent_getattr_pack(exp, it, op_data);
- } else if (it->it_op & IT_READDIR) {
- req = mdc_enqueue_pack(exp, 0);
- } else if (it->it_op & IT_LAYOUT) {
- if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
- return -EOPNOTSUPP;
- req = mdc_intent_layout_pack(exp, it, op_data);
- lvb_type = LVB_T_LAYOUT;
- } else if (it->it_op & IT_GETXATTR) {
- req = mdc_intent_getxattr_pack(exp, it, op_data);
- } else {
- LBUG();
- return -EINVAL;
- }
-
- if (IS_ERR(req))
- return PTR_ERR(req);
-
- /* resends is only incremented on the -EINPROGRESS path below; the
- * request is pinned to the saved import generation and rq_sent is
- * set to the current time plus the number of resends
- */
- if (resends) {
- req->rq_generation_set = 1;
- req->rq_import_generation = generation;
- req->rq_sent = ktime_get_real_seconds() + resends;
- }
-
- /* It is important to obtain modify RPC slot first (if applicable), so
- * that threads that are waiting for a modify RPC slot are not polluting
- * our rpcs in flight counter.
- * We do not do flock request limiting, though
- */
- if (it) {
- mdc_get_mod_rpc_slot(req, it);
- rc = obd_get_request_slot(&obddev->u.cli);
- if (rc != 0) {
- mdc_put_mod_rpc_slot(req, it);
- mdc_clear_replay_flag(req, 0);
- ptlrpc_req_finished(req);
- return rc;
- }
- }
-
- rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
- 0, lvb_type, lockh, 0);
- if (!it) {
- /* For flock requests we immediately return without further
- * delay and let caller deal with the rest, since rest of
- * this function metadata processing makes no sense for flock
- * requests anyway. But in case of problem during comms with
- * Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
- * can not rely on caller and this mainly for F_UNLCKs
- * (explicits or automatically generated by Kernel to clean
- * current FLocks upon exit) that can't be trashed
- */
- if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
- (einfo->ei_type == LDLM_FLOCK) &&
- (einfo->ei_mode == LCK_NL))
- goto resend;
- return rc;
- }
-
- /* release the slots in the reverse order of acquisition */
- obd_put_request_slot(&obddev->u.cli);
- mdc_put_mod_rpc_slot(req, it);
-
- if (rc < 0) {
- CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
- obddev->obd_name, rc);
-
- mdc_clear_replay_flag(req, rc);
- ptlrpc_req_finished(req);
- return rc;
- }
-
- lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
-
- /* convert the intent status in place so later readers see the
- * converted value
- */
- lockrep->lock_policy_res2 =
- ptlrpc_status_ntoh(lockrep->lock_policy_res2);
-
- /*
- * Retry infinitely when the server returns -EINPROGRESS for the
- * intent operation, when server returns -EINPROGRESS for acquiring
- * intent lock, we'll retry in after_reply().
- */
- if (it->it_op && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
- mdc_clear_replay_flag(req, rc);
- ptlrpc_req_finished(req);
- resends++;
-
- CDEBUG(D_HA, "%s: resend:%d op:%d " DFID "/" DFID "\n",
- obddev->obd_name, resends, it->it_op,
- PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
-
- if (generation == obddev->u.cli.cl_import->imp_generation) {
- goto resend;
- } else {
- CDEBUG(D_HA, "resend cross eviction\n");
- return -EIO;
- }
- }
-
- rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
- if (rc < 0) {
- if (lustre_handle_is_used(lockh)) {
- ldlm_lock_decref(lockh, einfo->ei_mode);
- memset(lockh, 0, sizeof(*lockh));
- }
- ptlrpc_req_finished(req);
-
- it->it_lock_handle = 0;
- it->it_lock_mode = 0;
- it->it_request = NULL;
- }
-
- return rc;
-}
-
-/*
- * Final fix-ups on an intent after a successful enqueue.
- *
- * Return value:
- *  - 0 immediately for IT_READDIR intents;
- *  - it->it_status (asserted non-zero) if DISP_IT_EXECD is not set;
- *  - the non-zero result of it_open_error() for the DISP_IT_EXECD or
- *    DISP_LOOKUP_EXECD phase;
- *  - 0 otherwise (rc is not reassigned after the last it_open_error() check).
- *
- * Side effects on the success path:
- *  - takes an extra reference on @request for a successful create and/or
- *    open, recording that in DISP_ENQ_CREATE_REF / DISP_ENQ_OPEN_REF so it
- *    is done only once per intent;
- *  - if ldlm_lock_match() finds a lock for the same policy bits, the lock in
- *    @lockh is dropped and cancelled, and @lockh and it->it_lock_handle are
- *    switched to the matched lock.
- */
-static int mdc_finish_intent_lock(struct obd_export *exp,
- struct ptlrpc_request *request,
- struct md_op_data *op_data,
- struct lookup_intent *it,
- struct lustre_handle *lockh)
-{
- struct lustre_handle old_lock;
- struct mdt_body *mdt_body;
- struct ldlm_lock *lock;
- int rc;
-
- LASSERT(request != LP_POISON);
- LASSERT(request->rq_repmsg != LP_POISON);
-
- if (it->it_op & IT_READDIR)
- return 0;
-
- if (!it_disposition(it, DISP_IT_EXECD)) {
- /* The server failed before it even started executing the
- * intent, i.e. because it couldn't unpack the request.
- */
- LASSERT(it->it_status != 0);
- return it->it_status;
- }
- rc = it_open_error(DISP_IT_EXECD, it);
- if (rc)
- return rc;
-
- mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
- LASSERT(mdt_body); /* mdc_enqueue checked */
-
- rc = it_open_error(DISP_LOOKUP_EXECD, it);
- if (rc)
- return rc;
-
- /* keep requests around for the multiple phases of the call
- * this shows the DISP_XX must guarantee we make it into the call
- */
- if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
- it_disposition(it, DISP_OPEN_CREATE) &&
- !it_open_error(DISP_OPEN_CREATE, it)) {
- it_set_disposition(it, DISP_ENQ_CREATE_REF);
- ptlrpc_request_addref(request); /* balanced in ll_create_node */
- }
- if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
- it_disposition(it, DISP_OPEN_OPEN) &&
- !it_open_error(DISP_OPEN_OPEN, it)) {
- it_set_disposition(it, DISP_ENQ_OPEN_REF);
- ptlrpc_request_addref(request); /* balanced in ll_file_open */
- /* BUG 11546 - eviction in the middle of open rpc processing */
- OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
- }
-
- /* sanity checks only: no state is changed by this if/else chain */
- if (it->it_op & IT_CREAT)
- /* XXX this belongs in ll_create_it */
- ;
- else if (it->it_op == IT_OPEN)
- LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
- else
- LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
-
- /* If we already have a matching lock, then cancel the new
- * one. We have to set the data here instead of in
- * mdc_enqueue, because we need to use the child's inode as
- * the l_ast_data to match, and that's not available until
- * intent_finish has performed the iget().)
- */
- lock = ldlm_handle2lock(lockh);
- if (lock) {
- /* copied by value so it stays usable after the put below */
- union ldlm_policy_data policy = lock->l_policy_data;
-
- LDLM_DEBUG(lock, "matching against this");
-
- LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
- &lock->l_resource->lr_name),
- "Lock res_id: " DLDLMRES ", fid: " DFID "\n",
- PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
- LDLM_LOCK_PUT(lock);
-
- memcpy(&old_lock, lockh, sizeof(*lockh));
- if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
- LDLM_IBITS, &policy, LCK_NL,
- &old_lock, 0)) {
- ldlm_lock_decref_and_cancel(lockh,
- it->it_lock_mode);
- memcpy(lockh, &old_lock, sizeof(old_lock));
- it->it_lock_handle = lockh->cookie;
- }
- }
- CDEBUG(D_DENTRY,
- "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
- (int)op_data->op_namelen, op_data->op_name,
- ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
- return rc;
-}
-
-/*
- * Check whether a usable lock is already held for an intent.
- *
- * If it->it_lock_handle is set, that handle is revalidated through
- * ldlm_revalidate_lock_handle(), which also receives @bits. Otherwise a lock
- * on @fid is searched for with mdc_lock_match(), using inodebits selected
- * from it->it_op and accepting any of the CR, CW, PR and PW modes; @bits is
- * not used on this path.
- *
- * On a hit, it->it_lock_handle and it->it_lock_mode are set to the lock
- * found; on a miss both are reset to 0.
- *
- * Returns 1 if a lock was found, 0 otherwise.
- */
-int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
- struct lu_fid *fid, __u64 *bits)
-{
- /* We could just return 1 immediately, but since we should only
- * be called in revalidate_it if we already have a lock, let's
- * verify that.
- */
- struct ldlm_res_id res_id;
- struct lustre_handle lockh;
- union ldlm_policy_data policy;
- enum ldlm_mode mode;
-
- if (it->it_lock_handle) {
- lockh.cookie = it->it_lock_handle;
- mode = ldlm_revalidate_lock_handle(&lockh, bits);
- } else {
- fid_build_reg_res_name(fid, &res_id);
- switch (it->it_op) {
- case IT_GETATTR:
- /* File attributes are held under multiple bits:
- * nlink is under lookup lock, size and times are
- * under UPDATE lock and recently we've also got
- * a separate permissions lock for owner/group/acl that
- * were protected by lookup lock before.
- * Getattr must provide all of that information,
- * so we need to ensure we have all of those locks.
- * Unfortunately, if the bits are split across multiple
- * locks, there's no easy way to match all of them here,
- * so an extra RPC would be performed to fetch all
- * of those bits at once for now.
- */
- /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
- * but for old MDTs (< 2.4), permission is covered
- * by LOOKUP lock, so it needs to match all bits here.
- */
- policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
- MDS_INODELOCK_LOOKUP |
- MDS_INODELOCK_PERM;
- break;
- case IT_READDIR:
- policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- break;
- case IT_LAYOUT:
- policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
- break;
- default:
- policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
- break;
- }
-
- mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
- LDLM_IBITS, &policy,
- LCK_CR | LCK_CW | LCK_PR | LCK_PW,
- &lockh);
- }
-
- if (mode) {
- it->it_lock_handle = lockh.cookie;
- it->it_lock_mode = mode;
- } else {
- it->it_lock_handle = 0;
- it->it_lock_mode = 0;
- }
-
- return !!mode;
-}
-
-/*
- * This long block is all about fixing up the lock and request state
- * so that it is correct as of the moment _before_ the operation was
- * applied; that way, the VFS will think that everything is normal and
- * call Lustre's regular VFS methods.
- *
- * If we're performing a creation, that means that unless the creation
- * failed with EEXIST, we should fake up a negative dentry.
- *
- * For everything else, we want the lookup to succeed.
- *
- * One additional note: if CREATE or OPEN succeeded, we add an extra
- * reference to the request because we need to keep it around until
- * ll_create/ll_open gets called.
- *
- * The server will return to us, in it_disposition, an indication of
- * exactly what it_status refers to.
- *
- * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
- * otherwise if DISP_OPEN_CREATE is set, then it_status is the
- * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
- * DISP_LOOKUP_POS will be set, indicating whether the child lookup
- * was successful.
- *
- * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
- * child lookup.
- *
- * Flow of this function:
- *  1. For lookup/getattr/readdir intents on a sane op_fid2, try
- *     mdc_revalidate_lock() first. Its result (1 = lock found, 0 = none)
- *     is returned directly if a lock was found or if a name was supplied
- *     (op_namelen != 0).
- *  2. For IT_CREAT without a sane op_fid2, allocate a FID.
- *  3. Enqueue via mdc_enqueue(); on success *reqp is set to
- *     it->it_request and mdc_finish_intent_lock() supplies the result.
- *
- * *reqp is written only after a successful enqueue.
- * Returns a negative errno on failure.
- */
-int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
- struct lookup_intent *it, struct ptlrpc_request **reqp,
- ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
-{
- /* NOTE(review): it is passed to it_to_lock_mode() here, before the
- * LASSERT(it) below runs - if that helper dereferences it, the
- * assertion comes too late to catch a NULL intent; confirm.
- */
- struct ldlm_enqueue_info einfo = {
- .ei_type = LDLM_IBITS,
- .ei_mode = it_to_lock_mode(it),
- .ei_cb_bl = cb_blocking,
- .ei_cb_cp = ldlm_completion_ast,
- };
- struct lustre_handle lockh;
- int rc = 0;
-
- LASSERT(it);
-
- CDEBUG(D_DLMTRACE, "(name: %.*s," DFID ") in obj " DFID
- ", intent: %s flags %#Lo\n", (int)op_data->op_namelen,
- op_data->op_name, PFID(&op_data->op_fid2),
- PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
- it->it_flags);
-
- lockh.cookie = 0;
- if (fid_is_sane(&op_data->op_fid2) &&
- (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
- /* We could just return 1 immediately, but since we should only
- * be called in revalidate_it if we already have a lock, let's
- * verify that.
- */
- it->it_lock_handle = 0;
- rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
- /* Only return failure if it was not GETATTR by cfid
- * (from inode_revalidate)
- */
- if (rc || op_data->op_namelen != 0)
- return rc;
- }
-
- /* For case if upper layer did not alloc fid, do it now. */
- if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
- rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
- if (rc < 0) {
- CERROR("Can't alloc new fid, rc %d\n", rc);
- return rc;
- }
- }
- rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
- extra_lock_flags);
- if (rc < 0)
- return rc;
-
- *reqp = it->it_request;
- rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
- return rc;
-}
-
-/*
- * Reply handler for requests queued by mdc_intent_getattr_async().
- *
- * @env:  not referenced in the body
- * @req:  the completed request
- * @args: the struct mdc_getattr_args stored by mdc_intent_getattr_async()
- * @rc:   status of the RPC so far
- *
- * Releases the request slot taken at submit time, completes the enqueue with
- * ldlm_cli_enqueue_fini(), converts the intent status in the DLM reply, and
- * then runs mdc_finish_enqueue() followed by mdc_finish_intent_lock(). The
- * first failing step stops the sequence.
- *
- * The caller-supplied minfo->mi_cb is always invoked with the final rc; this
- * function itself always returns 0.
- */
-static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
- void *args, int rc)
-{
- struct mdc_getattr_args *ga = args;
- struct obd_export *exp = ga->ga_exp;
- struct md_enqueue_info *minfo = ga->ga_minfo;
- struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
- struct lookup_intent *it;
- struct lustre_handle *lockh;
- struct obd_device *obddev;
- struct ldlm_reply *lockrep;
- __u64 flags = LDLM_FL_HAS_INTENT;
-
- it = &minfo->mi_it;
- lockh = &minfo->mi_lockh;
-
- obddev = class_exp2obd(exp);
-
- obd_put_request_slot(&obddev->u.cli);
- /* fault-injection hook: overrides rc with -ETIMEDOUT when it fires */
- if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
- rc = -ETIMEDOUT;
-
- rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
- &flags, NULL, 0, lockh, rc);
- if (rc < 0) {
- CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
- mdc_clear_replay_flag(req, rc);
- goto out;
- }
-
- lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
-
- lockrep->lock_policy_res2 =
- ptlrpc_status_ntoh(lockrep->lock_policy_res2);
-
- rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
- if (rc)
- goto out;
-
- rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
-
-out:
- minfo->mi_cb(req, minfo, rc);
- return 0;
-}
-
-/*
- * Start an asynchronous getattr intent enqueue.
- *
- * Builds the request with mdc_intent_getattr_pack() for the resource named
- * by minfo->mi_data.op_fid1, takes a request slot, and enqueues with a
- * LOOKUP|UPDATE inodebits policy. The request is then handed to
- * ptlrpcd_add_req() with mdc_intent_getattr_async_interpret() as its reply
- * handler; @exp and @minfo are passed to that handler through the request's
- * async-args area.
- *
- * Returns 0 once the request has been queued, or a negative errno if
- * packing, slot acquisition or the enqueue fails. On the last two failures
- * the request is released here; an enqueue failure also returns the slot.
- */
-int mdc_intent_getattr_async(struct obd_export *exp,
- struct md_enqueue_info *minfo)
-{
- struct md_op_data *op_data = &minfo->mi_data;
- struct lookup_intent *it = &minfo->mi_it;
- struct ptlrpc_request *req;
- struct mdc_getattr_args *ga;
- struct obd_device *obddev = class_exp2obd(exp);
- struct ldlm_res_id res_id;
- union ldlm_policy_data policy = {
- .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
- };
- int rc = 0;
- __u64 flags = LDLM_FL_HAS_INTENT;
-
- CDEBUG(D_DLMTRACE,
- "name: %.*s in inode " DFID ", intent: %s flags %#Lo\n",
- (int)op_data->op_namelen, op_data->op_name,
- PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
-
- fid_build_reg_res_name(&op_data->op_fid1, &res_id);
- req = mdc_intent_getattr_pack(exp, it, op_data);
- if (IS_ERR(req))
- return PTR_ERR(req);
-
- rc = obd_get_request_slot(&obddev->u.cli);
- if (rc != 0) {
- ptlrpc_req_finished(req);
- return rc;
- }
-
- /* NOTE(review): the final argument is 1 here but 0 in mdc_enqueue() -
- * presumably it selects asynchronous completion; confirm against
- * ldlm_cli_enqueue().
- */
- rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
- &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
- if (rc < 0) {
- obd_put_request_slot(&obddev->u.cli);
- ptlrpc_req_finished(req);
- return rc;
- }
-
- /* the args struct must fit in the request's embedded async-args area */
- BUILD_BUG_ON(sizeof(*ga) > sizeof(req->rq_async_args));
- ga = ptlrpc_req_async_args(req);
- ga->ga_exp = exp;
- ga->ga_minfo = minfo;
-
- req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
- ptlrpcd_add_req(req);
-
- return 0;
-}