aboutsummaryrefslogtreecommitdiffstats
path: root/drivers/staging/lustre/lustre/osc/osc_io.c
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/staging/lustre/lustre/osc/osc_io.c')
-rw-r--r--drivers/staging/lustre/lustre/osc/osc_io.c283
1 files changed, 160 insertions, 123 deletions
diff --git a/drivers/staging/lustre/lustre/osc/osc_io.c b/drivers/staging/lustre/lustre/osc/osc_io.c
index 6bd0a45d8b06..d534b0e0edf6 100644
--- a/drivers/staging/lustre/lustre/osc/osc_io.c
+++ b/drivers/staging/lustre/lustre/osc/osc_io.c
@@ -68,11 +68,15 @@ static struct osc_io *cl2osc_io(const struct lu_env *env,
return oio;
}
-static struct osc_page *osc_cl_page_osc(struct cl_page *page)
+static struct osc_page *osc_cl_page_osc(struct cl_page *page,
+ struct osc_object *osc)
{
const struct cl_page_slice *slice;
- slice = cl_page_at(page, &osc_device_type);
+ if (osc)
+ slice = cl_object_page_slice(&osc->oo_cl, page);
+ else
+ slice = cl_page_at(page, &osc_device_type);
LASSERT(slice);
return cl2osc_page(slice);
@@ -137,7 +141,7 @@ static int osc_io_submit(const struct lu_env *env,
io = page->cp_owner;
LASSERT(io);
- opg = osc_cl_page_osc(page);
+ opg = osc_cl_page_osc(page, osc);
oap = &opg->ops_oap;
LASSERT(osc == oap->oap_obj);
@@ -164,8 +168,10 @@ static int osc_io_submit(const struct lu_env *env,
}
cl_page_list_move(qout, qin, page);
+ spin_lock(&oap->oap_lock);
oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY;
oap->oap_async_flags |= ASYNC_COUNT_STABLE;
+ spin_unlock(&oap->oap_lock);
osc_page_submit(env, opg, crt, brw_flags);
list_add_tail(&oap->oap_pending_item, &list);
@@ -185,6 +191,13 @@ static int osc_io_submit(const struct lu_env *env,
return qout->pl_nr > 0 ? 0 : result;
}
+/**
+ * This is called when a page is accessed within file in a way that creates
+ * new page, if one were missing (i.e., if there were a hole at that place in
+ * the file, or accessed page is beyond the current file size).
+ *
+ * Expand stripe KMS if necessary.
+ */
static void osc_page_touch_at(const struct lu_env *env,
struct cl_object *obj, pgoff_t idx, unsigned to)
{
@@ -208,7 +221,8 @@ static void osc_page_touch_at(const struct lu_env *env,
kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms,
loi->loi_lvb.lvb_size);
- valid = 0;
+ attr->cat_mtime = attr->cat_ctime = LTIME_S(CURRENT_TIME);
+ valid = CAT_MTIME | CAT_CTIME;
if (kms > loi->loi_kms) {
attr->cat_kms = kms;
valid |= CAT_KMS;
@@ -221,91 +235,128 @@ static void osc_page_touch_at(const struct lu_env *env,
cl_object_attr_unlock(obj);
}
-/**
- * This is called when a page is accessed within file in a way that creates
- * new page, if one were missing (i.e., if there were a hole at that place in
- * the file, or accessed page is beyond the current file size). Examples:
- * ->commit_write() and ->nopage() methods.
- *
- * Expand stripe KMS if necessary.
- */
-static void osc_page_touch(const struct lu_env *env,
- struct osc_page *opage, unsigned to)
-{
- struct cl_page *page = opage->ops_cl.cpl_page;
- struct cl_object *obj = opage->ops_cl.cpl_obj;
-
- osc_page_touch_at(env, obj, page->cp_index, to);
-}
-
-/**
- * Implements cl_io_operations::cio_prepare_write() method for osc layer.
- *
- * \retval -EIO transfer initiated against this osc will most likely fail
- * \retval 0 transfer initiated against this osc will most likely succeed.
- *
- * The reason for this check is to immediately return an error to the caller
- * in the case of a deactivated import. Note, that import can be deactivated
- * later, while pages, dirtied by this IO, are still in the cache, but this is
- * irrelevant, because that would still return an error to the application (if
- * it does fsync), but many applications don't do fsync because of performance
- * issues, and we wanted to return an -EIO at write time to notify the
- * application.
- */
-static int osc_io_prepare_write(const struct lu_env *env,
- const struct cl_io_slice *ios,
- const struct cl_page_slice *slice,
- unsigned from, unsigned to)
+static int osc_io_commit_async(const struct lu_env *env,
+ const struct cl_io_slice *ios,
+ struct cl_page_list *qin, int from, int to,
+ cl_commit_cbt cb)
{
- struct osc_device *dev = lu2osc_dev(slice->cpl_obj->co_lu.lo_dev);
- struct obd_import *imp = class_exp2cliimp(dev->od_exp);
+ struct cl_io *io = ios->cis_io;
struct osc_io *oio = cl2osc_io(env, ios);
+ struct osc_object *osc = cl2osc(ios->cis_obj);
+ struct cl_page *page;
+ struct cl_page *last_page;
+ struct osc_page *opg;
int result = 0;
- /*
- * This implements OBD_BRW_CHECK logic from old client.
- */
+ LASSERT(qin->pl_nr > 0);
+
+ /* Handle partial page cases */
+ last_page = cl_page_list_last(qin);
+ if (oio->oi_lockless) {
+ page = cl_page_list_first(qin);
+ if (page == last_page) {
+ cl_page_clip(env, page, from, to);
+ } else {
+ if (from != 0)
+ cl_page_clip(env, page, from, PAGE_SIZE);
+ if (to != PAGE_SIZE)
+ cl_page_clip(env, last_page, 0, to);
+ }
+ }
+
+ while (qin->pl_nr > 0) {
+ struct osc_async_page *oap;
+
+ page = cl_page_list_first(qin);
+ opg = osc_cl_page_osc(page, osc);
+ oap = &opg->ops_oap;
+
+ if (!list_empty(&oap->oap_rpc_item)) {
+ CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
+ oap, opg);
+ result = -EBUSY;
+ break;
+ }
+
+ /* The page may be already in dirty cache. */
+ if (list_empty(&oap->oap_pending_item)) {
+ result = osc_page_cache_add(env, &opg->ops_cl, io);
+ if (result != 0)
+ break;
+ }
+
+ osc_page_touch_at(env, osc2cl(osc), osc_index(opg),
+ page == last_page ? to : PAGE_SIZE);
+
+ cl_page_list_del(env, qin, page);
- if (!imp || imp->imp_invalid)
- result = -EIO;
- if (result == 0 && oio->oi_lockless)
- /* this page contains `invalid' data, but who cares?
- * nobody can access the invalid data.
- * in osc_io_commit_write(), we're going to write exact
- * [from, to) bytes of this page to OST. -jay
+ (*cb)(env, io, page);
+ /* Can't access page any more. Page can be in transfer and
+ * complete at any time.
*/
- cl_page_export(env, slice->cpl_page, 1);
+ }
+ /* for sync write, kernel will wait for this page to be flushed before
+ * osc_io_end() is called, so release it earlier.
+ * for mkwrite(), it's known there is no further pages.
+ */
+ if (cl_io_is_sync_write(io) && oio->oi_active) {
+ osc_extent_release(env, oio->oi_active);
+ oio->oi_active = NULL;
+ }
+
+ CDEBUG(D_INFO, "%d %d\n", qin->pl_nr, result);
return result;
}
-static int osc_io_commit_write(const struct lu_env *env,
- const struct cl_io_slice *ios,
- const struct cl_page_slice *slice,
- unsigned from, unsigned to)
+static int osc_io_rw_iter_init(const struct lu_env *env,
+ const struct cl_io_slice *ios)
{
- struct osc_io *oio = cl2osc_io(env, ios);
- struct osc_page *opg = cl2osc_page(slice);
- struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
- struct osc_async_page *oap = &opg->ops_oap;
+ struct cl_io *io = ios->cis_io;
+ struct osc_io *oio = osc_env_io(env);
+ struct osc_object *osc = cl2osc(ios->cis_obj);
+ struct client_obd *cli = osc_cli(osc);
+ unsigned long c;
+ unsigned int npages;
+ unsigned int max_pages;
+
+ if (cl_io_is_append(io))
+ return 0;
+
+ npages = io->u.ci_rw.crw_count >> PAGE_SHIFT;
+ if (io->u.ci_rw.crw_pos & ~PAGE_MASK)
+ ++npages;
+
+ max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
+ if (npages > max_pages)
+ npages = max_pages;
+
+ c = atomic_read(cli->cl_lru_left);
+ if (c < npages && osc_lru_reclaim(cli) > 0)
+ c = atomic_read(cli->cl_lru_left);
+ while (c >= npages) {
+ if (c == atomic_cmpxchg(cli->cl_lru_left, c, c - npages)) {
+ oio->oi_lru_reserved = npages;
+ break;
+ }
+ c = atomic_read(cli->cl_lru_left);
+ }
- LASSERT(to > 0);
- /*
- * XXX instead of calling osc_page_touch() here and in
- * osc_io_fault_start() it might be more logical to introduce
- * cl_page_touch() method, that generic cl_io_commit_write() and page
- * fault code calls.
- */
- osc_page_touch(env, cl2osc_page(slice), to);
- if (!client_is_remote(osc_export(obj)) &&
- capable(CFS_CAP_SYS_RESOURCE))
- oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
+ return 0;
+}
- if (oio->oi_lockless)
- /* see osc_io_prepare_write() for lockless io handling. */
- cl_page_clip(env, slice->cpl_page, from, to);
+static void osc_io_rw_iter_fini(const struct lu_env *env,
+ const struct cl_io_slice *ios)
+{
+ struct osc_io *oio = osc_env_io(env);
+ struct osc_object *osc = cl2osc(ios->cis_obj);
+ struct client_obd *cli = osc_cli(osc);
- return 0;
+ if (oio->oi_lru_reserved > 0) {
+ atomic_add(oio->oi_lru_reserved, cli->cl_lru_left);
+ oio->oi_lru_reserved = 0;
+ }
+ oio->oi_write_osclock = NULL;
}
static int osc_io_fault_start(const struct lu_env *env,
@@ -342,31 +393,21 @@ static int osc_async_upcall(void *a, int rc)
* Checks that there are no pages being written in the extent being truncated.
*/
static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
- struct cl_page *page, void *cbdata)
+ struct osc_page *ops, void *cbdata)
{
- const struct cl_page_slice *slice;
- struct osc_page *ops;
+ struct cl_page *page = ops->ops_cl.cpl_page;
struct osc_async_page *oap;
__u64 start = *(__u64 *)cbdata;
- slice = cl_page_at(page, &osc_device_type);
- LASSERT(slice);
- ops = cl2osc_page(slice);
oap = &ops->ops_oap;
-
if (oap->oap_cmd & OBD_BRW_WRITE &&
!list_empty(&oap->oap_pending_item))
CL_PAGE_DEBUG(D_ERROR, env, page, "exists %llu/%s.\n",
start, current->comm);
- {
- struct page *vmpage = cl_page_vmpage(env, page);
-
- if (PageLocked(vmpage))
- CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
- ops, page->cp_index,
- (oap->oap_cmd & OBD_BRW_RWMASK));
- }
+ if (PageLocked(page->cp_vmpage))
+ CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
+ ops, osc_index(ops), oap->oap_cmd & OBD_BRW_RWMASK);
return CLP_GANG_OKAY;
}
@@ -385,8 +426,9 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
/*
* Complain if there are pages in the truncated region.
*/
- cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF,
- trunc_check_cb, (void *)&size);
+ osc_page_gang_lookup(env, io, cl2osc(clob),
+ start + partial, CL_PAGE_EOF,
+ trunc_check_cb, (void *)&size);
}
static int osc_io_setattr_start(const struct lu_env *env,
@@ -650,6 +692,8 @@ static const struct cl_io_operations osc_io_ops = {
.cio_fini = osc_io_fini
},
[CIT_WRITE] = {
+ .cio_iter_init = osc_io_rw_iter_init,
+ .cio_iter_fini = osc_io_rw_iter_fini,
.cio_start = osc_io_write_start,
.cio_end = osc_io_end,
.cio_fini = osc_io_fini
@@ -672,16 +716,8 @@ static const struct cl_io_operations osc_io_ops = {
.cio_fini = osc_io_fini
}
},
- .req_op = {
- [CRT_READ] = {
- .cio_submit = osc_io_submit
- },
- [CRT_WRITE] = {
- .cio_submit = osc_io_submit
- }
- },
- .cio_prepare_write = osc_io_prepare_write,
- .cio_commit_write = osc_io_commit_write
+ .cio_submit = osc_io_submit,
+ .cio_commit_async = osc_io_commit_async
};
/*****************************************************************************
@@ -718,8 +754,7 @@ static void osc_req_attr_set(const struct lu_env *env,
struct lov_oinfo *oinfo;
struct cl_req *clerq;
struct cl_page *apage; /* _some_ page in @clerq */
- struct cl_lock *lock; /* _some_ lock protecting @apage */
- struct osc_lock *olck;
+ struct ldlm_lock *lock; /* _some_ lock protecting @apage */
struct osc_page *opg;
struct obdo *oa;
struct ost_lvb *lvb;
@@ -753,31 +788,32 @@ static void osc_req_attr_set(const struct lu_env *env,
LASSERT(!list_empty(&clerq->crq_pages));
apage = container_of(clerq->crq_pages.next,
struct cl_page, cp_flight);
- opg = osc_cl_page_osc(apage);
- apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */
- lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1);
- if (!lock) {
- struct cl_object_header *head;
- struct cl_lock *scan;
-
- head = cl_object_header(apage->cp_obj);
- list_for_each_entry(scan, &head->coh_locks, cll_linkage)
- CL_LOCK_DEBUG(D_ERROR, env, scan,
- "no cover page!\n");
- CL_PAGE_DEBUG(D_ERROR, env, apage,
- "dump uncover page!\n");
+ opg = osc_cl_page_osc(apage, NULL);
+ lock = osc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg),
+ 1, 1);
+ if (!lock && !opg->ops_srvlock) {
+ struct ldlm_resource *res;
+ struct ldlm_res_id *resname;
+
+ CL_PAGE_DEBUG(D_ERROR, env, apage, "uncovered page!\n");
+
+ resname = &osc_env_info(env)->oti_resname;
+ ostid_build_res_name(&oinfo->loi_oi, resname);
+ res = ldlm_resource_get(
+ osc_export(cl2osc(obj))->exp_obd->obd_namespace,
+ NULL, resname, LDLM_EXTENT, 0);
+ ldlm_resource_dump(D_ERROR, res);
+
dump_stack();
LBUG();
}
- olck = osc_lock_at(lock);
- LASSERT(ergo(opg->ops_srvlock, !olck->ols_lock));
/* check for lockless io. */
- if (olck->ols_lock) {
- oa->o_handle = olck->ols_lock->l_remote_handle;
+ if (lock) {
+ oa->o_handle = lock->l_remote_handle;
oa->o_valid |= OBD_MD_FLHANDLE;
+ LDLM_LOCK_PUT(lock);
}
- cl_lock_put(env, lock);
}
}
@@ -807,8 +843,9 @@ int osc_req_init(const struct lu_env *env, struct cl_device *dev,
if (or) {
cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops);
result = 0;
- } else
+ } else {
result = -ENOMEM;
+ }
return result;
}