diff options
Diffstat (limited to 'drivers/staging/lustre/lustre/osc/osc_io.c')
-rw-r--r-- | drivers/staging/lustre/lustre/osc/osc_io.c | 330 |
1 files changed, 179 insertions, 151 deletions
diff --git a/drivers/staging/lustre/lustre/osc/osc_io.c b/drivers/staging/lustre/lustre/osc/osc_io.c index 8a559cbcdd0c..228a97c098fe 100644 --- a/drivers/staging/lustre/lustre/osc/osc_io.c +++ b/drivers/staging/lustre/lustre/osc/osc_io.c @@ -49,12 +49,6 @@ * */ -static struct osc_req *cl2osc_req(const struct cl_req_slice *slice) -{ - LINVRNT(slice->crs_dev->cd_lu_dev.ld_type == &osc_device_type); - return container_of0(slice, struct osc_req, or_cl); -} - static struct osc_io *cl2osc_io(const struct lu_env *env, const struct cl_io_slice *slice) { @@ -64,20 +58,6 @@ static struct osc_io *cl2osc_io(const struct lu_env *env, return oio; } -static struct osc_page *osc_cl_page_osc(struct cl_page *page, - struct osc_object *osc) -{ - const struct cl_page_slice *slice; - - if (osc) - slice = cl_object_page_slice(&osc->oo_cl, page); - else - slice = cl_page_at(page, &osc_device_type); - LASSERT(slice); - - return cl2osc_page(slice); -} - /***************************************************************************** * * io operations. @@ -88,6 +68,45 @@ static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io) { } +static void osc_read_ahead_release(const struct lu_env *env, void *cbdata) +{ + struct ldlm_lock *dlmlock = cbdata; + struct lustre_handle lockh; + + ldlm_lock2handle(dlmlock, &lockh); + ldlm_lock_decref(&lockh, LCK_PR); + LDLM_LOCK_PUT(dlmlock); +} + +static int osc_io_read_ahead(const struct lu_env *env, + const struct cl_io_slice *ios, + pgoff_t start, struct cl_read_ahead *ra) +{ + struct osc_object *osc = cl2osc(ios->cis_obj); + struct ldlm_lock *dlmlock; + int result = -ENODATA; + + dlmlock = osc_dlmlock_at_pgoff(env, osc, start, 0); + if (dlmlock) { + LASSERT(dlmlock->l_ast_data == osc); + if (dlmlock->l_req_mode != LCK_PR) { + struct lustre_handle lockh; + + ldlm_lock2handle(dlmlock, &lockh); + ldlm_lock_addref(&lockh, LCK_PR); + ldlm_lock_decref(&lockh, dlmlock->l_req_mode); + } + + ra->cra_end = cl_index(osc2cl(osc), + dlmlock->l_policy_data.l_extent.end); + ra->cra_release = osc_read_ahead_release; + ra->cra_cbdata = dlmlock; + result = 0; + } + + return result; +} + /** * An implementation of cl_io_operations::cio_io_submit() method for osc * layer. Iterates over pages in the in-queue, prepares each for io by calling @@ -334,7 +353,7 @@ static int osc_io_rw_iter_init(const struct lu_env *env, npages = max_pages; c = atomic_long_read(cli->cl_lru_left); - if (c < npages && osc_lru_reclaim(cli) > 0) + if (c < npages && osc_lru_reclaim(cli, npages) > 0) c = atomic_long_read(cli->cl_lru_left); while (c >= npages) { if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) { @@ -343,6 +362,17 @@ static int osc_io_rw_iter_init(const struct lu_env *env, } c = atomic_long_read(cli->cl_lru_left); } + if (atomic_long_read(cli->cl_lru_left) < max_pages) { + /* + * If there aren't enough pages in the per-OSC LRU then + * wake up the LRU thread to try and clear out space, so + * we don't block if pages are being dirtied quickly. + */ + CDEBUG(D_CACHE, "%s: queue LRU, left: %lu/%ld.\n", + cli_name(cli), atomic_long_read(cli->cl_lru_left), + max_pages); + (void)ptlrpcd_queue_work(cli->cl_lru_work); + } return 0; } @@ -446,7 +476,6 @@ static int osc_io_setattr_start(const struct lu_env *env, __u64 size = io->u.ci_setattr.sa_attr.lvb_size; unsigned int ia_valid = io->u.ci_setattr.sa_valid; int result = 0; - struct obd_info oinfo = { }; /* truncate cache dirty pages first */ if (cl_io_is_trunc(io)) @@ -486,11 +515,19 @@ static int osc_io_setattr_start(const struct lu_env *env, oa->o_oi = loi->loi_oi; obdo_set_parent_fid(oa, io->u.ci_setattr.sa_parent_fid); oa->o_stripe_idx = io->u.ci_setattr.sa_stripe_index; - oa->o_mtime = attr->cat_mtime; - oa->o_atime = attr->cat_atime; - oa->o_ctime = attr->cat_ctime; - oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME | - OBD_MD_FLCTIME | OBD_MD_FLMTIME; + oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP; + if (ia_valid & ATTR_CTIME) { + oa->o_valid |= OBD_MD_FLCTIME; + oa->o_ctime = attr->cat_ctime; + } + if (ia_valid & ATTR_ATIME) { + oa->o_valid |= OBD_MD_FLATIME; + oa->o_atime = attr->cat_atime; + } + if (ia_valid & ATTR_MTIME) { + oa->o_valid |= OBD_MD_FLMTIME; + oa->o_mtime = attr->cat_mtime; + } if (ia_valid & ATTR_SIZE) { oa->o_size = size; oa->o_blocks = OBD_OBJECT_EOF; @@ -503,19 +540,21 @@ static int osc_io_setattr_start(const struct lu_env *env, } else { LASSERT(oio->oi_lockless == 0); } + if (ia_valid & ATTR_ATTR_FLAG) { + oa->o_flags = io->u.ci_setattr.sa_attr_flags; + oa->o_valid |= OBD_MD_FLFLAGS; + } - oinfo.oi_oa = oa; init_completion(&cbargs->opc_sync); if (ia_valid & ATTR_SIZE) result = osc_punch_base(osc_export(cl2osc(obj)), - &oinfo, osc_async_upcall, + oa, osc_async_upcall, cbargs, PTLRPCD_SET); else - result = osc_setattr_async_base(osc_export(cl2osc(obj)), - &oinfo, NULL, - osc_async_upcall, - cbargs, PTLRPCD_SET); + result = osc_setattr_async(osc_export(cl2osc(obj)), + oa, osc_async_upcall, + cbargs, PTLRPCD_SET); cbargs->opc_rpc_sent = result == 0; } return result; @@ -557,6 +596,107 @@ static void osc_io_setattr_end(const struct lu_env *env, } } +struct osc_data_version_args { + struct osc_io *dva_oio; +}; + +static int +osc_data_version_interpret(const struct lu_env *env, struct ptlrpc_request *req, + void *arg, int rc) +{ + struct osc_data_version_args *dva = arg; + struct osc_io *oio = dva->dva_oio; + const struct ost_body *body; + + if (rc < 0) + goto out; + + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (!body) { + rc = -EPROTO; + goto out; + } + + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, &oio->oi_oa, + &body->oa); +out: + oio->oi_cbarg.opc_rc = rc; + complete(&oio->oi_cbarg.opc_sync); + + return 0; +} + +static int osc_io_data_version_start(const struct lu_env *env, + const struct cl_io_slice *slice) +{ + struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version; + struct osc_io *oio = cl2osc_io(env, slice); + struct osc_async_cbargs *cbargs = &oio->oi_cbarg; + struct osc_object *obj = cl2osc(slice->cis_obj); + struct obd_export *exp = osc_export(obj); + struct lov_oinfo *loi = obj->oo_oinfo; + struct osc_data_version_args *dva; + struct obdo *oa = &oio->oi_oa; + struct ptlrpc_request *req; + struct ost_body *body; + int rc; + + memset(oa, 0, sizeof(*oa)); + oa->o_oi = loi->loi_oi; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + + if (dv->dv_flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) { + oa->o_valid |= OBD_MD_FLFLAGS; + oa->o_flags |= OBD_FL_SRVLOCK; + if (dv->dv_flags & LL_DV_WR_FLUSH) + oa->o_flags |= OBD_FL_FLUSH; + } + + init_completion(&cbargs->opc_sync); + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); + if (!req) + return -ENOMEM; + + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); + if (rc < 0) { + ptlrpc_request_free(req); + return rc; + } + + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); + + ptlrpc_request_set_replen(req); + req->rq_interpret_reply = osc_data_version_interpret; + CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args)); + dva = ptlrpc_req_async_args(req); + dva->dva_oio = oio; + + ptlrpcd_add_req(req); + + return 0; +} + +static void osc_io_data_version_end(const struct lu_env *env, + const struct cl_io_slice *slice) +{ + struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version; + struct osc_io *oio = cl2osc_io(env, slice); + struct osc_async_cbargs *cbargs = &oio->oi_cbarg; + + wait_for_completion(&cbargs->opc_sync); + + if (cbargs->opc_rc) { + slice->cis_io->ci_result = cbargs->opc_rc; + } else if (!(oio->oi_oa.o_valid & OBD_MD_FLDATAVERSION)) { + slice->cis_io->ci_result = -EOPNOTSUPP; + } else { + dv->dv_data_version = oio->oi_oa.o_data_version; + slice->cis_io->ci_result = 0; + } +} + static int osc_io_read_start(const struct lu_env *env, const struct cl_io_slice *slice) { @@ -595,7 +735,6 @@ static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj, { struct osc_io *oio = osc_env_io(env); struct obdo *oa = &oio->oi_oa; - struct obd_info *oinfo = &oio->oi_info; struct lov_oinfo *loi = obj->oo_oinfo; struct osc_async_cbargs *cbargs = &oio->oi_cbarg; int rc = 0; @@ -611,12 +750,9 @@ static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj, obdo_set_parent_fid(oa, fio->fi_fid); - memset(oinfo, 0, sizeof(*oinfo)); - oinfo->oi_oa = oa; init_completion(&cbargs->opc_sync); - rc = osc_sync_base(osc_export(obj), oinfo, osc_async_upcall, cbargs, - PTLRPCD_SET); + rc = osc_sync_base(obj, oa, osc_async_upcall, cbargs, PTLRPCD_SET); return rc; } @@ -710,6 +846,10 @@ static const struct cl_io_operations osc_io_ops = { .cio_start = osc_io_setattr_start, .cio_end = osc_io_setattr_end }, + [CIT_DATA_VERSION] = { + .cio_start = osc_io_data_version_start, + .cio_end = osc_io_data_version_end, + }, [CIT_FAULT] = { .cio_start = osc_io_fault_start, .cio_end = osc_io_end, @@ -724,6 +864,7 @@ static const struct cl_io_operations osc_io_ops = { .cio_fini = osc_io_fini } }, + .cio_read_ahead = osc_io_read_ahead, .cio_submit = osc_io_submit, .cio_commit_async = osc_io_commit_async }; @@ -734,103 +875,6 @@ static const struct cl_io_operations osc_io_ops = { * */ -static int osc_req_prep(const struct lu_env *env, - const struct cl_req_slice *slice) -{ - return 0; -} - -static void osc_req_completion(const struct lu_env *env, - const struct cl_req_slice *slice, int ioret) -{ - struct osc_req *or; - - or = cl2osc_req(slice); - kmem_cache_free(osc_req_kmem, or); -} - -/** - * Implementation of struct cl_req_operations::cro_attr_set() for osc - * layer. osc is responsible for struct obdo::o_id and struct obdo::o_seq - * fields. - */ -static void osc_req_attr_set(const struct lu_env *env, - const struct cl_req_slice *slice, - const struct cl_object *obj, - struct cl_req_attr *attr, u64 flags) -{ - struct lov_oinfo *oinfo; - struct cl_req *clerq; - struct cl_page *apage; /* _some_ page in @clerq */ - struct ldlm_lock *lock; /* _some_ lock protecting @apage */ - struct osc_page *opg; - struct obdo *oa; - struct ost_lvb *lvb; - - oinfo = cl2osc(obj)->oo_oinfo; - lvb = &oinfo->loi_lvb; - oa = attr->cra_oa; - - if ((flags & OBD_MD_FLMTIME) != 0) { - oa->o_mtime = lvb->lvb_mtime; - oa->o_valid |= OBD_MD_FLMTIME; - } - if ((flags & OBD_MD_FLATIME) != 0) { - oa->o_atime = lvb->lvb_atime; - oa->o_valid |= OBD_MD_FLATIME; - } - if ((flags & OBD_MD_FLCTIME) != 0) { - oa->o_ctime = lvb->lvb_ctime; - oa->o_valid |= OBD_MD_FLCTIME; - } - if (flags & OBD_MD_FLGROUP) { - ostid_set_seq(&oa->o_oi, ostid_seq(&oinfo->loi_oi)); - oa->o_valid |= OBD_MD_FLGROUP; - } - if (flags & OBD_MD_FLID) { - ostid_set_id(&oa->o_oi, ostid_id(&oinfo->loi_oi)); - oa->o_valid |= OBD_MD_FLID; - } - if (flags & OBD_MD_FLHANDLE) { - clerq = slice->crs_req; - LASSERT(!list_empty(&clerq->crq_pages)); - apage = container_of(clerq->crq_pages.next, - struct cl_page, cp_flight); - opg = osc_cl_page_osc(apage, NULL); - lock = osc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg), - 1, 1); - if (!lock && !opg->ops_srvlock) { - struct ldlm_resource *res; - struct ldlm_res_id *resname; - - CL_PAGE_DEBUG(D_ERROR, env, apage, "uncovered page!\n"); - - resname = &osc_env_info(env)->oti_resname; - ostid_build_res_name(&oinfo->loi_oi, resname); - res = ldlm_resource_get( - osc_export(cl2osc(obj))->exp_obd->obd_namespace, - NULL, resname, LDLM_EXTENT, 0); - ldlm_resource_dump(D_ERROR, res); - - dump_stack(); - LBUG(); - } - - /* check for lockless io. */ - if (lock) { - oa->o_handle = lock->l_remote_handle; - oa->o_valid |= OBD_MD_FLHANDLE; - LDLM_LOCK_PUT(lock); - } - } -} - -static const struct cl_req_operations osc_req_ops = { - .cro_prep = osc_req_prep, - .cro_attr_set = osc_req_attr_set, - .cro_completion = osc_req_completion -}; - int osc_io_init(const struct lu_env *env, struct cl_object *obj, struct cl_io *io) { @@ -841,20 +885,4 @@ int osc_io_init(const struct lu_env *env, return 0; } -int osc_req_init(const struct lu_env *env, struct cl_device *dev, - struct cl_req *req) -{ - struct osc_req *or; - int result; - - or = kmem_cache_zalloc(osc_req_kmem, GFP_NOFS); - if (or) { - cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops); - result = 0; - } else { - result = -ENOMEM; - } - return result; -} - /** @} osc */ |