diff options
Diffstat (limited to 'drivers/staging/lustre/lustre/osc/osc_request.c')
-rw-r--r-- | drivers/staging/lustre/lustre/osc/osc_request.c | 238 |
1 files changed, 112 insertions, 126 deletions
diff --git a/drivers/staging/lustre/lustre/osc/osc_request.c b/drivers/staging/lustre/lustre/osc/osc_request.c index 12113dfd87b8..367f83af12c0 100644 --- a/drivers/staging/lustre/lustre/osc/osc_request.c +++ b/drivers/staging/lustre/lustre/osc/osc_request.c @@ -38,7 +38,6 @@ #include "../../include/linux/libcfs/libcfs.h" - #include "../include/lustre_dlm.h" #include "../include/lustre_net.h" #include "../include/lustre/lustre_user.h" @@ -50,9 +49,18 @@ #include "../include/lustre_param.h" #include "../include/lustre_fid.h" #include "../include/obd_class.h" +#include "../include/obd.h" #include "osc_internal.h" #include "osc_cl_internal.h" +atomic_t osc_pool_req_count; +unsigned int osc_reqpool_maxreqcount; +struct ptlrpc_request_pool *osc_rq_pool; + +/* max memory used for request pool, unit is MB */ +static unsigned int osc_reqpool_mem_max = 5; +module_param(osc_reqpool_mem_max, uint, 0444); + struct osc_brw_async_args { struct obdo *aa_oa; int aa_requested_nob; @@ -63,7 +71,6 @@ struct osc_brw_async_args { struct client_obd *aa_cli; struct list_head aa_oaps; struct list_head aa_exts; - struct obd_capa *aa_ocapa; struct cl_req *aa_clerq; }; @@ -191,22 +198,6 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, return lsm_size; } -static inline void osc_pack_capa(struct ptlrpc_request *req, - struct ost_body *body, void *capa) -{ - struct obd_capa *oc = (struct obd_capa *)capa; - struct lustre_capa *c; - - if (!capa) - return; - - c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); - LASSERT(c); - capa_cpy(c, oc); - body->oa.o_valid |= OBD_MD_FLOSSCAPA; - DEBUG_CAPA(D_SEC, c, "pack"); -} - static inline void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo) { @@ -217,18 +208,6 @@ static inline void osc_pack_req_body(struct ptlrpc_request *req, lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); -} - -static inline void osc_set_capa_size(struct ptlrpc_request *req, - const struct req_msg_field *field, - struct obd_capa *oc) -{ - if (oc == NULL) - req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0); - else - /* it is already calculated as sizeof struct obd_capa */ - ; } static int osc_getattr_interpret(const struct lu_env *env, @@ -270,7 +249,6 @@ static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo, if (req == NULL) return -ENOMEM; - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); if (rc) { ptlrpc_request_free(req); @@ -301,7 +279,6 @@ static int osc_getattr(const struct lu_env *env, struct obd_export *exp, if (req == NULL) return -ENOMEM; - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); if (rc) { ptlrpc_request_free(req); @@ -347,7 +324,6 @@ static int osc_setattr(const struct lu_env *env, struct obd_export *exp, if (req == NULL) return -ENOMEM; - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); if (rc) { ptlrpc_request_free(req); @@ -411,7 +387,6 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, if (req == NULL) return -ENOMEM; - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); if (rc) { ptlrpc_request_free(req); @@ -428,19 +403,19 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, /* do mds to ost setattr asynchronously */ if (!rqset) { /* Do not wait for response. */ - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(req); } else { req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; - CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args)); + CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args)); sa = ptlrpc_req_async_args(req); sa->sa_oa = oinfo->oi_oa; sa->sa_upcall = upcall; sa->sa_cookie = cookie; if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(req); else ptlrpc_set_add_req(rqset, req); } @@ -557,7 +532,6 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, if (req == NULL) return -ENOMEM; - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH); if (rc) { ptlrpc_request_free(req); @@ -570,18 +544,17 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, LASSERT(body); lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); ptlrpc_request_set_replen(req); req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; - CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args)); + CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args)); sa = ptlrpc_req_async_args(req); sa->sa_oa = oinfo->oi_oa; sa->sa_upcall = upcall; sa->sa_cookie = cookie; if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(req); else ptlrpc_set_add_req(rqset, req); @@ -600,7 +573,7 @@ static int osc_sync_interpret(const struct lu_env *env, body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); if (body == NULL) { - CERROR ("can't unpack ost_body\n"); + CERROR("can't unpack ost_body\n"); rc = -EPROTO; goto out; } @@ -624,7 +597,6 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, if (req == NULL) return -ENOMEM; - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC); if (rc) { ptlrpc_request_free(req); @@ -636,7 +608,6 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, LASSERT(body); lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); ptlrpc_request_set_replen(req); req->rq_interpret_reply = osc_sync_interpret; @@ -648,7 +619,7 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, fa->fa_cookie = cookie; if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(req); else ptlrpc_set_add_req(rqset, req); @@ -754,8 +725,7 @@ int osc_create(const struct lu_env *env, struct obd_export *exp, * cookies to the MDS after committing destroy transactions. */ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *ea, - struct obd_trans_info *oti, struct obd_export *md_export, - void *capa) + struct obd_trans_info *oti, struct obd_export *md_export) { struct client_obd *cli = &exp->exp_obd->u.cli; struct ptlrpc_request *req; @@ -777,7 +747,6 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, return -ENOMEM; } - osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa); rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 0, &cancels, count); if (rc) { @@ -794,7 +763,6 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, LASSERT(body); lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); - osc_pack_capa(req, body, (struct obd_capa *)capa); ptlrpc_request_set_replen(req); /* If osc_destroy is for destroying the unlink orphan, @@ -817,7 +785,7 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, } /* Do not wait for response */ - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(req); return 0; } @@ -910,7 +878,7 @@ static int osc_shrink_grant_interpret(const struct lu_env *env, LASSERT(body); osc_update_grant(cli, body); out: - OBDO_FREE(oa); + kmem_cache_free(obdo_cachep, oa); return rc; } @@ -1100,7 +1068,7 @@ static void handle_short_read(int nob_read, u32 page_count, /* skip bytes read OK */ while (nob_read > 0) { - LASSERT (page_count > 0); + LASSERT(page_count > 0); if (pga[i]->count > nob_read) { /* EOF inside this page */ @@ -1210,6 +1178,7 @@ static u32 osc_checksum_bulk(int nob, u32 pg_count, OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) { unsigned char *ptr = kmap(pga[i]->pg); int off = pga[i]->off & ~CFS_PAGE_MASK; + memcpy(ptr + off, "bad1", min(4, nob)); kunmap(pga[i]->pg); } @@ -1247,7 +1216,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct lov_stripe_md *lsm, u32 page_count, struct brw_page **pga, struct ptlrpc_request **reqp, - struct obd_capa *ocapa, int reserve, + int reserve, int resend) { struct ptlrpc_request *req; @@ -1268,7 +1237,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli, if ((cmd & OBD_BRW_WRITE) != 0) { opc = OST_WRITE; req = ptlrpc_request_alloc_pool(cli->cl_import, - cli->cl_import->imp_rq_pool, + osc_rq_pool, &RQF_OST_BRW_WRITE); } else { opc = OST_READ; @@ -1287,7 +1256,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli, sizeof(*ioobj)); req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT, niocount * sizeof(*niobuf)); - osc_set_capa_size(req, &RMF_CAPA1, ocapa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc); if (rc) { @@ -1326,7 +1294,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli, * "max - 1" for old client compatibility sending "0", and also so the * the actual maximum is a power-of-two number, not one less. LU-1431 */ ioobj_max_brw_set(ioobj, desc->bd_md_max_brw); - osc_pack_capa(req, body, ocapa); LASSERT(page_count > 0); pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { @@ -1435,8 +1402,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli, aa->aa_ppga = pga; aa->aa_cli = cli; INIT_LIST_HEAD(&aa->aa_oaps); - if (ocapa && reserve) - aa->aa_ocapa = capa_get(ocapa); *reqp = req; return 0; @@ -1571,7 +1536,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) } if (rc != req->rq_bulk->bd_nob_transferred) { - CERROR ("Unexpected rc %d (%d transferred)\n", + CERROR("Unexpected rc %d (%d transferred)\n", rc, req->rq_bulk->bd_nob_transferred); return -EPROTO; } @@ -1582,19 +1547,17 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) if (body->oa.o_valid & OBD_MD_FLCKSUM) { static int cksum_counter; __u32 server_cksum = body->oa.o_cksum; - char *via; - char *router; + char *via = ""; + char *router = ""; cksum_type_t cksum_type; - cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS? + cksum_type = cksum_type_unpack(body->oa.o_valid&OBD_MD_FLFLAGS ? body->oa.o_flags : 0); client_cksum = osc_checksum_bulk(rc, aa->aa_page_count, aa->aa_ppga, OST_READ, cksum_type); - if (peer->nid == req->rq_bulk->bd_sender) { - via = router = ""; - } else { + if (peer->nid != req->rq_bulk->bd_sender) { via = " via "; router = libcfs_nid2str(req->rq_bulk->bd_sender); } @@ -1654,11 +1617,11 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, "redo for recoverable error %d", rc); rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) == - OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ, + OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, aa->aa_cli, aa->aa_oa, NULL /* lsm unused by osc currently */, aa->aa_page_count, aa->aa_ppga, - &new_req, aa->aa_ocapa, 0, 1); + &new_req, 0, 1); if (rc) return rc; @@ -1681,9 +1644,9 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, /* cap resend delay to the current request timeout, this is similar to * what ptlrpc does (see after_reply()) */ if (aa->aa_resends > new_req->rq_timeout) - new_req->rq_sent = get_seconds() + new_req->rq_timeout; + new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout; else - new_req->rq_sent = get_seconds() + aa->aa_resends; + new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends; new_req->rq_generation_set = 1; new_req->rq_import_generation = request->rq_import_generation; @@ -1702,14 +1665,11 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, } } - new_aa->aa_ocapa = aa->aa_ocapa; - aa->aa_ocapa = NULL; - /* XXX: This code will run into problem if we're going to support * to add a series of BRW RPCs into a self-defined ptlrpc_request_set * and wait for all of them to be finished. We should inherit request * set from old request. */ - ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1); + ptlrpcd_add_req(new_req); DEBUG_REQ(D_INFO, new_req, "new request"); return 0; @@ -1786,11 +1746,6 @@ static int brw_interpret(const struct lu_env *env, rc = -EIO; } - if (aa->aa_ocapa) { - capa_put(aa->aa_ocapa); - aa->aa_ocapa = NULL; - } - list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) { if (obj == NULL && rc == 0) { obj = osc2cl(ext->oe_obj); @@ -1832,7 +1787,7 @@ static int brw_interpret(const struct lu_env *env, } cl_object_put(env, obj); } - OBDO_FREE(aa->aa_oa); + kmem_cache_free(obdo_cachep, aa->aa_oa); cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc : req->rq_bulk->bd_nob_transferred); @@ -1850,7 +1805,7 @@ static int brw_interpret(const struct lu_env *env, osc_wake_cache_waiters(cli); client_obd_list_unlock(&cli->cl_loi_list_lock); - osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); + osc_io_unplug(env, cli, NULL); return rc; } @@ -1860,7 +1815,7 @@ static int brw_interpret(const struct lu_env *env, * Extents in the list must be in OES_RPC state. */ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, - struct list_head *ext_list, int cmd, pdl_policy_t pol) + struct list_head *ext_list, int cmd) { struct ptlrpc_request *req = NULL; struct osc_extent *ext; @@ -1920,7 +1875,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, goto out; } - OBDO_ALLOC(oa); + oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO); if (oa == NULL) { rc = -ENOMEM; goto out; @@ -1929,6 +1884,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, i = 0; list_for_each_entry(oap, &rpc_list, oap_rpc_item) { struct cl_page *page = oap2cl_page(oap); + if (clerq == NULL) { clerq = cl_req_alloc(env, page, crt, 1 /* only 1-object rpcs for now */); @@ -1966,7 +1922,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, sort_brw_pages(pga, page_count); rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, - pga, &req, crattr->cra_capa, 1, 0); + pga, &req, 1, 0); if (rc != 0) { CERROR("prep_req failed: %d\n", rc); goto out; @@ -2034,35 +1990,20 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight); - /* XXX: Maybe the caller can check the RPC bulk descriptor to - * see which CPU/NUMA node the majority of pages were allocated - * on, and try to assign the async RPC to the CPU core - * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic. - * - * But on the other hand, we expect that multiple ptlrpcd - * threads and the initial write sponsor can run in parallel, - * especially when data checksum is enabled, which is CPU-bound - * operation and single ptlrpcd thread cannot process in time. - * So more ptlrpcd threads sharing BRW load - * (with PDL_POLICY_ROUND) seems better. - */ - ptlrpcd_add_req(req, pol, -1); + ptlrpcd_add_req(req); rc = 0; out: if (mem_tight != 0) cfs_memory_pressure_restore(mpflag); - if (crattr != NULL) { - capa_put(crattr->cra_capa); - kfree(crattr); - } + kfree(crattr); if (rc != 0) { LASSERT(req == NULL); if (oa) - OBDO_FREE(oa); + kmem_cache_free(obdo_cachep, oa); kfree(pga); /* this should happen rarely and is pretty bad, it makes the * pending list not follow the dirty order */ @@ -2149,6 +2090,7 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, /* The request was created before ldlm_cli_enqueue call. */ if (rc == ELDLM_LOCK_ABORTED) { struct ldlm_reply *rep; + rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); @@ -2335,6 +2277,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, no_match: if (intent) { LIST_HEAD(cancels); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE_LVB); if (req == NULL) @@ -2359,6 +2302,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, if (rqset) { if (!rc) { struct osc_enqueue_args *aa; + CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); aa = ptlrpc_req_async_args(req); aa->oa_ei = einfo; @@ -2373,7 +2317,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_enqueue_interpret; if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(req); else ptlrpc_set_add_req(rqset, req); } else if (intent) { @@ -2645,7 +2589,6 @@ static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump) return rc; } - static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void *uarg) { @@ -2720,7 +2663,7 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, data->ioc_offset); goto out; case OBD_IOC_POLL_QUOTACHECK: - err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg); + err = osc_quota_poll_check(exp, karg); goto out; case OBD_IOC_PING_TARGET: err = ptlrpc_obd_ping(obd); @@ -2787,8 +2730,7 @@ static int osc_get_info(const struct lu_env *env, struct obd_export *exp, ptlrpc_req_finished(req); return rc; } else if (KEY_IS(KEY_FIEMAP)) { - struct ll_fiemap_info_key *fm_key = - (struct ll_fiemap_info_key *)key; + struct ll_fiemap_info_key *fm_key = key; struct ldlm_res_id res_id; ldlm_policy_data_t policy; struct lustre_handle lockh; @@ -2910,7 +2852,7 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, struct client_obd *cli = &obd->u.cli; LASSERT(cli->cl_cache == NULL); /* only once */ - cli->cl_cache = (struct cl_client_cache *)val; + cli->cl_cache = val; atomic_inc(&cli->cl_cache->ccc_users); cli->cl_lru_left = &cli->cl_cache->ccc_lru_left; @@ -2973,7 +2915,7 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); aa = ptlrpc_req_async_args(req); - OBDO_ALLOC(oa); + oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO); if (!oa) { ptlrpc_req_finished(req); return -ENOMEM; @@ -2988,8 +2930,9 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, LASSERT(set != NULL); ptlrpc_set_add_req(set, req); ptlrpc_check_set(NULL, set); - } else - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + } else { + ptlrpcd_add_req(req); + } return 0; } @@ -3081,7 +3024,7 @@ static int osc_import_event(struct obd_device *obd, cli = &obd->u.cli; /* all pages go to failing rpcs due to the invalid * import */ - osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND); + osc_io_unplug(env, cli, NULL); ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); cl_env_put(env, &refcheck); @@ -3101,7 +3044,7 @@ static int osc_import_event(struct obd_device *obd, /* See bug 7198 */ if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL) - imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL; + imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL; rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL); break; @@ -3153,7 +3096,7 @@ static int brw_queue_work(const struct lu_env *env, void *data) CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli); - osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); + osc_io_unplug(env, cli, NULL); return 0; } @@ -3163,6 +3106,9 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) struct client_obd *cli = &obd->u.cli; void *handler; int rc; + int adding; + int added; + int req_count; rc = ptlrpcd_addref(); if (rc) @@ -3191,15 +3137,20 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) ptlrpc_lprocfs_register_obd(obd); } - /* We need to allocate a few requests more, because - * brw_interpret tries to create new requests before freeing - * previous ones, Ideally we want to have 2x max_rpcs_in_flight - * reserved, but I'm afraid that might be too much wasted RAM - * in fact, so 2 is just my guess and still should work. */ - cli->cl_import->imp_rq_pool = - ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2, - OST_MAXREQSIZE, - ptlrpc_add_rqs_to_pool); + /* + * We try to control the total number of requests with a upper limit + * osc_reqpool_maxreqcount. There might be some race which will cause + * over-limit allocation, but it is fine. + */ + req_count = atomic_read(&osc_pool_req_count); + if (req_count < osc_reqpool_maxreqcount) { + adding = cli->cl_max_rpcs_in_flight + 2; + if (req_count + adding > osc_reqpool_maxreqcount) + adding = osc_reqpool_maxreqcount - req_count; + + added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding); + atomic_add(added, &osc_pool_req_count); + } INIT_LIST_HEAD(&cli->cl_grant_shrink_list); ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery); @@ -3219,6 +3170,7 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) switch (stage) { case OBD_CLEANUP_EARLY: { struct obd_import *imp; + imp = obd->u.cli.cl_import; CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name); /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */ @@ -3339,6 +3291,8 @@ extern struct lock_class_key osc_ast_guard_class; static int __init osc_init(void) { struct lprocfs_static_vars lvars = { NULL }; + unsigned int reqpool_size; + unsigned int reqsize; int rc; /* print an address of _any_ initialized kernel symbol from this @@ -3354,14 +3308,45 @@ static int __init osc_init(void) rc = class_register_type(&osc_obd_ops, NULL, LUSTRE_OSC_NAME, &osc_device_type); - if (rc) { - lu_kmem_fini(osc_caches); - return rc; - } + if (rc) + goto out_kmem; spin_lock_init(&osc_ast_guard); lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class); + /* This is obviously too much memory, only prevent overflow here */ + if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) { + rc = -EINVAL; + goto out_type; + } + + reqpool_size = osc_reqpool_mem_max << 20; + + reqsize = 1; + while (reqsize < OST_MAXREQSIZE) + reqsize = reqsize << 1; + + /* + * We don't enlarge the request count in OSC pool according to + * cl_max_rpcs_in_flight. The allocation from the pool will only be + * tried after normal allocation failed. So a small OSC pool won't + * cause much performance degression in most of cases. + */ + osc_reqpool_maxreqcount = reqpool_size / reqsize; + + atomic_set(&osc_pool_req_count, 0); + osc_rq_pool = ptlrpc_init_rq_pool(0, OST_MAXREQSIZE, + ptlrpc_add_rqs_to_pool); + + if (osc_rq_pool) + return 0; + + rc = -ENOMEM; + +out_type: + class_unregister_type(LUSTRE_OSC_NAME); +out_kmem: + lu_kmem_fini(osc_caches); return rc; } @@ -3369,6 +3354,7 @@ static void /*__exit*/ osc_exit(void) { class_unregister_type(LUSTRE_OSC_NAME); lu_kmem_fini(osc_caches); + ptlrpc_free_rq_pool(osc_rq_pool); } MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>"); |