aboutsummaryrefslogtreecommitdiffstats
path: root/drivers/staging/lustre/lustre/obdclass/lu_object.c
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/staging/lustre/lustre/obdclass/lu_object.c')
-rw-r--r--drivers/staging/lustre/lustre/obdclass/lu_object.c2056
1 files changed, 0 insertions, 2056 deletions
diff --git a/drivers/staging/lustre/lustre/obdclass/lu_object.c b/drivers/staging/lustre/lustre/obdclass/lu_object.c
deleted file mode 100644
index aa9d74e087f4..000000000000
--- a/drivers/staging/lustre/lustre/obdclass/lu_object.c
+++ /dev/null
@@ -1,2056 +0,0 @@
-// SPDX-License-Identifier: GPL-2.0
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.gnu.org/licenses/gpl-2.0.html
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2011, 2015, Intel Corporation.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/obdclass/lu_object.c
- *
- * Lustre Object.
- * These are the only exported functions, they provide some generic
- * infrastructure for managing object devices
- *
- * Author: Nikita Danilov <nikita.danilov@sun.com>
- */
-
-#define DEBUG_SUBSYSTEM S_CLASS
-
-#include <linux/module.h>
-
-/* hash_long() */
-#include <linux/libcfs/libcfs_hash.h>
-#include <obd_class.h>
-#include <obd_support.h>
-#include <lustre_disk.h>
-#include <lustre_fid.h>
-#include <lu_object.h>
-#include <cl_object.h>
-#include <lu_ref.h>
-#include <linux/list.h>
-
-struct lu_site_bkt_data {
- /**
- * LRU list, updated on each access to object. Protected by
- * bucket lock of lu_site::ls_obj_hash.
- *
- * "Cold" end of LRU is lu_site::ls_lru.next. Accessed object are
- * moved to the lu_site::ls_lru.prev (this is due to the non-existence
- * of list_for_each_entry_safe_reverse()).
- */
- struct list_head lsb_lru;
- /**
- * Wait-queue signaled when an object in this site is ultimately
- * destroyed (lu_object_free()). It is used by lu_object_find() to
- * wait before re-trying when object in the process of destruction is
- * found in the hash table.
- *
- * \see htable_lookup().
- */
- wait_queue_head_t lsb_marche_funebre;
-};
-
-enum {
- LU_CACHE_PERCENT_MAX = 50,
- LU_CACHE_PERCENT_DEFAULT = 20
-};
-
-#define LU_CACHE_NR_MAX_ADJUST 512
-#define LU_CACHE_NR_UNLIMITED -1
-#define LU_CACHE_NR_DEFAULT LU_CACHE_NR_UNLIMITED
-#define LU_CACHE_NR_LDISKFS_LIMIT LU_CACHE_NR_UNLIMITED
-#define LU_CACHE_NR_ZFS_LIMIT 256
-
-#define LU_SITE_BITS_MIN 12
-#define LU_SITE_BITS_MAX 24
-#define LU_SITE_BITS_MAX_CL 19
-/**
- * total 256 buckets, we don't want too many buckets because:
- * - consume too much memory
- * - avoid unbalanced LRU list
- */
-#define LU_SITE_BKT_BITS 8
-
-static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
-module_param(lu_cache_percent, int, 0644);
-MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache");
-
-static long lu_cache_nr = LU_CACHE_NR_DEFAULT;
-module_param(lu_cache_nr, long, 0644);
-MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
-
-static void lu_object_free(const struct lu_env *env, struct lu_object *o);
-static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
-
-wait_queue_head_t *
-lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid)
-{
- struct cfs_hash_bd bd;
- struct lu_site_bkt_data *bkt;
-
- cfs_hash_bd_get(site->ls_obj_hash, fid, &bd);
- bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
- return &bkt->lsb_marche_funebre;
-}
-EXPORT_SYMBOL(lu_site_wq_from_fid);
-
-/**
- * Decrease reference counter on object. If last reference is freed, return
- * object to the cache, unless lu_object_is_dying(o) holds. In the latter
- * case, free object immediately.
- */
-void lu_object_put(const struct lu_env *env, struct lu_object *o)
-{
- struct lu_site_bkt_data *bkt;
- struct lu_object_header *top;
- struct lu_site *site;
- struct lu_object *orig;
- struct cfs_hash_bd bd;
- const struct lu_fid *fid;
-
- top = o->lo_header;
- site = o->lo_dev->ld_site;
- orig = o;
-
- /*
- * till we have full fids-on-OST implemented anonymous objects
- * are possible in OSP. such an object isn't listed in the site
- * so we should not remove it from the site.
- */
- fid = lu_object_fid(o);
- if (fid_is_zero(fid)) {
- LASSERT(!top->loh_hash.next && !top->loh_hash.pprev);
- LASSERT(list_empty(&top->loh_lru));
- if (!atomic_dec_and_test(&top->loh_ref))
- return;
- list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
- if (o->lo_ops->loo_object_release)
- o->lo_ops->loo_object_release(env, o);
- }
- lu_object_free(env, orig);
- return;
- }
-
- cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
- bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
-
- if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
- if (lu_object_is_dying(top)) {
- /*
- * somebody may be waiting for this, currently only
- * used for cl_object, see cl_object_put_last().
- */
- wake_up_all(&bkt->lsb_marche_funebre);
- }
- return;
- }
-
- /*
- * When last reference is released, iterate over object
- * layers, and notify them that object is no longer busy.
- */
- list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
- if (o->lo_ops->loo_object_release)
- o->lo_ops->loo_object_release(env, o);
- }
-
- if (!lu_object_is_dying(top)) {
- LASSERT(list_empty(&top->loh_lru));
- list_add_tail(&top->loh_lru, &bkt->lsb_lru);
- percpu_counter_inc(&site->ls_lru_len_counter);
- CDEBUG(D_INODE, "Add %p to site lru. hash: %p, bkt: %p\n",
- o, site->ls_obj_hash, bkt);
- cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
- return;
- }
-
- /*
- * If object is dying (will not be cached), then removed it
- * from hash table and LRU.
- *
- * This is done with hash table and LRU lists locked. As the only
- * way to acquire first reference to previously unreferenced
- * object is through hash-table lookup (lu_object_find()),
- * or LRU scanning (lu_site_purge()), that are done under hash-table
- * and LRU lock, no race with concurrent object lookup is possible
- * and we can safely destroy object below.
- */
- if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
- cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
- cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
- /*
- * Object was already removed from hash and lru above, can
- * kill it.
- */
- lu_object_free(env, orig);
-}
-EXPORT_SYMBOL(lu_object_put);
-
-/**
- * Kill the object and take it out of LRU cache.
- * Currently used by client code for layout change.
- */
-void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
-{
- struct lu_object_header *top;
-
- top = o->lo_header;
- set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
- if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
- struct lu_site *site = o->lo_dev->ld_site;
- struct cfs_hash *obj_hash = site->ls_obj_hash;
- struct cfs_hash_bd bd;
-
- cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
- if (!list_empty(&top->loh_lru)) {
- struct lu_site_bkt_data *bkt;
-
- list_del_init(&top->loh_lru);
- bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
- percpu_counter_dec(&site->ls_lru_len_counter);
- }
- cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
- cfs_hash_bd_unlock(obj_hash, &bd, 1);
- }
-}
-EXPORT_SYMBOL(lu_object_unhash);
-
-/**
- * Allocate new object.
- *
- * This follows object creation protocol, described in the comment within
- * struct lu_device_operations definition.
- */
-static struct lu_object *lu_object_alloc(const struct lu_env *env,
- struct lu_device *dev,
- const struct lu_fid *f,
- const struct lu_object_conf *conf)
-{
- struct lu_object *scan;
- struct lu_object *top;
- struct list_head *layers;
- unsigned int init_mask = 0;
- unsigned int init_flag;
- int clean;
- int result;
-
- /*
- * Create top-level object slice. This will also create
- * lu_object_header.
- */
- top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
- if (!top)
- return ERR_PTR(-ENOMEM);
- if (IS_ERR(top))
- return top;
- /*
- * This is the only place where object fid is assigned. It's constant
- * after this point.
- */
- top->lo_header->loh_fid = *f;
- layers = &top->lo_header->loh_layers;
-
- do {
- /*
- * Call ->loo_object_init() repeatedly, until no more new
- * object slices are created.
- */
- clean = 1;
- init_flag = 1;
- list_for_each_entry(scan, layers, lo_linkage) {
- if (init_mask & init_flag)
- goto next;
- clean = 0;
- scan->lo_header = top->lo_header;
- result = scan->lo_ops->loo_object_init(env, scan, conf);
- if (result != 0) {
- lu_object_free(env, top);
- return ERR_PTR(result);
- }
- init_mask |= init_flag;
-next:
- init_flag <<= 1;
- }
- } while (!clean);
-
- list_for_each_entry_reverse(scan, layers, lo_linkage) {
- if (scan->lo_ops->loo_object_start) {
- result = scan->lo_ops->loo_object_start(env, scan);
- if (result != 0) {
- lu_object_free(env, top);
- return ERR_PTR(result);
- }
- }
- }
-
- lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
- return top;
-}
-
-/**
- * Free an object.
- */
-static void lu_object_free(const struct lu_env *env, struct lu_object *o)
-{
- wait_queue_head_t *wq;
- struct lu_site *site;
- struct lu_object *scan;
- struct list_head *layers;
- struct list_head splice;
-
- site = o->lo_dev->ld_site;
- layers = &o->lo_header->loh_layers;
- wq = lu_site_wq_from_fid(site, &o->lo_header->loh_fid);
- /*
- * First call ->loo_object_delete() method to release all resources.
- */
- list_for_each_entry_reverse(scan, layers, lo_linkage) {
- if (scan->lo_ops->loo_object_delete)
- scan->lo_ops->loo_object_delete(env, scan);
- }
-
- /*
- * Then, splice object layers into stand-alone list, and call
- * ->loo_object_free() on all layers to free memory. Splice is
- * necessary, because lu_object_header is freed together with the
- * top-level slice.
- */
- INIT_LIST_HEAD(&splice);
- list_splice_init(layers, &splice);
- while (!list_empty(&splice)) {
- /*
- * Free layers in bottom-to-top order, so that object header
- * lives as long as possible and ->loo_object_free() methods
- * can look at its contents.
- */
- o = container_of(splice.prev, struct lu_object, lo_linkage);
- list_del_init(&o->lo_linkage);
- o->lo_ops->loo_object_free(env, o);
- }
-
- if (waitqueue_active(wq))
- wake_up_all(wq);
-}
-
-/**
- * Free \a nr objects from the cold end of the site LRU list.
- * if canblock is false, then don't block awaiting for another
- * instance of lu_site_purge() to complete
- */
-int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
- int nr, bool canblock)
-{
- struct lu_object_header *h;
- struct lu_object_header *temp;
- struct lu_site_bkt_data *bkt;
- struct cfs_hash_bd bd;
- struct cfs_hash_bd bd2;
- struct list_head dispose;
- int did_sth;
- unsigned int start = 0;
- int count;
- int bnr;
- unsigned int i;
-
- if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
- return 0;
-
- INIT_LIST_HEAD(&dispose);
- /*
- * Under LRU list lock, scan LRU list and move unreferenced objects to
- * the dispose list, removing them from LRU and hash table.
- */
- if (nr != ~0)
- start = s->ls_purge_start;
- bnr = (nr == ~0) ? -1 : nr / (int)CFS_HASH_NBKT(s->ls_obj_hash) + 1;
- again:
- /*
- * It doesn't make any sense to make purge threads parallel, that can
- * only bring troubles to us. See LU-5331.
- */
- if (canblock)
- mutex_lock(&s->ls_purge_mutex);
- else if (!mutex_trylock(&s->ls_purge_mutex))
- goto out;
-
- did_sth = 0;
- cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
- if (i < start)
- continue;
- count = bnr;
- cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
- bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
-
- list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
- LASSERT(atomic_read(&h->loh_ref) == 0);
-
- cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
- LASSERT(bd.bd_bucket == bd2.bd_bucket);
-
- cfs_hash_bd_del_locked(s->ls_obj_hash,
- &bd2, &h->loh_hash);
- list_move(&h->loh_lru, &dispose);
- percpu_counter_dec(&s->ls_lru_len_counter);
- if (did_sth == 0)
- did_sth = 1;
-
- if (nr != ~0 && --nr == 0)
- break;
-
- if (count > 0 && --count == 0)
- break;
- }
- cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
- cond_resched();
- /*
- * Free everything on the dispose list. This is safe against
- * races due to the reasons described in lu_object_put().
- */
- while (!list_empty(&dispose)) {
- h = container_of(dispose.next,
- struct lu_object_header, loh_lru);
- list_del_init(&h->loh_lru);
- lu_object_free(env, lu_object_top(h));
- lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
- }
-
- if (nr == 0)
- break;
- }
- mutex_unlock(&s->ls_purge_mutex);
-
- if (nr != 0 && did_sth && start != 0) {
- start = 0; /* restart from the first bucket */
- goto again;
- }
- /* race on s->ls_purge_start, but nobody cares */
- s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
-out:
- return nr;
-}
-EXPORT_SYMBOL(lu_site_purge_objects);
-
-/*
- * Object printing.
- *
- * Code below has to jump through certain loops to output object description
- * into libcfs_debug_msg-based log. The problem is that lu_object_print()
- * composes object description from strings that are parts of _lines_ of
- * output (i.e., strings that are not terminated by newline). This doesn't fit
- * very well into libcfs_debug_msg() interface that assumes that each message
- * supplied to it is a self-contained output line.
- *
- * To work around this, strings are collected in a temporary buffer
- * (implemented as a value of lu_cdebug_key key), until terminating newline
- * character is detected.
- *
- */
-
-enum {
- /**
- * Maximal line size.
- *
- * XXX overflow is not handled correctly.
- */
- LU_CDEBUG_LINE = 512
-};
-
-struct lu_cdebug_data {
- /**
- * Temporary buffer.
- */
- char lck_area[LU_CDEBUG_LINE];
-};
-
-/* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
-LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
-
-/**
- * Key, holding temporary buffer. This key is registered very early by
- * lu_global_init().
- */
-static struct lu_context_key lu_global_key = {
- .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
- LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
- .lct_init = lu_global_key_init,
- .lct_fini = lu_global_key_fini
-};
-
-/**
- * Printer function emitting messages through libcfs_debug_msg().
- */
-int lu_cdebug_printer(const struct lu_env *env,
- void *cookie, const char *format, ...)
-{
- struct libcfs_debug_msg_data *msgdata = cookie;
- struct lu_cdebug_data *key;
- int used;
- int complete;
- va_list args;
-
- va_start(args, format);
-
- key = lu_context_key_get(&env->le_ctx, &lu_global_key);
-
- used = strlen(key->lck_area);
- complete = format[strlen(format) - 1] == '\n';
- /*
- * Append new chunk to the buffer.
- */
- vsnprintf(key->lck_area + used,
- ARRAY_SIZE(key->lck_area) - used, format, args);
- if (complete) {
- if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
- libcfs_debug_msg(msgdata, "%s\n", key->lck_area);
- key->lck_area[0] = 0;
- }
- va_end(args);
- return 0;
-}
-EXPORT_SYMBOL(lu_cdebug_printer);
-
-/**
- * Print object header.
- */
-void lu_object_header_print(const struct lu_env *env, void *cookie,
- lu_printer_t printer,
- const struct lu_object_header *hdr)
-{
- (*printer)(env, cookie, "header@%p[%#lx, %d, " DFID "%s%s%s]",
- hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
- PFID(&hdr->loh_fid),
- hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
- list_empty((struct list_head *)&hdr->loh_lru) ? \
- "" : " lru",
- hdr->loh_attr & LOHA_EXISTS ? " exist":"");
-}
-EXPORT_SYMBOL(lu_object_header_print);
-
-/**
- * Print human readable representation of the \a o to the \a printer.
- */
-void lu_object_print(const struct lu_env *env, void *cookie,
- lu_printer_t printer, const struct lu_object *o)
-{
- static const char ruler[] = "........................................";
- struct lu_object_header *top;
- int depth = 4;
-
- top = o->lo_header;
- lu_object_header_print(env, cookie, printer, top);
- (*printer)(env, cookie, "{\n");
-
- list_for_each_entry(o, &top->loh_layers, lo_linkage) {
- /*
- * print `.' \a depth times followed by type name and address
- */
- (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
- o->lo_dev->ld_type->ldt_name, o);
-
- if (o->lo_ops->loo_object_print)
- (*o->lo_ops->loo_object_print)(env, cookie, printer, o);
-
- (*printer)(env, cookie, "\n");
- }
-
- (*printer)(env, cookie, "} header@%p\n", top);
-}
-EXPORT_SYMBOL(lu_object_print);
-
-/*
- * NOTE: htable_lookup() is called with the relevant
- * hash bucket locked, but might drop and re-acquire the lock.
- */
-static struct lu_object *htable_lookup(struct lu_site *s,
- struct cfs_hash_bd *bd,
- const struct lu_fid *f,
- __u64 *version)
-{
- struct lu_site_bkt_data *bkt;
- struct lu_object_header *h;
- struct hlist_node *hnode;
- u64 ver = cfs_hash_bd_version_get(bd);
-
- if (*version == ver)
- return ERR_PTR(-ENOENT);
-
- *version = ver;
- bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
- /* cfs_hash_bd_peek_locked is a somehow "internal" function
- * of cfs_hash, it doesn't add refcount on object.
- */
- hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
- if (!hnode) {
- lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
- return ERR_PTR(-ENOENT);
- }
-
- h = container_of(hnode, struct lu_object_header, loh_hash);
- cfs_hash_get(s->ls_obj_hash, hnode);
- lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
- if (!list_empty(&h->loh_lru)) {
- list_del_init(&h->loh_lru);
- percpu_counter_dec(&s->ls_lru_len_counter);
- }
- return lu_object_top(h);
-}
-
-/**
- * Search cache for an object with the fid \a f. If such object is found,
- * return it. Otherwise, create new object, insert it into cache and return
- * it. In any case, additional reference is acquired on the returned object.
- */
-static struct lu_object *lu_object_find(const struct lu_env *env,
- struct lu_device *dev,
- const struct lu_fid *f,
- const struct lu_object_conf *conf)
-{
- return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
-}
-
-/*
- * Limit the lu_object cache to a maximum of lu_cache_nr objects. Because
- * the calculation for the number of objects to reclaim is not covered by
- * a lock the maximum number of objects is capped by LU_CACHE_MAX_ADJUST.
- * This ensures that many concurrent threads will not accidentally purge
- * the entire cache.
- */
-static void lu_object_limit(const struct lu_env *env, struct lu_device *dev)
-{
- __u64 size, nr;
-
- if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
- return;
-
- size = cfs_hash_size_get(dev->ld_site->ls_obj_hash);
- nr = (__u64)lu_cache_nr;
- if (size <= nr)
- return;
-
- lu_site_purge_objects(env, dev->ld_site,
- min_t(__u64, size - nr, LU_CACHE_NR_MAX_ADJUST),
- false);
-}
-
-/**
- * Core logic of lu_object_find*() functions.
- *
- * Much like lu_object_find(), but top level device of object is specifically
- * \a dev rather than top level device of the site. This interface allows
- * objects of different "stacking" to be created within the same site.
- */
-struct lu_object *lu_object_find_at(const struct lu_env *env,
- struct lu_device *dev,
- const struct lu_fid *f,
- const struct lu_object_conf *conf)
-{
- struct lu_object *o;
- struct lu_object *shadow;
- struct lu_site *s;
- struct cfs_hash *hs;
- struct cfs_hash_bd bd;
- __u64 version = 0;
-
- /*
- * This uses standard index maintenance protocol:
- *
- * - search index under lock, and return object if found;
- * - otherwise, unlock index, allocate new object;
- * - lock index and search again;
- * - if nothing is found (usual case), insert newly created
- * object into index;
- * - otherwise (race: other thread inserted object), free
- * object just allocated.
- * - unlock index;
- * - return object.
- *
- * For "LOC_F_NEW" case, we are sure the object is new established.
- * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
- * just alloc and insert directly.
- *
- */
- s = dev->ld_site;
- hs = s->ls_obj_hash;
-
- cfs_hash_bd_get(hs, f, &bd);
- if (!(conf && conf->loc_flags & LOC_F_NEW)) {
- cfs_hash_bd_lock(hs, &bd, 1);
- o = htable_lookup(s, &bd, f, &version);
- cfs_hash_bd_unlock(hs, &bd, 1);
-
- if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
- return o;
- }
- /*
- * Allocate new object. This may result in rather complicated
- * operations, including fld queries, inode loading, etc.
- */
- o = lu_object_alloc(env, dev, f, conf);
- if (IS_ERR(o))
- return o;
-
- LASSERT(lu_fid_eq(lu_object_fid(o), f));
-
- cfs_hash_bd_lock(hs, &bd, 1);
-
- if (conf && conf->loc_flags & LOC_F_NEW)
- shadow = ERR_PTR(-ENOENT);
- else
- shadow = htable_lookup(s, &bd, f, &version);
- if (likely(PTR_ERR(shadow) == -ENOENT)) {
- cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
- cfs_hash_bd_unlock(hs, &bd, 1);
-
- lu_object_limit(env, dev);
-
- return o;
- }
-
- lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
- cfs_hash_bd_unlock(hs, &bd, 1);
- lu_object_free(env, o);
- return shadow;
-}
-EXPORT_SYMBOL(lu_object_find_at);
-
-/**
- * Find object with given fid, and return its slice belonging to given device.
- */
-struct lu_object *lu_object_find_slice(const struct lu_env *env,
- struct lu_device *dev,
- const struct lu_fid *f,
- const struct lu_object_conf *conf)
-{
- struct lu_object *top;
- struct lu_object *obj;
-
- top = lu_object_find(env, dev, f, conf);
- if (IS_ERR(top))
- return top;
-
- obj = lu_object_locate(top->lo_header, dev->ld_type);
- if (unlikely(!obj)) {
- lu_object_put(env, top);
- obj = ERR_PTR(-ENOENT);
- }
-
- return obj;
-}
-EXPORT_SYMBOL(lu_object_find_slice);
-
-/**
- * Global list of all device types.
- */
-static LIST_HEAD(lu_device_types);
-
-int lu_device_type_init(struct lu_device_type *ldt)
-{
- int result = 0;
-
- atomic_set(&ldt->ldt_device_nr, 0);
- INIT_LIST_HEAD(&ldt->ldt_linkage);
- if (ldt->ldt_ops->ldto_init)
- result = ldt->ldt_ops->ldto_init(ldt);
-
- if (!result) {
- spin_lock(&obd_types_lock);
- list_add(&ldt->ldt_linkage, &lu_device_types);
- spin_unlock(&obd_types_lock);
- }
-
- return result;
-}
-EXPORT_SYMBOL(lu_device_type_init);
-
-void lu_device_type_fini(struct lu_device_type *ldt)
-{
- spin_lock(&obd_types_lock);
- list_del_init(&ldt->ldt_linkage);
- spin_unlock(&obd_types_lock);
- if (ldt->ldt_ops->ldto_fini)
- ldt->ldt_ops->ldto_fini(ldt);
-}
-EXPORT_SYMBOL(lu_device_type_fini);
-
-/**
- * Global list of all sites on this node
- */
-static LIST_HEAD(lu_sites);
-static DECLARE_RWSEM(lu_sites_guard);
-
-/**
- * Global environment used by site shrinker.
- */
-static struct lu_env lu_shrink_env;
-
-struct lu_site_print_arg {
- struct lu_env *lsp_env;
- void *lsp_cookie;
- lu_printer_t lsp_printer;
-};
-
-static int
-lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd,
- struct hlist_node *hnode, void *data)
-{
- struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
- struct lu_object_header *h;
-
- h = hlist_entry(hnode, struct lu_object_header, loh_hash);
- if (!list_empty(&h->loh_layers)) {
- const struct lu_object *o;
-
- o = lu_object_top(h);
- lu_object_print(arg->lsp_env, arg->lsp_cookie,
- arg->lsp_printer, o);
- } else {
- lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
- arg->lsp_printer, h);
- }
- return 0;
-}
-
-/**
- * Print all objects in \a s.
- */
-void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
- lu_printer_t printer)
-{
- struct lu_site_print_arg arg = {
- .lsp_env = (struct lu_env *)env,
- .lsp_cookie = cookie,
- .lsp_printer = printer,
- };
-
- cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
-}
-EXPORT_SYMBOL(lu_site_print);
-
-/**
- * Return desired hash table order.
- */
-static unsigned long lu_htable_order(struct lu_device *top)
-{
- unsigned long bits_max = LU_SITE_BITS_MAX;
- unsigned long cache_size;
- unsigned long bits;
-
- if (!strcmp(top->ld_type->ldt_name, LUSTRE_VVP_NAME))
- bits_max = LU_SITE_BITS_MAX_CL;
-
- /*
- * Calculate hash table size, assuming that we want reasonable
- * performance when 20% of total memory is occupied by cache of
- * lu_objects.
- *
- * Size of lu_object is (arbitrary) taken as 1K (together with inode).
- */
- cache_size = totalram_pages;
-
-#if BITS_PER_LONG == 32
- /* limit hashtable size for lowmem systems to low RAM */
- if (cache_size > 1 << (30 - PAGE_SHIFT))
- cache_size = 1 << (30 - PAGE_SHIFT) * 3 / 4;
-#endif
-
- /* clear off unreasonable cache setting. */
- if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
- CWARN("obdclass: invalid lu_cache_percent: %u, it must be in the range of (0, %u]. Will use default value: %u.\n",
- lu_cache_percent, LU_CACHE_PERCENT_MAX,
- LU_CACHE_PERCENT_DEFAULT);
-
- lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
- }
- cache_size = cache_size / 100 * lu_cache_percent *
- (PAGE_SIZE / 1024);
-
- for (bits = 1; (1 << bits) < cache_size; ++bits)
- ;
- return clamp_t(typeof(bits), bits, LU_SITE_BITS_MIN, bits_max);
-}
-
-static unsigned int lu_obj_hop_hash(struct cfs_hash *hs,
- const void *key, unsigned int mask)
-{
- struct lu_fid *fid = (struct lu_fid *)key;
- __u32 hash;
-
- hash = fid_flatten32(fid);
- hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
- hash = hash_long(hash, hs->hs_bkt_bits);
-
- /* give me another random factor */
- hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
-
- hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
- hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
-
- return hash & mask;
-}
-
-static void *lu_obj_hop_object(struct hlist_node *hnode)
-{
- return hlist_entry(hnode, struct lu_object_header, loh_hash);
-}
-
-static void *lu_obj_hop_key(struct hlist_node *hnode)
-{
- struct lu_object_header *h;
-
- h = hlist_entry(hnode, struct lu_object_header, loh_hash);
- return &h->loh_fid;
-}
-
-static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
-{
- struct lu_object_header *h;
-
- h = hlist_entry(hnode, struct lu_object_header, loh_hash);
- return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
-}
-
-static void lu_obj_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
-{
- struct lu_object_header *h;
-
- h = hlist_entry(hnode, struct lu_object_header, loh_hash);
- atomic_inc(&h->loh_ref);
-}
-
-static void lu_obj_hop_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
-{
- LBUG(); /* we should never called it */
-}
-
-static struct cfs_hash_ops lu_site_hash_ops = {
- .hs_hash = lu_obj_hop_hash,
- .hs_key = lu_obj_hop_key,
- .hs_keycmp = lu_obj_hop_keycmp,
- .hs_object = lu_obj_hop_object,
- .hs_get = lu_obj_hop_get,
- .hs_put_locked = lu_obj_hop_put_locked,
-};
-
-static void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
-{
- spin_lock(&s->ls_ld_lock);
- if (list_empty(&d->ld_linkage))
- list_add(&d->ld_linkage, &s->ls_ld_linkage);
- spin_unlock(&s->ls_ld_lock);
-}
-
-/**
- * Initialize site \a s, with \a d as the top level device.
- */
-int lu_site_init(struct lu_site *s, struct lu_device *top)
-{
- struct lu_site_bkt_data *bkt;
- struct cfs_hash_bd bd;
- unsigned long bits;
- unsigned long i;
- char name[16];
- int rc;
-
- memset(s, 0, sizeof(*s));
- mutex_init(&s->ls_purge_mutex);
-
- rc = percpu_counter_init(&s->ls_lru_len_counter, 0, GFP_NOFS);
- if (rc)
- return -ENOMEM;
-
- snprintf(name, sizeof(name), "lu_site_%s", top->ld_type->ldt_name);
- for (bits = lu_htable_order(top); bits >= LU_SITE_BITS_MIN; bits--) {
- s->ls_obj_hash = cfs_hash_create(name, bits, bits,
- bits - LU_SITE_BKT_BITS,
- sizeof(*bkt), 0, 0,
- &lu_site_hash_ops,
- CFS_HASH_SPIN_BKTLOCK |
- CFS_HASH_NO_ITEMREF |
- CFS_HASH_DEPTH |
- CFS_HASH_ASSERT_EMPTY |
- CFS_HASH_COUNTER);
- if (s->ls_obj_hash)
- break;
- }
-
- if (!s->ls_obj_hash) {
- CERROR("failed to create lu_site hash with bits: %lu\n", bits);
- return -ENOMEM;
- }
-
- cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
- bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
- INIT_LIST_HEAD(&bkt->lsb_lru);
- init_waitqueue_head(&bkt->lsb_marche_funebre);
- }
-
- s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
- if (!s->ls_stats) {
- cfs_hash_putref(s->ls_obj_hash);
- s->ls_obj_hash = NULL;
- return -ENOMEM;
- }
-
- lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
- 0, "created", "created");
- lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
- 0, "cache_hit", "cache_hit");
- lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
- 0, "cache_miss", "cache_miss");
- lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
- 0, "cache_race", "cache_race");
- lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
- 0, "cache_death_race", "cache_death_race");
- lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
- 0, "lru_purged", "lru_purged");
-
- INIT_LIST_HEAD(&s->ls_linkage);
- s->ls_top_dev = top;
- top->ld_site = s;
- lu_device_get(top);
- lu_ref_add(&top->ld_reference, "site-top", s);
-
- INIT_LIST_HEAD(&s->ls_ld_linkage);
- spin_lock_init(&s->ls_ld_lock);
-
- lu_dev_add_linkage(s, top);
-
- return 0;
-}
-EXPORT_SYMBOL(lu_site_init);
-
-/**
- * Finalize \a s and release its resources.
- */
-void lu_site_fini(struct lu_site *s)
-{
- down_write(&lu_sites_guard);
- list_del_init(&s->ls_linkage);
- up_write(&lu_sites_guard);
-
- percpu_counter_destroy(&s->ls_lru_len_counter);
-
- if (s->ls_obj_hash) {
- cfs_hash_putref(s->ls_obj_hash);
- s->ls_obj_hash = NULL;
- }
-
- if (s->ls_top_dev) {
- s->ls_top_dev->ld_site = NULL;
- lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
- lu_device_put(s->ls_top_dev);
- s->ls_top_dev = NULL;
- }
-
- if (s->ls_stats)
- lprocfs_free_stats(&s->ls_stats);
-}
-EXPORT_SYMBOL(lu_site_fini);
-
-/**
- * Called when initialization of stack for this site is completed.
- */
-int lu_site_init_finish(struct lu_site *s)
-{
- int result;
-
- down_write(&lu_sites_guard);
- result = lu_context_refill(&lu_shrink_env.le_ctx);
- if (result == 0)
- list_add(&s->ls_linkage, &lu_sites);
- up_write(&lu_sites_guard);
- return result;
-}
-EXPORT_SYMBOL(lu_site_init_finish);
-
-/**
- * Acquire additional reference on device \a d
- */
-void lu_device_get(struct lu_device *d)
-{
- atomic_inc(&d->ld_ref);
-}
-EXPORT_SYMBOL(lu_device_get);
-
-/**
- * Release reference on device \a d.
- */
-void lu_device_put(struct lu_device *d)
-{
- LASSERT(atomic_read(&d->ld_ref) > 0);
- atomic_dec(&d->ld_ref);
-}
-EXPORT_SYMBOL(lu_device_put);
-
-/**
- * Initialize device \a d of type \a t.
- */
-int lu_device_init(struct lu_device *d, struct lu_device_type *t)
-{
- if (atomic_inc_return(&t->ldt_device_nr) == 1 &&
- t->ldt_ops->ldto_start)
- t->ldt_ops->ldto_start(t);
-
- memset(d, 0, sizeof(*d));
- atomic_set(&d->ld_ref, 0);
- d->ld_type = t;
- lu_ref_init(&d->ld_reference);
- INIT_LIST_HEAD(&d->ld_linkage);
- return 0;
-}
-EXPORT_SYMBOL(lu_device_init);
-
-/**
- * Finalize device \a d.
- */
-void lu_device_fini(struct lu_device *d)
-{
- struct lu_device_type *t = d->ld_type;
-
- if (d->ld_obd) {
- d->ld_obd->obd_lu_dev = NULL;
- d->ld_obd = NULL;
- }
-
- lu_ref_fini(&d->ld_reference);
- LASSERTF(atomic_read(&d->ld_ref) == 0,
- "Refcount is %u\n", atomic_read(&d->ld_ref));
- LASSERT(atomic_read(&t->ldt_device_nr) > 0);
-
- if (atomic_dec_and_test(&t->ldt_device_nr) &&
- t->ldt_ops->ldto_stop)
- t->ldt_ops->ldto_stop(t);
-}
-EXPORT_SYMBOL(lu_device_fini);
-
-/**
- * Initialize object \a o that is part of compound object \a h and was created
- * by device \a d.
- */
-int lu_object_init(struct lu_object *o, struct lu_object_header *h,
- struct lu_device *d)
-{
- memset(o, 0, sizeof(*o));
- o->lo_header = h;
- o->lo_dev = d;
- lu_device_get(d);
- lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
- INIT_LIST_HEAD(&o->lo_linkage);
-
- return 0;
-}
-EXPORT_SYMBOL(lu_object_init);
-
-/**
- * Finalize object and release its resources.
- */
-void lu_object_fini(struct lu_object *o)
-{
- struct lu_device *dev = o->lo_dev;
-
- LASSERT(list_empty(&o->lo_linkage));
-
- if (dev) {
- lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
- "lu_object", o);
- lu_device_put(dev);
- o->lo_dev = NULL;
- }
-}
-EXPORT_SYMBOL(lu_object_fini);
-
-/**
- * Add object \a o as first layer of compound object \a h
- *
- * This is typically called by the ->ldo_object_alloc() method of top-level
- * device.
- */
-void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
-{
- list_move(&o->lo_linkage, &h->loh_layers);
-}
-EXPORT_SYMBOL(lu_object_add_top);
-
-/**
- * Add object \a o as a layer of compound object, going after \a before.
- *
- * This is typically called by the ->ldo_object_alloc() method of \a
- * before->lo_dev.
- */
-void lu_object_add(struct lu_object *before, struct lu_object *o)
-{
- list_move(&o->lo_linkage, &before->lo_linkage);
-}
-EXPORT_SYMBOL(lu_object_add);
-
-/**
- * Initialize compound object.
- */
-int lu_object_header_init(struct lu_object_header *h)
-{
- memset(h, 0, sizeof(*h));
- atomic_set(&h->loh_ref, 1);
- INIT_HLIST_NODE(&h->loh_hash);
- INIT_LIST_HEAD(&h->loh_lru);
- INIT_LIST_HEAD(&h->loh_layers);
- lu_ref_init(&h->loh_reference);
- return 0;
-}
-EXPORT_SYMBOL(lu_object_header_init);
-
-/**
- * Finalize compound object.
- */
-void lu_object_header_fini(struct lu_object_header *h)
-{
- LASSERT(list_empty(&h->loh_layers));
- LASSERT(list_empty(&h->loh_lru));
- LASSERT(hlist_unhashed(&h->loh_hash));
- lu_ref_fini(&h->loh_reference);
-}
-EXPORT_SYMBOL(lu_object_header_fini);
-
-/**
- * Given a compound object, find its slice, corresponding to the device type
- * \a dtype.
- */
-struct lu_object *lu_object_locate(struct lu_object_header *h,
- const struct lu_device_type *dtype)
-{
- struct lu_object *o;
-
- list_for_each_entry(o, &h->loh_layers, lo_linkage) {
- if (o->lo_dev->ld_type == dtype)
- return o;
- }
- return NULL;
-}
-EXPORT_SYMBOL(lu_object_locate);
-
-/**
- * Finalize and free devices in the device stack.
- *
- * Finalize device stack by purging object cache, and calling
- * lu_device_type_operations::ldto_device_fini() and
- * lu_device_type_operations::ldto_device_free() on all devices in the stack.
- */
-void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
-{
- struct lu_site *site = top->ld_site;
- struct lu_device *scan;
- struct lu_device *next;
-
- lu_site_purge(env, site, ~0);
- for (scan = top; scan; scan = next) {
- next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
- lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
- lu_device_put(scan);
- }
-
- /* purge again. */
- lu_site_purge(env, site, ~0);
-
- for (scan = top; scan; scan = next) {
- const struct lu_device_type *ldt = scan->ld_type;
- struct obd_type *type;
-
- next = ldt->ldt_ops->ldto_device_free(env, scan);
- type = ldt->ldt_obd_type;
- if (type) {
- type->typ_refcnt--;
- class_put_type(type);
- }
- }
-}
-
-enum {
- /**
- * Maximal number of tld slots.
- */
- LU_CONTEXT_KEY_NR = 40
-};
-
-static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
-
-static DEFINE_RWLOCK(lu_keys_guard);
-static atomic_t lu_key_initing_cnt = ATOMIC_INIT(0);
-
-/**
- * Global counter incremented whenever key is registered, unregistered,
- * revived or quiesced. This is used to void unnecessary calls to
- * lu_context_refill(). No locking is provided, as initialization and shutdown
- * are supposed to be externally serialized.
- */
-static unsigned int key_set_version;
-
-/**
- * Register new key.
- */
-int lu_context_key_register(struct lu_context_key *key)
-{
- int result;
- unsigned int i;
-
- LASSERT(key->lct_init);
- LASSERT(key->lct_fini);
- LASSERT(key->lct_tags != 0);
-
- result = -ENFILE;
- write_lock(&lu_keys_guard);
- for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
- if (!lu_keys[i]) {
- key->lct_index = i;
- atomic_set(&key->lct_used, 1);
- lu_keys[i] = key;
- lu_ref_init(&key->lct_reference);
- result = 0;
- ++key_set_version;
- break;
- }
- }
- write_unlock(&lu_keys_guard);
- return result;
-}
-EXPORT_SYMBOL(lu_context_key_register);
-
-static void key_fini(struct lu_context *ctx, int index)
-{
- if (ctx->lc_value && ctx->lc_value[index]) {
- struct lu_context_key *key;
-
- key = lu_keys[index];
- LASSERT(atomic_read(&key->lct_used) > 1);
-
- key->lct_fini(ctx, key, ctx->lc_value[index]);
- lu_ref_del(&key->lct_reference, "ctx", ctx);
- atomic_dec(&key->lct_used);
-
- if ((ctx->lc_tags & LCT_NOREF) == 0)
- module_put(key->lct_owner);
- ctx->lc_value[index] = NULL;
- }
-}
-
-/**
- * Deregister key.
- */
-void lu_context_key_degister(struct lu_context_key *key)
-{
- LASSERT(atomic_read(&key->lct_used) >= 1);
- LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
-
- lu_context_key_quiesce(key);
-
- write_lock(&lu_keys_guard);
- ++key_set_version;
- key_fini(&lu_shrink_env.le_ctx, key->lct_index);
-
- /**
- * Wait until all transient contexts referencing this key have
- * run lu_context_key::lct_fini() method.
- */
- while (atomic_read(&key->lct_used) > 1) {
- write_unlock(&lu_keys_guard);
- CDEBUG(D_INFO, "%s: \"%s\" %p, %d\n",
- __func__, module_name(key->lct_owner),
- key, atomic_read(&key->lct_used));
- schedule();
- write_lock(&lu_keys_guard);
- }
- if (lu_keys[key->lct_index]) {
- lu_keys[key->lct_index] = NULL;
- lu_ref_fini(&key->lct_reference);
- }
- write_unlock(&lu_keys_guard);
-
- LASSERTF(atomic_read(&key->lct_used) == 1,
- "key has instances: %d\n",
- atomic_read(&key->lct_used));
-}
-EXPORT_SYMBOL(lu_context_key_degister);
-
-/**
- * Register a number of keys. This has to be called after all keys have been
- * initialized by a call to LU_CONTEXT_KEY_INIT().
- */
-int lu_context_key_register_many(struct lu_context_key *k, ...)
-{
- struct lu_context_key *key = k;
- va_list args;
- int result;
-
- va_start(args, k);
- do {
- result = lu_context_key_register(key);
- if (result)
- break;
- key = va_arg(args, struct lu_context_key *);
- } while (key);
- va_end(args);
-
- if (result != 0) {
- va_start(args, k);
- while (k != key) {
- lu_context_key_degister(k);
- k = va_arg(args, struct lu_context_key *);
- }
- va_end(args);
- }
-
- return result;
-}
-EXPORT_SYMBOL(lu_context_key_register_many);
-
-/**
- * De-register a number of keys. This is a dual to
- * lu_context_key_register_many().
- */
-void lu_context_key_degister_many(struct lu_context_key *k, ...)
-{
- va_list args;
-
- va_start(args, k);
- do {
- lu_context_key_degister(k);
- k = va_arg(args, struct lu_context_key*);
- } while (k);
- va_end(args);
-}
-EXPORT_SYMBOL(lu_context_key_degister_many);
-
-/**
- * Revive a number of keys.
- */
-void lu_context_key_revive_many(struct lu_context_key *k, ...)
-{
- va_list args;
-
- va_start(args, k);
- do {
- lu_context_key_revive(k);
- k = va_arg(args, struct lu_context_key*);
- } while (k);
- va_end(args);
-}
-EXPORT_SYMBOL(lu_context_key_revive_many);
-
-/**
- * Quiescent a number of keys.
- */
-void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
-{
- va_list args;
-
- va_start(args, k);
- do {
- lu_context_key_quiesce(k);
- k = va_arg(args, struct lu_context_key*);
- } while (k);
- va_end(args);
-}
-EXPORT_SYMBOL(lu_context_key_quiesce_many);
-
-/**
- * Return value associated with key \a key in context \a ctx.
- */
-void *lu_context_key_get(const struct lu_context *ctx,
- const struct lu_context_key *key)
-{
- LINVRNT(ctx->lc_state == LCS_ENTERED);
- LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
- LASSERT(lu_keys[key->lct_index] == key);
- return ctx->lc_value[key->lct_index];
-}
-EXPORT_SYMBOL(lu_context_key_get);
-
-/**
- * List of remembered contexts. XXX document me.
- */
-static LIST_HEAD(lu_context_remembered);
-
-/**
- * Destroy \a key in all remembered contexts. This is used to destroy key
- * values in "shared" contexts (like service threads), when a module owning
- * the key is about to be unloaded.
- */
-void lu_context_key_quiesce(struct lu_context_key *key)
-{
- struct lu_context *ctx;
-
- if (!(key->lct_tags & LCT_QUIESCENT)) {
- /*
- * XXX memory barrier has to go here.
- */
- write_lock(&lu_keys_guard);
- key->lct_tags |= LCT_QUIESCENT;
-
- /**
- * Wait until all lu_context_key::lct_init() methods
- * have completed.
- */
- while (atomic_read(&lu_key_initing_cnt) > 0) {
- write_unlock(&lu_keys_guard);
- CDEBUG(D_INFO, "%s: \"%s\" %p, %d (%d)\n",
- __func__,
- module_name(key->lct_owner),
- key, atomic_read(&key->lct_used),
- atomic_read(&lu_key_initing_cnt));
- schedule();
- write_lock(&lu_keys_guard);
- }
-
- list_for_each_entry(ctx, &lu_context_remembered, lc_remember)
- key_fini(ctx, key->lct_index);
-
- ++key_set_version;
- write_unlock(&lu_keys_guard);
- }
-}
-
-void lu_context_key_revive(struct lu_context_key *key)
-{
- write_lock(&lu_keys_guard);
- key->lct_tags &= ~LCT_QUIESCENT;
- ++key_set_version;
- write_unlock(&lu_keys_guard);
-}
-
-static void keys_fini(struct lu_context *ctx)
-{
- unsigned int i;
-
- if (!ctx->lc_value)
- return;
-
- for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
- key_fini(ctx, i);
-
- kfree(ctx->lc_value);
- ctx->lc_value = NULL;
-}
-
-static int keys_fill(struct lu_context *ctx)
-{
- unsigned int pre_version;
- unsigned int i;
-
- /*
- * A serialisation with lu_context_key_quiesce() is needed, but some
- * "key->lct_init()" are calling kernel memory allocation routine and
- * can't be called while holding a spin_lock.
- * "lu_keys_guard" is held while incrementing "lu_key_initing_cnt"
- * to ensure the start of the serialisation.
- * An atomic_t variable is still used, in order not to reacquire the
- * lock when decrementing the counter.
- */
- read_lock(&lu_keys_guard);
- atomic_inc(&lu_key_initing_cnt);
- pre_version = key_set_version;
- read_unlock(&lu_keys_guard);
-
-refill:
- LINVRNT(ctx->lc_value);
- for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
- struct lu_context_key *key;
-
- key = lu_keys[i];
- if (!ctx->lc_value[i] && key &&
- (key->lct_tags & ctx->lc_tags) &&
- /*
- * Don't create values for a LCT_QUIESCENT key, as this
- * will pin module owning a key.
- */
- !(key->lct_tags & LCT_QUIESCENT)) {
- void *value;
-
- LINVRNT(key->lct_init);
- LINVRNT(key->lct_index == i);
-
- if (!(ctx->lc_tags & LCT_NOREF) &&
- !try_module_get(key->lct_owner)) {
- /* module is unloading, skip this key */
- continue;
- }
-
- value = key->lct_init(ctx, key);
- if (unlikely(IS_ERR(value))) {
- atomic_dec(&lu_key_initing_cnt);
- return PTR_ERR(value);
- }
-
- lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
- atomic_inc(&key->lct_used);
- /*
- * This is the only place in the code, where an
- * element of ctx->lc_value[] array is set to non-NULL
- * value.
- */
- ctx->lc_value[i] = value;
- if (key->lct_exit)
- ctx->lc_tags |= LCT_HAS_EXIT;
- }
- }
-
- read_lock(&lu_keys_guard);
- if (pre_version != key_set_version) {
- pre_version = key_set_version;
- read_unlock(&lu_keys_guard);
- goto refill;
- }
- ctx->lc_version = key_set_version;
- atomic_dec(&lu_key_initing_cnt);
- read_unlock(&lu_keys_guard);
- return 0;
-}
-
-static int keys_init(struct lu_context *ctx)
-{
- ctx->lc_value = kcalloc(ARRAY_SIZE(lu_keys), sizeof(ctx->lc_value[0]),
- GFP_NOFS);
- if (likely(ctx->lc_value))
- return keys_fill(ctx);
-
- return -ENOMEM;
-}
-
-/**
- * Initialize context data-structure. Create values for all keys.
- */
-int lu_context_init(struct lu_context *ctx, __u32 tags)
-{
- int rc;
-
- memset(ctx, 0, sizeof(*ctx));
- ctx->lc_state = LCS_INITIALIZED;
- ctx->lc_tags = tags;
- if (tags & LCT_REMEMBER) {
- write_lock(&lu_keys_guard);
- list_add(&ctx->lc_remember, &lu_context_remembered);
- write_unlock(&lu_keys_guard);
- } else {
- INIT_LIST_HEAD(&ctx->lc_remember);
- }
-
- rc = keys_init(ctx);
- if (rc != 0)
- lu_context_fini(ctx);
-
- return rc;
-}
-EXPORT_SYMBOL(lu_context_init);
-
-/**
- * Finalize context data-structure. Destroy key values.
- */
-void lu_context_fini(struct lu_context *ctx)
-{
- LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
- ctx->lc_state = LCS_FINALIZED;
-
- if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
- LASSERT(list_empty(&ctx->lc_remember));
- keys_fini(ctx);
-
- } else { /* could race with key degister */
- write_lock(&lu_keys_guard);
- keys_fini(ctx);
- list_del_init(&ctx->lc_remember);
- write_unlock(&lu_keys_guard);
- }
-}
-EXPORT_SYMBOL(lu_context_fini);
-
-/**
- * Called before entering context.
- */
-void lu_context_enter(struct lu_context *ctx)
-{
- LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
- ctx->lc_state = LCS_ENTERED;
-}
-EXPORT_SYMBOL(lu_context_enter);
-
-/**
- * Called after exiting from \a ctx
- */
-void lu_context_exit(struct lu_context *ctx)
-{
- unsigned int i;
-
- LINVRNT(ctx->lc_state == LCS_ENTERED);
- ctx->lc_state = LCS_LEFT;
- if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value) {
- /* could race with key quiescency */
- if (ctx->lc_tags & LCT_REMEMBER)
- read_lock(&lu_keys_guard);
-
- for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
- if (ctx->lc_value[i]) {
- struct lu_context_key *key;
-
- key = lu_keys[i];
- if (key->lct_exit)
- key->lct_exit(ctx,
- key, ctx->lc_value[i]);
- }
- }
-
- if (ctx->lc_tags & LCT_REMEMBER)
- read_unlock(&lu_keys_guard);
- }
-}
-EXPORT_SYMBOL(lu_context_exit);
-
-/**
- * Allocate for context all missing keys that were registered after context
- * creation. key_set_version is only changed in rare cases when modules
- * are loaded and removed.
- */
-int lu_context_refill(struct lu_context *ctx)
-{
- read_lock(&lu_keys_guard);
- if (likely(ctx->lc_version == key_set_version)) {
- read_unlock(&lu_keys_guard);
- return 0;
- }
-
- read_unlock(&lu_keys_guard);
- return keys_fill(ctx);
-}
-
-/**
- * lu_ctx_tags/lu_ses_tags will be updated if there are new types of
- * obd being added. Currently, this is only used on client side, specifically
- * for echo device client, for other stack (like ptlrpc threads), context are
- * predefined when the lu_device type are registered, during the module probe
- * phase.
- */
-__u32 lu_context_tags_default;
-__u32 lu_session_tags_default;
-
-int lu_env_init(struct lu_env *env, __u32 tags)
-{
- int result;
-
- env->le_ses = NULL;
- result = lu_context_init(&env->le_ctx, tags);
- if (likely(result == 0))
- lu_context_enter(&env->le_ctx);
- return result;
-}
-EXPORT_SYMBOL(lu_env_init);
-
-void lu_env_fini(struct lu_env *env)
-{
- lu_context_exit(&env->le_ctx);
- lu_context_fini(&env->le_ctx);
- env->le_ses = NULL;
-}
-EXPORT_SYMBOL(lu_env_fini);
-
-int lu_env_refill(struct lu_env *env)
-{
- int result;
-
- result = lu_context_refill(&env->le_ctx);
- if (result == 0 && env->le_ses)
- result = lu_context_refill(env->le_ses);
- return result;
-}
-EXPORT_SYMBOL(lu_env_refill);
-
-struct lu_site_stats {
- unsigned int lss_populated;
- unsigned int lss_max_search;
- unsigned int lss_total;
- unsigned int lss_busy;
-};
-
-static void lu_site_stats_get(const struct lu_site *s,
- struct lu_site_stats *stats, int populated)
-{
- struct cfs_hash *hs = s->ls_obj_hash;
- struct cfs_hash_bd bd;
- unsigned int i;
- /*
- * percpu_counter_sum_positive() won't accept a const pointer
- * as it does modify the struct by taking a spinlock
- */
- struct lu_site *s2 = (struct lu_site *)s;
-
- stats->lss_busy += cfs_hash_size_get(hs) -
- percpu_counter_sum_positive(&s2->ls_lru_len_counter);
- cfs_hash_for_each_bucket(hs, &bd, i) {
- struct hlist_head *hhead;
-
- cfs_hash_bd_lock(hs, &bd, 1);
- stats->lss_total += cfs_hash_bd_count_get(&bd);
- stats->lss_max_search = max((int)stats->lss_max_search,
- cfs_hash_bd_depmax_get(&bd));
- if (!populated) {
- cfs_hash_bd_unlock(hs, &bd, 1);
- continue;
- }
-
- cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
- if (!hlist_empty(hhead))
- stats->lss_populated++;
- }
- cfs_hash_bd_unlock(hs, &bd, 1);
- }
-}
-
-/*
- * lu_cache_shrink_count() returns an approximate number of cached objects
- * that can be freed by shrink_slab(). A counter, which tracks the
- * number of items in the site's lru, is maintained in a percpu_counter
- * for each site. The percpu values are incremented and decremented as
- * objects are added or removed from the lru. The percpu values are summed
- * and saved whenever a percpu value exceeds a threshold. Thus the saved,
- * summed value at any given time may not accurately reflect the current
- * lru length. But this value is sufficiently accurate for the needs of
- * a shrinker.
- *
- * Using a per cpu counter is a compromise solution to concurrent access:
- * lu_object_put() can update the counter without locking the site and
- * lu_cache_shrink_count can sum the counters without locking each
- * ls_obj_hash bucket.
- */
-static unsigned long lu_cache_shrink_count(struct shrinker *sk,
- struct shrink_control *sc)
-{
- struct lu_site *s;
- struct lu_site *tmp;
- unsigned long cached = 0;
-
- if (!(sc->gfp_mask & __GFP_FS))
- return 0;
-
- down_read(&lu_sites_guard);
- list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage)
- cached += percpu_counter_read_positive(&s->ls_lru_len_counter);
- up_read(&lu_sites_guard);
-
- cached = (cached / 100) * sysctl_vfs_cache_pressure;
- CDEBUG(D_INODE, "%ld objects cached, cache pressure %d\n",
- cached, sysctl_vfs_cache_pressure);
-
- return cached;
-}
-
-static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
- struct shrink_control *sc)
-{
- struct lu_site *s;
- struct lu_site *tmp;
- unsigned long remain = sc->nr_to_scan, freed = 0;
- LIST_HEAD(splice);
-
- if (!(sc->gfp_mask & __GFP_FS))
- /* We must not take the lu_sites_guard lock when
- * __GFP_FS is *not* set because of the deadlock
- * possibility detailed above. Additionally,
- * since we cannot determine the number of
- * objects in the cache without taking this
- * lock, we're in a particularly tough spot. As
- * a result, we'll just lie and say our cache is
- * empty. This _should_ be ok, as we can't
- * reclaim objects when __GFP_FS is *not* set
- * anyways.
- */
- return SHRINK_STOP;
-
- down_write(&lu_sites_guard);
- list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
- freed = lu_site_purge(&lu_shrink_env, s, remain);
- remain -= freed;
- /*
- * Move just shrunk site to the tail of site list to
- * assure shrinking fairness.
- */
- list_move_tail(&s->ls_linkage, &splice);
- }
- list_splice(&splice, lu_sites.prev);
- up_write(&lu_sites_guard);
-
- return sc->nr_to_scan - remain;
-}
-
-/**
- * Debugging printer function using printk().
- */
-static struct shrinker lu_site_shrinker = {
- .count_objects = lu_cache_shrink_count,
- .scan_objects = lu_cache_shrink_scan,
- .seeks = DEFAULT_SEEKS,
-};
-
-/**
- * Initialization of global lu_* data.
- */
-int lu_global_init(void)
-{
- int result;
-
- CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
-
- result = lu_ref_global_init();
- if (result != 0)
- return result;
-
- LU_CONTEXT_KEY_INIT(&lu_global_key);
- result = lu_context_key_register(&lu_global_key);
- if (result != 0) {
- lu_ref_global_fini();
- return result;
- }
-
- /*
- * At this level, we don't know what tags are needed, so allocate them
- * conservatively. This should not be too bad, because this
- * environment is global.
- */
- down_write(&lu_sites_guard);
- result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
- up_write(&lu_sites_guard);
- if (result != 0) {
- lu_context_key_degister(&lu_global_key);
- lu_ref_global_fini();
- return result;
- }
-
- /*
- * seeks estimation: 3 seeks to read a record from oi, one to read
- * inode, one for ea. Unfortunately setting this high value results in
- * lu_object/inode cache consuming all the memory.
- */
- result = register_shrinker(&lu_site_shrinker);
- if (result != 0) {
- /* Order explained in lu_global_fini(). */
- lu_context_key_degister(&lu_global_key);
-
- down_write(&lu_sites_guard);
- lu_env_fini(&lu_shrink_env);
- up_write(&lu_sites_guard);
-
- lu_ref_global_fini();
- return result;
- }
-
- return 0;
-}
-
-/**
- * Dual to lu_global_init().
- */
-void lu_global_fini(void)
-{
- unregister_shrinker(&lu_site_shrinker);
- lu_context_key_degister(&lu_global_key);
-
- /*
- * Tear shrinker environment down _after_ de-registering
- * lu_global_key, because the latter has a value in the former.
- */
- down_write(&lu_sites_guard);
- lu_env_fini(&lu_shrink_env);
- up_write(&lu_sites_guard);
-
- lu_ref_global_fini();
-}
-
-static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
-{
- struct lprocfs_counter ret;
-
- lprocfs_stats_collect(stats, idx, &ret);
- return (__u32)ret.lc_count;
-}
-
-/**
- * Output site statistical counters into a buffer. Suitable for
- * lprocfs_rd_*()-style functions.
- */
-int lu_site_stats_print(const struct lu_site *s, struct seq_file *m)
-{
- struct lu_site_stats stats;
-
- memset(&stats, 0, sizeof(stats));
- lu_site_stats_get(s, &stats, 1);
-
- seq_printf(m, "%d/%d %d/%ld %d %d %d %d %d %d %d\n",
- stats.lss_busy,
- stats.lss_total,
- stats.lss_populated,
- CFS_HASH_NHLIST(s->ls_obj_hash),
- stats.lss_max_search,
- ls_stats_read(s->ls_stats, LU_SS_CREATED),
- ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
- ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
- ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
- ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
- ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
- return 0;
-}
-EXPORT_SYMBOL(lu_site_stats_print);
-
-/**
- * Helper function to initialize a number of kmem slab caches at once.
- */
-int lu_kmem_init(struct lu_kmem_descr *caches)
-{
- int result;
- struct lu_kmem_descr *iter = caches;
-
- for (result = 0; iter->ckd_cache; ++iter) {
- *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
- iter->ckd_size,
- 0, 0, NULL);
- if (!*iter->ckd_cache) {
- result = -ENOMEM;
- /* free all previously allocated caches */
- lu_kmem_fini(caches);
- break;
- }
- }
- return result;
-}
-EXPORT_SYMBOL(lu_kmem_init);
-
-/**
- * Helper function to finalize a number of kmem slab cached at once. Dual to
- * lu_kmem_init().
- */
-void lu_kmem_fini(struct lu_kmem_descr *caches)
-{
- for (; caches->ckd_cache; ++caches) {
- kmem_cache_destroy(*caches->ckd_cache);
- *caches->ckd_cache = NULL;
- }
-}
-EXPORT_SYMBOL(lu_kmem_fini);