diff options
Diffstat (limited to 'drivers/staging/lustre/lustre/mdc/mdc_locks.c')
-rw-r--r-- | drivers/staging/lustre/lustre/mdc/mdc_locks.c | 1202 |
1 files changed, 0 insertions, 1202 deletions
diff --git a/drivers/staging/lustre/lustre/mdc/mdc_locks.c b/drivers/staging/lustre/lustre/mdc/mdc_locks.c deleted file mode 100644 index 695ef44532cf..000000000000 --- a/drivers/staging/lustre/lustre/mdc/mdc_locks.c +++ /dev/null @@ -1,1202 +0,0 @@ -// SPDX-License-Identifier: GPL-2.0 -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.gnu.org/licenses/gpl-2.0.html - * - * GPL HEADER END - */ -/* - * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - * - * Copyright (c) 2011, 2015, Intel Corporation. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - */ - -#define DEBUG_SUBSYSTEM S_MDC - -#include <linux/module.h> - -#include <lustre_intent.h> -#include <obd.h> -#include <obd_class.h> -#include <lustre_dlm.h> -#include <lustre_fid.h> -#include <lustre_mdc.h> -#include <lustre_net.h> -#include <lustre_req_layout.h> -#include <lustre_swab.h> - -#include "mdc_internal.h" - -struct mdc_getattr_args { - struct obd_export *ga_exp; - struct md_enqueue_info *ga_minfo; -}; - -int it_open_error(int phase, struct lookup_intent *it) -{ - if (it_disposition(it, DISP_OPEN_LEASE)) { - if (phase >= DISP_OPEN_LEASE) - return it->it_status; - else - return 0; - } - if (it_disposition(it, DISP_OPEN_OPEN)) { - if (phase >= DISP_OPEN_OPEN) - return it->it_status; - else - return 0; - } - - if (it_disposition(it, DISP_OPEN_CREATE)) { - if (phase >= DISP_OPEN_CREATE) - return it->it_status; - else - return 0; - } - - if (it_disposition(it, DISP_LOOKUP_EXECD)) { - if (phase >= DISP_LOOKUP_EXECD) - return it->it_status; - else - return 0; - } - - if (it_disposition(it, DISP_IT_EXECD)) { - if (phase >= DISP_IT_EXECD) - return it->it_status; - else - return 0; - } - CERROR("it disp: %X, status: %d\n", it->it_disposition, - it->it_status); - LBUG(); - return 0; -} -EXPORT_SYMBOL(it_open_error); - -/* this must be called on a lockh that is known to have a referenced lock */ -int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh, - void *data, __u64 *bits) -{ - struct ldlm_lock *lock; - struct inode *new_inode = data; - - if (bits) - *bits = 0; - - if (!lustre_handle_is_used(lockh)) - return 0; - - lock = ldlm_handle2lock(lockh); - - LASSERT(lock); - lock_res_and_lock(lock); - if (lock->l_resource->lr_lvb_inode && - lock->l_resource->lr_lvb_inode != data) { - struct inode *old_inode = lock->l_resource->lr_lvb_inode; - - LASSERTF(old_inode->i_state & I_FREEING, - "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n", - old_inode, old_inode->i_ino, old_inode->i_generation, - old_inode->i_state, new_inode, new_inode->i_ino, - new_inode->i_generation); - } - lock->l_resource->lr_lvb_inode = new_inode; - if (bits) - *bits = lock->l_policy_data.l_inodebits.bits; - - unlock_res_and_lock(lock); - LDLM_LOCK_PUT(lock); - - return 0; -} - -enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags, - const struct lu_fid *fid, enum ldlm_type type, - union ldlm_policy_data *policy, - enum ldlm_mode mode, - struct lustre_handle *lockh) -{ - struct ldlm_res_id res_id; - enum ldlm_mode rc; - - fid_build_reg_res_name(fid, &res_id); - /* LU-4405: Clear bits not supported by server */ - policy->l_inodebits.bits &= exp_connect_ibits(exp); - rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags, - &res_id, type, policy, mode, lockh, 0); - return rc; -} - -int mdc_cancel_unused(struct obd_export *exp, - const struct lu_fid *fid, - union ldlm_policy_data *policy, - enum ldlm_mode mode, - enum ldlm_cancel_flags flags, - void *opaque) -{ - struct ldlm_res_id res_id; - struct obd_device *obd = class_exp2obd(exp); - int rc; - - fid_build_reg_res_name(fid, &res_id); - rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id, - policy, mode, flags, opaque); - return rc; -} - -int mdc_null_inode(struct obd_export *exp, - const struct lu_fid *fid) -{ - struct ldlm_res_id res_id; - struct ldlm_resource *res; - struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace; - - LASSERTF(ns, "no namespace passed\n"); - - fid_build_reg_res_name(fid, &res_id); - - res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); - if (IS_ERR(res)) - return 0; - - lock_res(res); - res->lr_lvb_inode = NULL; - unlock_res(res); - - ldlm_resource_putref(res); - return 0; -} - -static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) -{ - /* Don't hold error requests for replay. */ - if (req->rq_replay) { - spin_lock(&req->rq_lock); - req->rq_replay = 0; - spin_unlock(&req->rq_lock); - } - if (rc && req->rq_transno != 0) { - DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc); - LBUG(); - } -} - -/* Save a large LOV EA into the request buffer so that it is available - * for replay. We don't do this in the initial request because the - * original request doesn't need this buffer (at most it sends just the - * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty - * buffer and may also be difficult to allocate and save a very large - * request buffer for each open. (bug 5707) - * - * OOM here may cause recovery failure if lmm is needed (only for the - * original open if the MDS crashed just when this client also OOM'd) - * but this is incredibly unlikely, and questionable whether the client - * could do MDS recovery under OOM anyways... - */ -static void mdc_realloc_openmsg(struct ptlrpc_request *req, - struct mdt_body *body) -{ - int rc; - - /* FIXME: remove this explicit offset. */ - rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4, - body->mbo_eadatasize); - if (rc) { - CERROR("Can't enlarge segment %d size to %d\n", - DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize); - body->mbo_valid &= ~OBD_MD_FLEASIZE; - body->mbo_eadatasize = 0; - } -} - -static struct ptlrpc_request * -mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, - struct md_op_data *op_data) -{ - struct ptlrpc_request *req; - struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_intent *lit; - const void *lmm = op_data->op_data; - u32 lmmsize = op_data->op_data_size; - LIST_HEAD(cancels); - int count = 0; - int mode; - int rc; - - it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG; - - /* XXX: openlock is not cancelled for cross-refs. */ - /* If inode is known, cancel conflicting OPEN locks. */ - if (fid_is_sane(&op_data->op_fid2)) { - if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */ - if (it->it_flags & FMODE_WRITE) - mode = LCK_EX; - else - mode = LCK_PR; - } else { - if (it->it_flags & (FMODE_WRITE | MDS_OPEN_TRUNC)) - mode = LCK_CW; - else if (it->it_flags & __FMODE_EXEC) - mode = LCK_PR; - else - mode = LCK_CR; - } - count = mdc_resource_get_unused(exp, &op_data->op_fid2, - &cancels, mode, - MDS_INODELOCK_OPEN); - } - - /* If CREATE, cancel parent's UPDATE lock. */ - if (it->it_op & IT_CREAT) - mode = LCK_EX; - else - mode = LCK_CR; - count += mdc_resource_get_unused(exp, &op_data->op_fid1, - &cancels, mode, - MDS_INODELOCK_UPDATE); - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_INTENT_OPEN); - if (!req) { - ldlm_lock_list_put(&cancels, l_bl_ast, count); - return ERR_PTR(-ENOMEM); - } - - req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, - op_data->op_namelen + 1); - req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, - max(lmmsize, obddev->u.cli.cl_default_mds_easize)); - - rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); - if (rc < 0) { - ptlrpc_request_free(req); - return ERR_PTR(rc); - } - - spin_lock(&req->rq_lock); - req->rq_replay = req->rq_import->imp_replayable; - spin_unlock(&req->rq_lock); - - /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = (__u64)it->it_op; - - /* pack the intended request */ - mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm, - lmmsize); - - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - obddev->u.cli.cl_max_mds_easize); - - ptlrpc_request_set_replen(req); - return req; -} - -static struct ptlrpc_request * -mdc_intent_getxattr_pack(struct obd_export *exp, - struct lookup_intent *it, - struct md_op_data *op_data) -{ - struct ptlrpc_request *req; - struct ldlm_intent *lit; - int rc, count = 0; - u32 maxdata; - LIST_HEAD(cancels); - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_INTENT_GETXATTR); - if (!req) - return ERR_PTR(-ENOMEM); - - rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); - if (rc) { - ptlrpc_request_free(req); - return ERR_PTR(rc); - } - - /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = IT_GETXATTR; - - maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize; - - /* pack the intended request */ - mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1, - 0); - - req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, maxdata); - - req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, maxdata); - - req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, - RCL_SERVER, maxdata); - - ptlrpc_request_set_replen(req); - - return req; -} - -static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, - struct lookup_intent *it, - struct md_op_data *op_data) -{ - struct ptlrpc_request *req; - struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_intent *lit; - int rc; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_INTENT_UNLINK); - if (!req) - return ERR_PTR(-ENOMEM); - - req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, - op_data->op_namelen + 1); - - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - return ERR_PTR(rc); - } - - /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = (__u64)it->it_op; - - /* pack the intended request */ - mdc_unlink_pack(req, op_data); - - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - obddev->u.cli.cl_default_mds_easize); - ptlrpc_request_set_replen(req); - return req; -} - -static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, - struct lookup_intent *it, - struct md_op_data *op_data) -{ - struct ptlrpc_request *req; - struct obd_device *obddev = class_exp2obd(exp); - u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | - OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA | - OBD_MD_MEA | OBD_MD_FLACL; - struct ldlm_intent *lit; - int rc; - u32 easize; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_INTENT_GETATTR); - if (!req) - return ERR_PTR(-ENOMEM); - - req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, - op_data->op_namelen + 1); - - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - return ERR_PTR(rc); - } - - /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = (__u64)it->it_op; - - if (obddev->u.cli.cl_default_mds_easize > 0) - easize = obddev->u.cli.cl_default_mds_easize; - else - easize = obddev->u.cli.cl_max_mds_easize; - - /* pack the intended request */ - mdc_getattr_pack(req, valid, it->it_flags, op_data, easize); - - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize); - ptlrpc_request_set_replen(req); - return req; -} - -static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, - struct lookup_intent *it, - struct md_op_data *unused) -{ - struct obd_device *obd = class_exp2obd(exp); - struct ptlrpc_request *req; - struct ldlm_intent *lit; - struct layout_intent *layout; - int rc; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_INTENT_LAYOUT); - if (!req) - return ERR_PTR(-ENOMEM); - - req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0); - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - return ERR_PTR(rc); - } - - /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = (__u64)it->it_op; - - /* pack the layout intent request */ - layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT); - /* LAYOUT_INTENT_ACCESS is generic, specific operation will be - * set for replication - */ - layout->li_opc = LAYOUT_INTENT_ACCESS; - - req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, - obd->u.cli.cl_default_mds_easize); - ptlrpc_request_set_replen(req); - return req; -} - -static struct ptlrpc_request * -mdc_enqueue_pack(struct obd_export *exp, int lvb_len) -{ - struct ptlrpc_request *req; - int rc; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE); - if (!req) - return ERR_PTR(-ENOMEM); - - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - return ERR_PTR(rc); - } - - req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len); - ptlrpc_request_set_replen(req); - return req; -} - -static int mdc_finish_enqueue(struct obd_export *exp, - struct ptlrpc_request *req, - struct ldlm_enqueue_info *einfo, - struct lookup_intent *it, - struct lustre_handle *lockh, - int rc) -{ - struct req_capsule *pill = &req->rq_pill; - struct ldlm_request *lockreq; - struct ldlm_reply *lockrep; - struct ldlm_lock *lock; - void *lvb_data = NULL; - u32 lvb_len = 0; - - LASSERT(rc >= 0); - /* Similarly, if we're going to replay this request, we don't want to - * actually get a lock, just perform the intent. - */ - if (req->rq_transno || req->rq_replay) { - lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ); - lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY); - } - - if (rc == ELDLM_LOCK_ABORTED) { - einfo->ei_mode = 0; - memset(lockh, 0, sizeof(*lockh)); - rc = 0; - } else { /* rc = 0 */ - lock = ldlm_handle2lock(lockh); - - /* If the server gave us back a different lock mode, we should - * fix up our variables. - */ - if (lock->l_req_mode != einfo->ei_mode) { - ldlm_lock_addref(lockh, lock->l_req_mode); - ldlm_lock_decref(lockh, einfo->ei_mode); - einfo->ei_mode = lock->l_req_mode; - } - LDLM_LOCK_PUT(lock); - } - - lockrep = req_capsule_server_get(pill, &RMF_DLM_REP); - - it->it_disposition = (int)lockrep->lock_policy_res1; - it->it_status = (int)lockrep->lock_policy_res2; - it->it_lock_mode = einfo->ei_mode; - it->it_lock_handle = lockh->cookie; - it->it_request = req; - - /* Technically speaking rq_transno must already be zero if - * it_status is in error, so the check is a bit redundant - */ - if ((!req->rq_transno || it->it_status < 0) && req->rq_replay) - mdc_clear_replay_flag(req, it->it_status); - - /* If we're doing an IT_OPEN which did not result in an actual - * successful open, then we need to remove the bit which saves - * this request for unconditional replay. - * - * It's important that we do this first! Otherwise we might exit the - * function without doing so, and try to replay a failed create - * (bug 3440) - */ - if (it->it_op & IT_OPEN && req->rq_replay && - (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0)) - mdc_clear_replay_flag(req, it->it_status); - - DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d", - it->it_op, it->it_disposition, it->it_status); - - /* We know what to expect, so we do any byte flipping required here */ - if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) { - struct mdt_body *body; - - body = req_capsule_server_get(pill, &RMF_MDT_BODY); - if (!body) { - CERROR("Can't swab mdt_body\n"); - return -EPROTO; - } - - if (it_disposition(it, DISP_OPEN_OPEN) && - !it_open_error(DISP_OPEN_OPEN, it)) { - /* - * If this is a successful OPEN request, we need to set - * replay handler and data early, so that if replay - * happens immediately after swabbing below, new reply - * is swabbed by that handler correctly. - */ - mdc_set_open_replay_data(NULL, NULL, it); - } - - if ((body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) { - void *eadata; - - mdc_update_max_ea_from_body(exp, body); - - /* - * The eadata is opaque; just check that it is there. - * Eventually, obd_unpackmd() will check the contents. - */ - eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD, - body->mbo_eadatasize); - if (!eadata) - return -EPROTO; - - /* save lvb data and length in case this is for layout - * lock - */ - lvb_data = eadata; - lvb_len = body->mbo_eadatasize; - - /* - * We save the reply LOV EA in case we have to replay a - * create for recovery. If we didn't allocate a large - * enough request buffer above we need to reallocate it - * here to hold the actual LOV EA. - * - * To not save LOV EA if request is not going to replay - * (for example error one). - */ - if ((it->it_op & IT_OPEN) && req->rq_replay) { - void *lmm; - - if (req_capsule_get_size(pill, &RMF_EADATA, - RCL_CLIENT) < - body->mbo_eadatasize) - mdc_realloc_openmsg(req, body); - else - req_capsule_shrink(pill, &RMF_EADATA, - body->mbo_eadatasize, - RCL_CLIENT); - - req_capsule_set_size(pill, &RMF_EADATA, - RCL_CLIENT, - body->mbo_eadatasize); - - lmm = req_capsule_client_get(pill, &RMF_EADATA); - if (lmm) - memcpy(lmm, eadata, body->mbo_eadatasize); - } - } - } else if (it->it_op & IT_LAYOUT) { - /* maybe the lock was granted right away and layout - * is packed into RMF_DLM_LVB of req - */ - lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER); - if (lvb_len > 0) { - lvb_data = req_capsule_server_sized_get(pill, - &RMF_DLM_LVB, - lvb_len); - if (!lvb_data) - return -EPROTO; - } - } - - /* fill in stripe data for layout lock */ - lock = ldlm_handle2lock(lockh); - if (lock && ldlm_has_layout(lock) && lvb_data) { - void *lmm; - - LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d", - ldlm_it2str(it->it_op), lvb_len); - - lmm = kvzalloc(lvb_len, GFP_NOFS); - if (!lmm) { - LDLM_LOCK_PUT(lock); - return -ENOMEM; - } - memcpy(lmm, lvb_data, lvb_len); - - /* install lvb_data */ - lock_res_and_lock(lock); - if (!lock->l_lvb_data) { - lock->l_lvb_type = LVB_T_LAYOUT; - lock->l_lvb_data = lmm; - lock->l_lvb_len = lvb_len; - lmm = NULL; - } - unlock_res_and_lock(lock); - if (lmm) - kvfree(lmm); - } - if (lock) - LDLM_LOCK_PUT(lock); - - return rc; -} - -/* We always reserve enough space in the reply packet for a stripe MD, because - * we don't know in advance the file type. - */ -int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, - const union ldlm_policy_data *policy, - struct lookup_intent *it, struct md_op_data *op_data, - struct lustre_handle *lockh, u64 extra_lock_flags) -{ - static const union ldlm_policy_data lookup_policy = { - .l_inodebits = { MDS_INODELOCK_LOOKUP } - }; - static const union ldlm_policy_data update_policy = { - .l_inodebits = { MDS_INODELOCK_UPDATE } - }; - static const union ldlm_policy_data layout_policy = { - .l_inodebits = { MDS_INODELOCK_LAYOUT } - }; - static const union ldlm_policy_data getxattr_policy = { - .l_inodebits = { MDS_INODELOCK_XATTR } - }; - struct obd_device *obddev = class_exp2obd(exp); - struct ptlrpc_request *req = NULL; - u64 flags, saved_flags = extra_lock_flags; - struct ldlm_res_id res_id; - int generation, resends = 0; - struct ldlm_reply *lockrep; - enum lvb_type lvb_type = LVB_T_NONE; - int rc; - - LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n", - einfo->ei_type); - fid_build_reg_res_name(&op_data->op_fid1, &res_id); - - if (it) { - LASSERT(!policy); - - saved_flags |= LDLM_FL_HAS_INTENT; - if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR)) - policy = &update_policy; - else if (it->it_op & IT_LAYOUT) - policy = &layout_policy; - else if (it->it_op & (IT_GETXATTR | IT_SETXATTR)) - policy = &getxattr_policy; - else - policy = &lookup_policy; - } - - generation = obddev->u.cli.cl_import->imp_generation; -resend: - flags = saved_flags; - if (!it) { - /* The only way right now is FLOCK. */ - LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n", - einfo->ei_type); - res_id.name[3] = LDLM_FLOCK; - } else if (it->it_op & IT_OPEN) { - req = mdc_intent_open_pack(exp, it, op_data); - } else if (it->it_op & IT_UNLINK) { - req = mdc_intent_unlink_pack(exp, it, op_data); - } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { - req = mdc_intent_getattr_pack(exp, it, op_data); - } else if (it->it_op & IT_READDIR) { - req = mdc_enqueue_pack(exp, 0); - } else if (it->it_op & IT_LAYOUT) { - if (!imp_connect_lvb_type(class_exp2cliimp(exp))) - return -EOPNOTSUPP; - req = mdc_intent_layout_pack(exp, it, op_data); - lvb_type = LVB_T_LAYOUT; - } else if (it->it_op & IT_GETXATTR) { - req = mdc_intent_getxattr_pack(exp, it, op_data); - } else { - LBUG(); - return -EINVAL; - } - - if (IS_ERR(req)) - return PTR_ERR(req); - - if (resends) { - req->rq_generation_set = 1; - req->rq_import_generation = generation; - req->rq_sent = ktime_get_real_seconds() + resends; - } - - /* It is important to obtain modify RPC slot first (if applicable), so - * that threads that are waiting for a modify RPC slot are not polluting - * our rpcs in flight counter. - * We do not do flock request limiting, though - */ - if (it) { - mdc_get_mod_rpc_slot(req, it); - rc = obd_get_request_slot(&obddev->u.cli); - if (rc != 0) { - mdc_put_mod_rpc_slot(req, it); - mdc_clear_replay_flag(req, 0); - ptlrpc_req_finished(req); - return rc; - } - } - - rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL, - 0, lvb_type, lockh, 0); - if (!it) { - /* For flock requests we immediately return without further - * delay and let caller deal with the rest, since rest of - * this function metadata processing makes no sense for flock - * requests anyway. But in case of problem during comms with - * Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we - * can not rely on caller and this mainly for F_UNLCKs - * (explicits or automatically generated by Kernel to clean - * current FLocks upon exit) that can't be trashed - */ - if (((rc == -EINTR) || (rc == -ETIMEDOUT)) && - (einfo->ei_type == LDLM_FLOCK) && - (einfo->ei_mode == LCK_NL)) - goto resend; - return rc; - } - - obd_put_request_slot(&obddev->u.cli); - mdc_put_mod_rpc_slot(req, it); - - if (rc < 0) { - CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n", - obddev->obd_name, rc); - - mdc_clear_replay_flag(req, rc); - ptlrpc_req_finished(req); - return rc; - } - - lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); - - lockrep->lock_policy_res2 = - ptlrpc_status_ntoh(lockrep->lock_policy_res2); - - /* - * Retry infinitely when the server returns -EINPROGRESS for the - * intent operation, when server returns -EINPROGRESS for acquiring - * intent lock, we'll retry in after_reply(). - */ - if (it->it_op && (int)lockrep->lock_policy_res2 == -EINPROGRESS) { - mdc_clear_replay_flag(req, rc); - ptlrpc_req_finished(req); - resends++; - - CDEBUG(D_HA, "%s: resend:%d op:%d " DFID "/" DFID "\n", - obddev->obd_name, resends, it->it_op, - PFID(&op_data->op_fid1), PFID(&op_data->op_fid2)); - - if (generation == obddev->u.cli.cl_import->imp_generation) { - goto resend; - } else { - CDEBUG(D_HA, "resend cross eviction\n"); - return -EIO; - } - } - - rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); - if (rc < 0) { - if (lustre_handle_is_used(lockh)) { - ldlm_lock_decref(lockh, einfo->ei_mode); - memset(lockh, 0, sizeof(*lockh)); - } - ptlrpc_req_finished(req); - - it->it_lock_handle = 0; - it->it_lock_mode = 0; - it->it_request = NULL; - } - - return rc; -} - -static int mdc_finish_intent_lock(struct obd_export *exp, - struct ptlrpc_request *request, - struct md_op_data *op_data, - struct lookup_intent *it, - struct lustre_handle *lockh) -{ - struct lustre_handle old_lock; - struct mdt_body *mdt_body; - struct ldlm_lock *lock; - int rc; - - LASSERT(request != LP_POISON); - LASSERT(request->rq_repmsg != LP_POISON); - - if (it->it_op & IT_READDIR) - return 0; - - if (!it_disposition(it, DISP_IT_EXECD)) { - /* The server failed before it even started executing the - * intent, i.e. because it couldn't unpack the request. - */ - LASSERT(it->it_status != 0); - return it->it_status; - } - rc = it_open_error(DISP_IT_EXECD, it); - if (rc) - return rc; - - mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY); - LASSERT(mdt_body); /* mdc_enqueue checked */ - - rc = it_open_error(DISP_LOOKUP_EXECD, it); - if (rc) - return rc; - - /* keep requests around for the multiple phases of the call - * this shows the DISP_XX must guarantee we make it into the call - */ - if (!it_disposition(it, DISP_ENQ_CREATE_REF) && - it_disposition(it, DISP_OPEN_CREATE) && - !it_open_error(DISP_OPEN_CREATE, it)) { - it_set_disposition(it, DISP_ENQ_CREATE_REF); - ptlrpc_request_addref(request); /* balanced in ll_create_node */ - } - if (!it_disposition(it, DISP_ENQ_OPEN_REF) && - it_disposition(it, DISP_OPEN_OPEN) && - !it_open_error(DISP_OPEN_OPEN, it)) { - it_set_disposition(it, DISP_ENQ_OPEN_REF); - ptlrpc_request_addref(request); /* balanced in ll_file_open */ - /* BUG 11546 - eviction in the middle of open rpc processing */ - OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout); - } - - if (it->it_op & IT_CREAT) - /* XXX this belongs in ll_create_it */ - ; - else if (it->it_op == IT_OPEN) - LASSERT(!it_disposition(it, DISP_OPEN_CREATE)); - else - LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT)); - - /* If we already have a matching lock, then cancel the new - * one. We have to set the data here instead of in - * mdc_enqueue, because we need to use the child's inode as - * the l_ast_data to match, and that's not available until - * intent_finish has performed the iget().) - */ - lock = ldlm_handle2lock(lockh); - if (lock) { - union ldlm_policy_data policy = lock->l_policy_data; - - LDLM_DEBUG(lock, "matching against this"); - - LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1, - &lock->l_resource->lr_name), - "Lock res_id: " DLDLMRES ", fid: " DFID "\n", - PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1)); - LDLM_LOCK_PUT(lock); - - memcpy(&old_lock, lockh, sizeof(*lockh)); - if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, - LDLM_IBITS, &policy, LCK_NL, - &old_lock, 0)) { - ldlm_lock_decref_and_cancel(lockh, - it->it_lock_mode); - memcpy(lockh, &old_lock, sizeof(old_lock)); - it->it_lock_handle = lockh->cookie; - } - } - CDEBUG(D_DENTRY, - "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n", - (int)op_data->op_namelen, op_data->op_name, - ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc); - return rc; -} - -int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, - struct lu_fid *fid, __u64 *bits) -{ - /* We could just return 1 immediately, but since we should only - * be called in revalidate_it if we already have a lock, let's - * verify that. - */ - struct ldlm_res_id res_id; - struct lustre_handle lockh; - union ldlm_policy_data policy; - enum ldlm_mode mode; - - if (it->it_lock_handle) { - lockh.cookie = it->it_lock_handle; - mode = ldlm_revalidate_lock_handle(&lockh, bits); - } else { - fid_build_reg_res_name(fid, &res_id); - switch (it->it_op) { - case IT_GETATTR: - /* File attributes are held under multiple bits: - * nlink is under lookup lock, size and times are - * under UPDATE lock and recently we've also got - * a separate permissions lock for owner/group/acl that - * were protected by lookup lock before. - * Getattr must provide all of that information, - * so we need to ensure we have all of those locks. - * Unfortunately, if the bits are split across multiple - * locks, there's no easy way to match all of them here, - * so an extra RPC would be performed to fetch all - * of those bits at once for now. - */ - /* For new MDTs(> 2.4), UPDATE|PERM should be enough, - * but for old MDTs (< 2.4), permission is covered - * by LOOKUP lock, so it needs to match all bits here. - */ - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE | - MDS_INODELOCK_LOOKUP | - MDS_INODELOCK_PERM; - break; - case IT_READDIR: - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - break; - case IT_LAYOUT: - policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT; - break; - default: - policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP; - break; - } - - mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid, - LDLM_IBITS, &policy, - LCK_CR | LCK_CW | LCK_PR | LCK_PW, - &lockh); - } - - if (mode) { - it->it_lock_handle = lockh.cookie; - it->it_lock_mode = mode; - } else { - it->it_lock_handle = 0; - it->it_lock_mode = 0; - } - - return !!mode; -} - -/* - * This long block is all about fixing up the lock and request state - * so that it is correct as of the moment _before_ the operation was - * applied; that way, the VFS will think that everything is normal and - * call Lustre's regular VFS methods. - * - * If we're performing a creation, that means that unless the creation - * failed with EEXIST, we should fake up a negative dentry. - * - * For everything else, we want the lookup to succeed. - * - * One additional note: if CREATE or OPEN succeeded, we add an extra - * reference to the request because we need to keep it around until - * ll_create/ll_open gets called. - * - * The server will return to us, in it_disposition, an indication of - * exactly what it_status refers to. - * - * If DISP_OPEN_OPEN is set, then it_status refers to the open() call, - * otherwise if DISP_OPEN_CREATE is set, then it_status is the - * creation failure mode. In either case, one of DISP_LOOKUP_NEG or - * DISP_LOOKUP_POS will be set, indicating whether the child lookup - * was successful. - * - * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the - * child lookup. - */ -int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, - struct lookup_intent *it, struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags) -{ - struct ldlm_enqueue_info einfo = { - .ei_type = LDLM_IBITS, - .ei_mode = it_to_lock_mode(it), - .ei_cb_bl = cb_blocking, - .ei_cb_cp = ldlm_completion_ast, - }; - struct lustre_handle lockh; - int rc = 0; - - LASSERT(it); - - CDEBUG(D_DLMTRACE, "(name: %.*s," DFID ") in obj " DFID - ", intent: %s flags %#Lo\n", (int)op_data->op_namelen, - op_data->op_name, PFID(&op_data->op_fid2), - PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), - it->it_flags); - - lockh.cookie = 0; - if (fid_is_sane(&op_data->op_fid2) && - (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) { - /* We could just return 1 immediately, but since we should only - * be called in revalidate_it if we already have a lock, let's - * verify that. - */ - it->it_lock_handle = 0; - rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL); - /* Only return failure if it was not GETATTR by cfid - * (from inode_revalidate) - */ - if (rc || op_data->op_namelen != 0) - return rc; - } - - /* For case if upper layer did not alloc fid, do it now. */ - if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) { - rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); - if (rc < 0) { - CERROR("Can't alloc new fid, rc %d\n", rc); - return rc; - } - } - rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh, - extra_lock_flags); - if (rc < 0) - return rc; - - *reqp = it->it_request; - rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh); - return rc; -} - -static int mdc_intent_getattr_async_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - void *args, int rc) -{ - struct mdc_getattr_args *ga = args; - struct obd_export *exp = ga->ga_exp; - struct md_enqueue_info *minfo = ga->ga_minfo; - struct ldlm_enqueue_info *einfo = &minfo->mi_einfo; - struct lookup_intent *it; - struct lustre_handle *lockh; - struct obd_device *obddev; - struct ldlm_reply *lockrep; - __u64 flags = LDLM_FL_HAS_INTENT; - - it = &minfo->mi_it; - lockh = &minfo->mi_lockh; - - obddev = class_exp2obd(exp); - - obd_put_request_slot(&obddev->u.cli); - if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE)) - rc = -ETIMEDOUT; - - rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode, - &flags, NULL, 0, lockh, rc); - if (rc < 0) { - CERROR("ldlm_cli_enqueue_fini: %d\n", rc); - mdc_clear_replay_flag(req, rc); - goto out; - } - - lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); - - lockrep->lock_policy_res2 = - ptlrpc_status_ntoh(lockrep->lock_policy_res2); - - rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); - if (rc) - goto out; - - rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh); - -out: - minfo->mi_cb(req, minfo, rc); - return 0; -} - -int mdc_intent_getattr_async(struct obd_export *exp, - struct md_enqueue_info *minfo) -{ - struct md_op_data *op_data = &minfo->mi_data; - struct lookup_intent *it = &minfo->mi_it; - struct ptlrpc_request *req; - struct mdc_getattr_args *ga; - struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_res_id res_id; - union ldlm_policy_data policy = { - .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE } - }; - int rc = 0; - __u64 flags = LDLM_FL_HAS_INTENT; - - CDEBUG(D_DLMTRACE, - "name: %.*s in inode " DFID ", intent: %s flags %#Lo\n", - (int)op_data->op_namelen, op_data->op_name, - PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags); - - fid_build_reg_res_name(&op_data->op_fid1, &res_id); - req = mdc_intent_getattr_pack(exp, it, op_data); - if (IS_ERR(req)) - return PTR_ERR(req); - - rc = obd_get_request_slot(&obddev->u.cli); - if (rc != 0) { - ptlrpc_req_finished(req); - return rc; - } - - rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy, - &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1); - if (rc < 0) { - obd_put_request_slot(&obddev->u.cli); - ptlrpc_req_finished(req); - return rc; - } - - BUILD_BUG_ON(sizeof(*ga) > sizeof(req->rq_async_args)); - ga = ptlrpc_req_async_args(req); - ga->ga_exp = exp; - ga->ga_minfo = minfo; - - req->rq_interpret_reply = mdc_intent_getattr_async_interpret; - ptlrpcd_add_req(req); - - return 0; -} |