aboutsummaryrefslogtreecommitdiffstats
path: root/drivers/staging/lustre/lustre/osc/osc_io.c
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/staging/lustre/lustre/osc/osc_io.c')
-rw-r--r--drivers/staging/lustre/lustre/osc/osc_io.c330
1 files changed, 179 insertions, 151 deletions
diff --git a/drivers/staging/lustre/lustre/osc/osc_io.c b/drivers/staging/lustre/lustre/osc/osc_io.c
index 8a559cbcdd0c..228a97c098fe 100644
--- a/drivers/staging/lustre/lustre/osc/osc_io.c
+++ b/drivers/staging/lustre/lustre/osc/osc_io.c
@@ -49,12 +49,6 @@
*
*/
-static struct osc_req *cl2osc_req(const struct cl_req_slice *slice)
-{
- LINVRNT(slice->crs_dev->cd_lu_dev.ld_type == &osc_device_type);
- return container_of0(slice, struct osc_req, or_cl);
-}
-
static struct osc_io *cl2osc_io(const struct lu_env *env,
const struct cl_io_slice *slice)
{
@@ -64,20 +58,6 @@ static struct osc_io *cl2osc_io(const struct lu_env *env,
return oio;
}
-static struct osc_page *osc_cl_page_osc(struct cl_page *page,
- struct osc_object *osc)
-{
- const struct cl_page_slice *slice;
-
- if (osc)
- slice = cl_object_page_slice(&osc->oo_cl, page);
- else
- slice = cl_page_at(page, &osc_device_type);
- LASSERT(slice);
-
- return cl2osc_page(slice);
-}
-
/*****************************************************************************
*
* io operations.
@@ -88,6 +68,45 @@ static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io)
{
}
+static void osc_read_ahead_release(const struct lu_env *env, void *cbdata)
+{
+ struct ldlm_lock *dlmlock = cbdata;
+ struct lustre_handle lockh;
+
+ ldlm_lock2handle(dlmlock, &lockh);
+ ldlm_lock_decref(&lockh, LCK_PR);
+ LDLM_LOCK_PUT(dlmlock);
+}
+
+static int osc_io_read_ahead(const struct lu_env *env,
+ const struct cl_io_slice *ios,
+ pgoff_t start, struct cl_read_ahead *ra)
+{
+ struct osc_object *osc = cl2osc(ios->cis_obj);
+ struct ldlm_lock *dlmlock;
+ int result = -ENODATA;
+
+ dlmlock = osc_dlmlock_at_pgoff(env, osc, start, 0);
+ if (dlmlock) {
+ LASSERT(dlmlock->l_ast_data == osc);
+ if (dlmlock->l_req_mode != LCK_PR) {
+ struct lustre_handle lockh;
+
+ ldlm_lock2handle(dlmlock, &lockh);
+ ldlm_lock_addref(&lockh, LCK_PR);
+ ldlm_lock_decref(&lockh, dlmlock->l_req_mode);
+ }
+
+ ra->cra_end = cl_index(osc2cl(osc),
+ dlmlock->l_policy_data.l_extent.end);
+ ra->cra_release = osc_read_ahead_release;
+ ra->cra_cbdata = dlmlock;
+ result = 0;
+ }
+
+ return result;
+}
+
/**
* An implementation of cl_io_operations::cio_io_submit() method for osc
* layer. Iterates over pages in the in-queue, prepares each for io by calling
@@ -334,7 +353,7 @@ static int osc_io_rw_iter_init(const struct lu_env *env,
npages = max_pages;
c = atomic_long_read(cli->cl_lru_left);
- if (c < npages && osc_lru_reclaim(cli) > 0)
+ if (c < npages && osc_lru_reclaim(cli, npages) > 0)
c = atomic_long_read(cli->cl_lru_left);
while (c >= npages) {
if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
@@ -343,6 +362,17 @@ static int osc_io_rw_iter_init(const struct lu_env *env,
}
c = atomic_long_read(cli->cl_lru_left);
}
+ if (atomic_long_read(cli->cl_lru_left) < max_pages) {
+ /*
+ * If there aren't enough pages in the per-OSC LRU then
+ * wake up the LRU thread to try and clear out space, so
+ * we don't block if pages are being dirtied quickly.
+ */
+ CDEBUG(D_CACHE, "%s: queue LRU, left: %lu/%ld.\n",
+ cli_name(cli), atomic_long_read(cli->cl_lru_left),
+ max_pages);
+ (void)ptlrpcd_queue_work(cli->cl_lru_work);
+ }
return 0;
}
@@ -446,7 +476,6 @@ static int osc_io_setattr_start(const struct lu_env *env,
__u64 size = io->u.ci_setattr.sa_attr.lvb_size;
unsigned int ia_valid = io->u.ci_setattr.sa_valid;
int result = 0;
- struct obd_info oinfo = { };
/* truncate cache dirty pages first */
if (cl_io_is_trunc(io))
@@ -486,11 +515,19 @@ static int osc_io_setattr_start(const struct lu_env *env,
oa->o_oi = loi->loi_oi;
obdo_set_parent_fid(oa, io->u.ci_setattr.sa_parent_fid);
oa->o_stripe_idx = io->u.ci_setattr.sa_stripe_index;
- oa->o_mtime = attr->cat_mtime;
- oa->o_atime = attr->cat_atime;
- oa->o_ctime = attr->cat_ctime;
- oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME |
- OBD_MD_FLCTIME | OBD_MD_FLMTIME;
+ oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
+ if (ia_valid & ATTR_CTIME) {
+ oa->o_valid |= OBD_MD_FLCTIME;
+ oa->o_ctime = attr->cat_ctime;
+ }
+ if (ia_valid & ATTR_ATIME) {
+ oa->o_valid |= OBD_MD_FLATIME;
+ oa->o_atime = attr->cat_atime;
+ }
+ if (ia_valid & ATTR_MTIME) {
+ oa->o_valid |= OBD_MD_FLMTIME;
+ oa->o_mtime = attr->cat_mtime;
+ }
if (ia_valid & ATTR_SIZE) {
oa->o_size = size;
oa->o_blocks = OBD_OBJECT_EOF;
@@ -503,19 +540,21 @@ static int osc_io_setattr_start(const struct lu_env *env,
} else {
LASSERT(oio->oi_lockless == 0);
}
+ if (ia_valid & ATTR_ATTR_FLAG) {
+ oa->o_flags = io->u.ci_setattr.sa_attr_flags;
+ oa->o_valid |= OBD_MD_FLFLAGS;
+ }
- oinfo.oi_oa = oa;
init_completion(&cbargs->opc_sync);
if (ia_valid & ATTR_SIZE)
result = osc_punch_base(osc_export(cl2osc(obj)),
- &oinfo, osc_async_upcall,
+ oa, osc_async_upcall,
cbargs, PTLRPCD_SET);
else
- result = osc_setattr_async_base(osc_export(cl2osc(obj)),
- &oinfo, NULL,
- osc_async_upcall,
- cbargs, PTLRPCD_SET);
+ result = osc_setattr_async(osc_export(cl2osc(obj)),
+ oa, osc_async_upcall,
+ cbargs, PTLRPCD_SET);
cbargs->opc_rpc_sent = result == 0;
}
return result;
@@ -557,6 +596,107 @@ static void osc_io_setattr_end(const struct lu_env *env,
}
}
+struct osc_data_version_args {
+ struct osc_io *dva_oio;
+};
+
+static int
+osc_data_version_interpret(const struct lu_env *env, struct ptlrpc_request *req,
+ void *arg, int rc)
+{
+ struct osc_data_version_args *dva = arg;
+ struct osc_io *oio = dva->dva_oio;
+ const struct ost_body *body;
+
+ if (rc < 0)
+ goto out;
+
+ body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+ if (!body) {
+ rc = -EPROTO;
+ goto out;
+ }
+
+ lustre_get_wire_obdo(&req->rq_import->imp_connect_data, &oio->oi_oa,
+ &body->oa);
+out:
+ oio->oi_cbarg.opc_rc = rc;
+ complete(&oio->oi_cbarg.opc_sync);
+
+ return 0;
+}
+
+static int osc_io_data_version_start(const struct lu_env *env,
+ const struct cl_io_slice *slice)
+{
+ struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version;
+ struct osc_io *oio = cl2osc_io(env, slice);
+ struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+ struct osc_object *obj = cl2osc(slice->cis_obj);
+ struct obd_export *exp = osc_export(obj);
+ struct lov_oinfo *loi = obj->oo_oinfo;
+ struct osc_data_version_args *dva;
+ struct obdo *oa = &oio->oi_oa;
+ struct ptlrpc_request *req;
+ struct ost_body *body;
+ int rc;
+
+ memset(oa, 0, sizeof(*oa));
+ oa->o_oi = loi->loi_oi;
+ oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+ if (dv->dv_flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) {
+ oa->o_valid |= OBD_MD_FLFLAGS;
+ oa->o_flags |= OBD_FL_SRVLOCK;
+ if (dv->dv_flags & LL_DV_WR_FLUSH)
+ oa->o_flags |= OBD_FL_FLUSH;
+ }
+
+ init_completion(&cbargs->opc_sync);
+
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
+ if (!req)
+ return -ENOMEM;
+
+ rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
+ if (rc < 0) {
+ ptlrpc_request_free(req);
+ return rc;
+ }
+
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
+
+ ptlrpc_request_set_replen(req);
+ req->rq_interpret_reply = osc_data_version_interpret;
+ CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args));
+ dva = ptlrpc_req_async_args(req);
+ dva->dva_oio = oio;
+
+ ptlrpcd_add_req(req);
+
+ return 0;
+}
+
+static void osc_io_data_version_end(const struct lu_env *env,
+ const struct cl_io_slice *slice)
+{
+ struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version;
+ struct osc_io *oio = cl2osc_io(env, slice);
+ struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+
+ wait_for_completion(&cbargs->opc_sync);
+
+ if (cbargs->opc_rc) {
+ slice->cis_io->ci_result = cbargs->opc_rc;
+ } else if (!(oio->oi_oa.o_valid & OBD_MD_FLDATAVERSION)) {
+ slice->cis_io->ci_result = -EOPNOTSUPP;
+ } else {
+ dv->dv_data_version = oio->oi_oa.o_data_version;
+ slice->cis_io->ci_result = 0;
+ }
+}
+
static int osc_io_read_start(const struct lu_env *env,
const struct cl_io_slice *slice)
{
@@ -595,7 +735,6 @@ static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
{
struct osc_io *oio = osc_env_io(env);
struct obdo *oa = &oio->oi_oa;
- struct obd_info *oinfo = &oio->oi_info;
struct lov_oinfo *loi = obj->oo_oinfo;
struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
int rc = 0;
@@ -611,12 +750,9 @@ static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
obdo_set_parent_fid(oa, fio->fi_fid);
- memset(oinfo, 0, sizeof(*oinfo));
- oinfo->oi_oa = oa;
init_completion(&cbargs->opc_sync);
- rc = osc_sync_base(osc_export(obj), oinfo, osc_async_upcall, cbargs,
- PTLRPCD_SET);
+ rc = osc_sync_base(obj, oa, osc_async_upcall, cbargs, PTLRPCD_SET);
return rc;
}
@@ -710,6 +846,10 @@ static const struct cl_io_operations osc_io_ops = {
.cio_start = osc_io_setattr_start,
.cio_end = osc_io_setattr_end
},
+ [CIT_DATA_VERSION] = {
+ .cio_start = osc_io_data_version_start,
+ .cio_end = osc_io_data_version_end,
+ },
[CIT_FAULT] = {
.cio_start = osc_io_fault_start,
.cio_end = osc_io_end,
@@ -724,6 +864,7 @@ static const struct cl_io_operations osc_io_ops = {
.cio_fini = osc_io_fini
}
},
+ .cio_read_ahead = osc_io_read_ahead,
.cio_submit = osc_io_submit,
.cio_commit_async = osc_io_commit_async
};
@@ -734,103 +875,6 @@ static const struct cl_io_operations osc_io_ops = {
*
*/
-static int osc_req_prep(const struct lu_env *env,
- const struct cl_req_slice *slice)
-{
- return 0;
-}
-
-static void osc_req_completion(const struct lu_env *env,
- const struct cl_req_slice *slice, int ioret)
-{
- struct osc_req *or;
-
- or = cl2osc_req(slice);
- kmem_cache_free(osc_req_kmem, or);
-}
-
-/**
- * Implementation of struct cl_req_operations::cro_attr_set() for osc
- * layer. osc is responsible for struct obdo::o_id and struct obdo::o_seq
- * fields.
- */
-static void osc_req_attr_set(const struct lu_env *env,
- const struct cl_req_slice *slice,
- const struct cl_object *obj,
- struct cl_req_attr *attr, u64 flags)
-{
- struct lov_oinfo *oinfo;
- struct cl_req *clerq;
- struct cl_page *apage; /* _some_ page in @clerq */
- struct ldlm_lock *lock; /* _some_ lock protecting @apage */
- struct osc_page *opg;
- struct obdo *oa;
- struct ost_lvb *lvb;
-
- oinfo = cl2osc(obj)->oo_oinfo;
- lvb = &oinfo->loi_lvb;
- oa = attr->cra_oa;
-
- if ((flags & OBD_MD_FLMTIME) != 0) {
- oa->o_mtime = lvb->lvb_mtime;
- oa->o_valid |= OBD_MD_FLMTIME;
- }
- if ((flags & OBD_MD_FLATIME) != 0) {
- oa->o_atime = lvb->lvb_atime;
- oa->o_valid |= OBD_MD_FLATIME;
- }
- if ((flags & OBD_MD_FLCTIME) != 0) {
- oa->o_ctime = lvb->lvb_ctime;
- oa->o_valid |= OBD_MD_FLCTIME;
- }
- if (flags & OBD_MD_FLGROUP) {
- ostid_set_seq(&oa->o_oi, ostid_seq(&oinfo->loi_oi));
- oa->o_valid |= OBD_MD_FLGROUP;
- }
- if (flags & OBD_MD_FLID) {
- ostid_set_id(&oa->o_oi, ostid_id(&oinfo->loi_oi));
- oa->o_valid |= OBD_MD_FLID;
- }
- if (flags & OBD_MD_FLHANDLE) {
- clerq = slice->crs_req;
- LASSERT(!list_empty(&clerq->crq_pages));
- apage = container_of(clerq->crq_pages.next,
- struct cl_page, cp_flight);
- opg = osc_cl_page_osc(apage, NULL);
- lock = osc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg),
- 1, 1);
- if (!lock && !opg->ops_srvlock) {
- struct ldlm_resource *res;
- struct ldlm_res_id *resname;
-
- CL_PAGE_DEBUG(D_ERROR, env, apage, "uncovered page!\n");
-
- resname = &osc_env_info(env)->oti_resname;
- ostid_build_res_name(&oinfo->loi_oi, resname);
- res = ldlm_resource_get(
- osc_export(cl2osc(obj))->exp_obd->obd_namespace,
- NULL, resname, LDLM_EXTENT, 0);
- ldlm_resource_dump(D_ERROR, res);
-
- dump_stack();
- LBUG();
- }
-
- /* check for lockless io. */
- if (lock) {
- oa->o_handle = lock->l_remote_handle;
- oa->o_valid |= OBD_MD_FLHANDLE;
- LDLM_LOCK_PUT(lock);
- }
- }
-}
-
-static const struct cl_req_operations osc_req_ops = {
- .cro_prep = osc_req_prep,
- .cro_attr_set = osc_req_attr_set,
- .cro_completion = osc_req_completion
-};
-
int osc_io_init(const struct lu_env *env,
struct cl_object *obj, struct cl_io *io)
{
@@ -841,20 +885,4 @@ int osc_io_init(const struct lu_env *env,
return 0;
}
-int osc_req_init(const struct lu_env *env, struct cl_device *dev,
- struct cl_req *req)
-{
- struct osc_req *or;
- int result;
-
- or = kmem_cache_zalloc(osc_req_kmem, GFP_NOFS);
- if (or) {
- cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops);
- result = 0;
- } else {
- result = -ENOMEM;
- }
- return result;
-}
-
/** @} osc */