author	Linus Torvalds <torvalds@linux-foundation.org>	2018-06-09 10:32:39 -0700
committer	Linus Torvalds <torvalds@linux-foundation.org>	2018-06-09 10:32:39 -0700
commit	eafdca4d7010a0e019aaaace3dd71b432a69b54c (patch)
tree	0206168276ece10426dbbef7b3de7e8d84c8674d /drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c
parent	Merge tag 'libnvdimm-for-4.18' of git://git.kernel.org/pub/scm/linux/kernel/git/nvdimm/nvdimm (diff)
parent	staging: ipx: delete it from the tree (diff)
Merge tag 'staging-4.18-rc1' of git://git.kernel.org/pub/scm/linux/kernel/git/gregkh/staging
Pull staging/IIO updates from Greg KH:
 "Here is the big staging and IIO driver update for 4.18-rc1.

  It was delayed as I wanted to make sure the final driver deletions did
  not cause any major merge issues, and all now looks good. There are a
  lot of patches here, just over 1000. The diffstat summary shows the
  major changes here:

	1007 files changed, 16828 insertions(+), 227770 deletions(-)

  Because of this, we might be close to shrinking the overall kernel
  source code size for two releases in a row.

  There was loads of work in this release cycle, primarily:

   - tons of ks7010 driver cleanups

   - lots of mt7621 driver fixes and cleanups

   - most driver cleanups

   - wilc1000 fixes and cleanups

   - lots and lots of IIO driver cleanups and new additions

   - debugfs cleanups for all staging drivers

   - lots of other staging driver cleanups and fixes, the shortlog has
     the full details

  but the big user-visible things here are the removal of 3 chunks of
  code:

   - ncpfs and ipx were removed on schedule, no one has cared about this
     code since it moved to staging last year, and if it needs to come
     back, it can be reverted.

   - lustre file system is removed. I've ranted at the lustre developers
     about once a year for the past 5 years, with no real forward
     progress at all to clean things up and get the code into the "real"
     part of the kernel. Given that the lustre developers continue to
     work on an external tree and try to port those changes to the
     in-kernel tree every once in a while, this whole thing really
     really is not working out at all. So I'm deleting it so that the
     developers can spend the time working in their out-of-tree location
     and get things cleaned up properly to get merged into the tree
     correctly at a later date.

  Because of these file removals, you will have merge issues on some of
  these files (2 in the ipx code, 1 in the ncpfs code, and 1 in the
  atomisp driver). Just delete those files, it's a simple merge :)

  All of this has been in linux-next for a while with no reported
  problems"

* tag 'staging-4.18-rc1' of git://git.kernel.org/pub/scm/linux/kernel/git/gregkh/staging: (1011 commits)
  staging: ipx: delete it from the tree
  ncpfs: remove uapi .h files
  ncpfs: remove Documentation
  ncpfs: remove compat functionality
  staging: ncpfs: delete it
  staging: lustre: delete the filesystem from the tree.
  staging: vc04_services: no need to save the log debufs dentries
  staging: vc04_services: vchiq_debugfs_log_entry can be a void *
  staging: vc04_services: remove struct vchiq_debugfs_info
  staging: vc04_services: move client dbg directory into static variable
  staging: vc04_services: remove odd vchiq_debugfs_top() wrapper
  staging: vc04_services: no need to check debugfs return values
  staging: mt7621-gpio: reorder includes alphabetically
  staging: mt7621-gpio: change gc_map to don't use pointers
  staging: mt7621-gpio: use GPIOF_DIR_OUT and GPIOF_DIR_IN macros instead of custom values
  staging: mt7621-gpio: change 'to_mediatek_gpio' to make just a one line return
  staging: mt7621-gpio: dt-bindings: update documentation for #interrupt-cells property
  staging: mt7621-gpio: update #interrupt-cells for the gpio node
  staging: mt7621-gpio: dt-bindings: complete documentation for the gpio
  staging: mt7621-dts: add missing properties to gpio node
  ...
Diffstat (limited to 'drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c')
-rw-r--r--	drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c	912
1 file changed, 0 insertions(+), 912 deletions(-)
diff --git a/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c b/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c
deleted file mode 100644
index c0fa13942bd8..000000000000
--- a/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c
+++ /dev/null
@@ -1,912 +0,0 @@
-// SPDX-License-Identifier: GPL-2.0
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.gnu.org/licenses/gpl-2.0.html
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2011, 2015, Intel Corporation.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/ptlrpc/ptlrpcd.c
- */
-
-/** \defgroup ptlrpcd PortalRPC daemon
- *
- * ptlrpcd is a special thread with its own request set, to which other
- * users may add requests when they don't want to wait for completion.
- * ptlrpcd takes care of sending such requests and then processing their
- * replies and calling completion callbacks as necessary.
- * The callbacks are called directly from ptlrpcd context.
- * It is important never to block significantly (especially on RPCs!) within
- * such a completion handler, or a deadlock might occur: ptlrpcd enters a
- * callback that sends another RPC and waits for it to return, during which
- * time ptlrpcd is completely blocked; so, e.g., if an import fails,
- * recovery cannot progress because connection requests are also sent by
- * ptlrpcd.
- *
- * @{
- */
-
-#define DEBUG_SUBSYSTEM S_RPC
-
-#include <linux/libcfs/libcfs.h>
-
-#include <lustre_net.h>
-#include <lustre_lib.h>
-#include <lustre_ha.h>
-#include <obd_class.h> /* for obd_zombie */
-#include <obd_support.h> /* for OBD_FAIL_CHECK */
-#include <cl_object.h> /* cl_env_{get,put}() */
-#include <lprocfs_status.h>
-
-#include "ptlrpc_internal.h"
-
-/* One of these per CPT. */
-struct ptlrpcd {
- int pd_size;
- int pd_index;
- int pd_cpt;
- int pd_cursor;
- int pd_nthreads;
- int pd_groupsize;
- struct ptlrpcd_ctl pd_threads[0];
-};
-
-/*
- * max_ptlrpcds is obsolete, but retained to ensure that the kernel
- * module will load on a system where it has been tuned.
- * A value other than 0 implies it was tuned, in which case the value
- * is used to derive a setting for ptlrpcd_per_cpt_max.
- */
-static int max_ptlrpcds;
-module_param(max_ptlrpcds, int, 0644);
-MODULE_PARM_DESC(max_ptlrpcds,
- "Max ptlrpcd thread count to be started (obsolete).");
-
-/*
- * ptlrpcd_bind_policy is obsolete, but retained to ensure that
- * the kernel module will load on a system where it has been tuned.
- * A value other than 0 implies it was tuned, in which case the value
- * is used to derive a setting for ptlrpcd_partner_group_size.
- */
-static int ptlrpcd_bind_policy;
-module_param(ptlrpcd_bind_policy, int, 0644);
-MODULE_PARM_DESC(ptlrpcd_bind_policy,
- "Ptlrpcd threads binding mode (obsolete).");
-
-/*
- * ptlrpcd_per_cpt_max: The maximum number of ptlrpcd threads to run
- * in a CPT.
- */
-static int ptlrpcd_per_cpt_max;
-module_param(ptlrpcd_per_cpt_max, int, 0644);
-MODULE_PARM_DESC(ptlrpcd_per_cpt_max,
- "Max ptlrpcd thread count to be started per CPT.");
-
-/*
- * ptlrpcd_partner_group_size: The desired number of threads in each
- * ptlrpcd partner thread group. Default is 2, corresponding to the
- * old PDB_POLICY_PAIR. A negative value makes all ptlrpcd threads in
- * a CPT partners of each other.
- */
-static int ptlrpcd_partner_group_size;
-module_param(ptlrpcd_partner_group_size, int, 0644);
-MODULE_PARM_DESC(ptlrpcd_partner_group_size,
- "Number of ptlrpcd threads in a partner group.");
-
-/*
- * ptlrpcd_cpts: A CPT string describing the CPU partitions that
- * ptlrpcd threads should run on. Used to make ptlrpcd threads run on
- * a subset of all CPTs.
- *
- * ptlrpcd_cpts=2
- * ptlrpcd_cpts=[2]
- * run ptlrpcd threads only on CPT 2.
- *
- * ptlrpcd_cpts=0-3
- * ptlrpcd_cpts=[0-3]
- * run ptlrpcd threads on CPTs 0, 1, 2, and 3.
- *
- * ptlrpcd_cpts=[0-3,5,7]
- * run ptlrpcd threads on CPTs 0, 1, 2, 3, 5, and 7.
- */
-static char *ptlrpcd_cpts;
-module_param(ptlrpcd_cpts, charp, 0644);
-MODULE_PARM_DESC(ptlrpcd_cpts,
- "CPU partitions ptlrpcd threads should run in");
-
-/* ptlrpcds_cpt_idx maps cpt numbers to an index in the ptlrpcds array. */
-static int *ptlrpcds_cpt_idx;
-
-/* ptlrpcds_num is the number of entries in the ptlrpcds array. */
-static int ptlrpcds_num;
-static struct ptlrpcd **ptlrpcds;
-
-/*
- * In addition to the regular thread pool above, there is a single
- * global recovery thread. Recovery isn't critical for performance,
- * and doesn't block, but must always be able to proceed, and it is
- * possible that all normal ptlrpcd threads are blocked. Hence the
- * need for a dedicated thread.
- */
-static struct ptlrpcd_ctl ptlrpcd_rcv;
-
-struct mutex ptlrpcd_mutex;
-static int ptlrpcd_users;
-
-void ptlrpcd_wake(struct ptlrpc_request *req)
-{
- struct ptlrpc_request_set *set = req->rq_set;
-
- wake_up(&set->set_waitq);
-}
-EXPORT_SYMBOL(ptlrpcd_wake);
-
-static struct ptlrpcd_ctl *
-ptlrpcd_select_pc(struct ptlrpc_request *req)
-{
- struct ptlrpcd *pd;
- int cpt;
- int idx;
-
- if (req && req->rq_send_state != LUSTRE_IMP_FULL)
- return &ptlrpcd_rcv;
-
- cpt = cfs_cpt_current(cfs_cpt_table, 1);
- if (!ptlrpcds_cpt_idx)
- idx = cpt;
- else
- idx = ptlrpcds_cpt_idx[cpt];
- pd = ptlrpcds[idx];
-
- /* We do not insist on strict load balancing here. */
- idx = pd->pd_cursor;
- if (++idx == pd->pd_nthreads)
- idx = 0;
- pd->pd_cursor = idx;
-
- return &pd->pd_threads[idx];
-}
-
-/**
- * Return the number of transferred RPCs.
- */
-static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
- struct ptlrpc_request_set *src)
-{
- struct ptlrpc_request *req, *tmp;
- int rc = 0;
-
- spin_lock(&src->set_new_req_lock);
- if (likely(!list_empty(&src->set_new_requests))) {
- list_for_each_entry_safe(req, tmp, &src->set_new_requests, rq_set_chain)
- req->rq_set = des;
-
- list_splice_init(&src->set_new_requests, &des->set_requests);
- rc = atomic_read(&src->set_new_count);
- atomic_add(rc, &des->set_remaining);
- atomic_set(&src->set_new_count, 0);
- }
- spin_unlock(&src->set_new_req_lock);
- return rc;
-}
-
-/**
- * Requests that are added to the ptlrpcd queue are sent via
- * ptlrpcd_check->ptlrpc_check_set().
- */
-void ptlrpcd_add_req(struct ptlrpc_request *req)
-{
- struct ptlrpcd_ctl *pc;
-
- if (req->rq_reqmsg)
- lustre_msg_set_jobid(req->rq_reqmsg, NULL);
-
- spin_lock(&req->rq_lock);
- if (req->rq_invalid_rqset) {
- req->rq_invalid_rqset = 0;
- spin_unlock(&req->rq_lock);
- if (wait_event_idle_timeout(req->rq_set_waitq,
- !req->rq_set,
- 5 * HZ) == 0)
- wait_event_idle(req->rq_set_waitq,
- !req->rq_set);
- } else if (req->rq_set) {
- /* If we have a valid "rq_set", just reuse it to avoid double
- * linking.
- */
- LASSERT(req->rq_phase == RQ_PHASE_NEW);
- LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);
-
- /* ptlrpc_check_set will decrease the count */
- atomic_inc(&req->rq_set->set_remaining);
- spin_unlock(&req->rq_lock);
- wake_up(&req->rq_set->set_waitq);
- return;
- } else {
- spin_unlock(&req->rq_lock);
- }
-
- pc = ptlrpcd_select_pc(req);
-
- DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]",
- req, pc->pc_name, pc->pc_index);
-
- ptlrpc_set_add_new_req(pc, req);
-}
-EXPORT_SYMBOL(ptlrpcd_add_req);
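[Editor's note: a minimal caller sketch, hedged: my_async_interpret and my_req are hypothetical names; the callback signature follows the ptlrpc_interpterer_t type used by this version of the code. The comment in the handler restates the deadlock rule from the file header above.

	/* Hypothetical completion callback; runs in ptlrpcd context. */
	static int my_async_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *args, int rc)
	{
		/* Must not block or send-and-wait on another RPC here,
		 * or ptlrpcd itself can deadlock (see file header). */
		return rc;
	}

	/* ... with a fully prepared request my_req: */
	my_req->rq_interpret_reply = my_async_interpret;
	ptlrpcd_add_req(my_req);	/* fire and forget; no waiting here */
]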
-
-static inline void ptlrpc_reqset_get(struct ptlrpc_request_set *set)
-{
- atomic_inc(&set->set_refcount);
-}
-
-/**
- * Check if there is more work to do on ptlrpcd set.
- * Returns 1 if yes.
- */
-static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
-{
- struct ptlrpc_request *req, *tmp;
- struct ptlrpc_request_set *set = pc->pc_set;
- int rc = 0;
- int rc2;
-
- if (atomic_read(&set->set_new_count)) {
- spin_lock(&set->set_new_req_lock);
- if (likely(!list_empty(&set->set_new_requests))) {
- list_splice_init(&set->set_new_requests,
- &set->set_requests);
- atomic_add(atomic_read(&set->set_new_count),
- &set->set_remaining);
- atomic_set(&set->set_new_count, 0);
- /*
- * Need to calculate its timeout.
- */
- rc = 1;
- }
- spin_unlock(&set->set_new_req_lock);
- }
-
- /* We should call lu_env_refill() before handling new requests to make
- * sure that the env keys the requests depend on really exist.
- */
- rc2 = lu_env_refill(env);
- if (rc2 != 0) {
- /*
- * XXX This is very awkward situation, because
- * execution can neither continue (request
- * interpreters assume that env is set up), nor repeat
- * the loop (as this potentially results in a tight
- * loop of -ENOMEM's).
- *
- * Fortunately, refill only ever does something when
- * new modules are loaded, i.e., early during boot up.
- */
- CERROR("Failure to refill session: %d\n", rc2);
- return rc;
- }
-
- if (atomic_read(&set->set_remaining))
- rc |= ptlrpc_check_set(env, set);
-
- /* NB: ptlrpc_check_set() has already moved completed requests to the
- * head of set::set_requests.
- */
- list_for_each_entry_safe(req, tmp, &set->set_requests, rq_set_chain) {
- if (req->rq_phase != RQ_PHASE_COMPLETE)
- break;
-
- list_del_init(&req->rq_set_chain);
- req->rq_set = NULL;
- ptlrpc_req_finished(req);
- }
-
- if (rc == 0) {
- /*
- * If new requests have been added, make sure to wake up.
- */
- rc = atomic_read(&set->set_new_count);
-
- /* If we have nothing to do, check whether we can take some
- * work from our partner threads.
- */
- if (rc == 0 && pc->pc_npartners > 0) {
- struct ptlrpcd_ctl *partner;
- struct ptlrpc_request_set *ps;
- int first = pc->pc_cursor;
-
- do {
- partner = pc->pc_partners[pc->pc_cursor++];
- if (pc->pc_cursor >= pc->pc_npartners)
- pc->pc_cursor = 0;
- if (!partner)
- continue;
-
- spin_lock(&partner->pc_lock);
- ps = partner->pc_set;
- if (!ps) {
- spin_unlock(&partner->pc_lock);
- continue;
- }
-
- ptlrpc_reqset_get(ps);
- spin_unlock(&partner->pc_lock);
-
- if (atomic_read(&ps->set_new_count)) {
- rc = ptlrpcd_steal_rqset(set, ps);
- if (rc > 0)
- CDEBUG(D_RPCTRACE, "transfer %d async RPCs [%d->%d]\n",
- rc, partner->pc_index,
- pc->pc_index);
- }
- ptlrpc_reqset_put(ps);
- } while (rc == 0 && pc->pc_cursor != first);
- }
- }
-
- return rc;
-}
-
-/**
- * Main ptlrpcd thread.
- * ptlrpc's code paths like to execute in process context, so we have this
- * thread, which spins on a set containing the RPCs and sends them.
- */
-static int ptlrpcd(void *arg)
-{
- struct ptlrpcd_ctl *pc = arg;
- struct ptlrpc_request_set *set;
- struct lu_context ses = { 0 };
- struct lu_env env = { .le_ses = &ses };
- int rc = 0;
- int exit = 0;
-
- unshare_fs_struct();
- if (cfs_cpt_bind(cfs_cpt_table, pc->pc_cpt) != 0)
- CWARN("Failed to bind %s on CPT %d\n", pc->pc_name, pc->pc_cpt);
-
- /*
- * Allocate the request set after the thread has been bound
- * above. This is safe because no requests will be queued
- * until all ptlrpcd threads have confirmed that they have
- * successfully started.
- */
- set = ptlrpc_prep_set();
- if (!set) {
- rc = -ENOMEM;
- goto failed;
- }
- spin_lock(&pc->pc_lock);
- pc->pc_set = set;
- spin_unlock(&pc->pc_lock);
- /*
- * XXX So far only the "client" ptlrpcd uses an environment. In
- * the future, a ptlrpcd thread (or a thread-set) has to be given
- * an argument describing its "scope".
- */
- rc = lu_context_init(&env.le_ctx,
- LCT_CL_THREAD | LCT_REMEMBER | LCT_NOREF);
- if (rc == 0) {
- rc = lu_context_init(env.le_ses,
- LCT_SESSION | LCT_REMEMBER | LCT_NOREF);
- if (rc != 0)
- lu_context_fini(&env.le_ctx);
- }
-
- if (rc != 0)
- goto failed;
-
- complete(&pc->pc_starting);
-
- /*
- * This mainloop strongly resembles ptlrpc_set_wait() except that our
- * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when
- * there are requests in the set. New requests come in on the set's
- * new_req_list and ptlrpcd_check() moves them into the set.
- */
- do {
- int timeout;
-
- timeout = ptlrpc_set_next_timeout(set);
-
- lu_context_enter(&env.le_ctx);
- lu_context_enter(env.le_ses);
- if (wait_event_idle_timeout(set->set_waitq,
- ptlrpcd_check(&env, pc),
- (timeout ? timeout : 1) * HZ) == 0)
- ptlrpc_expired_set(set);
-
- lu_context_exit(&env.le_ctx);
- lu_context_exit(env.le_ses);
-
- /*
- * Abort in-flight RPCs in the forced-stop case.
- */
- if (test_bit(LIOD_STOP, &pc->pc_flags)) {
- if (test_bit(LIOD_FORCE, &pc->pc_flags))
- ptlrpc_abort_set(set);
- exit++;
- }
-
- /*
- * Let's make one more loop to make sure that ptlrpcd_check()
- * copied all raced new RPCs into the set so we can kill them.
- */
- } while (exit < 2);
-
- /*
- * Wait for in-flight requests to drain.
- */
- if (!list_empty(&set->set_requests))
- ptlrpc_set_wait(set);
- lu_context_fini(&env.le_ctx);
- lu_context_fini(env.le_ses);
-
- complete(&pc->pc_finishing);
-
- return 0;
-failed:
- pc->pc_error = rc;
- complete(&pc->pc_starting);
- return rc;
-}
-
-static void ptlrpcd_ctl_init(struct ptlrpcd_ctl *pc, int index, int cpt)
-{
- pc->pc_index = index;
- pc->pc_cpt = cpt;
- init_completion(&pc->pc_starting);
- init_completion(&pc->pc_finishing);
- spin_lock_init(&pc->pc_lock);
-
- if (index < 0) {
- /* Recovery thread. */
- snprintf(pc->pc_name, sizeof(pc->pc_name), "ptlrpcd_rcv");
- } else {
- /* Regular thread. */
- snprintf(pc->pc_name, sizeof(pc->pc_name),
- "ptlrpcd_%02d_%02d", cpt, index);
- }
-}
-
-/* XXX: We want multiple CPU cores to share the async RPC load. So we
- * start many ptlrpcd threads. We also want to reduce the ptlrpcd
- * overhead caused by data transfer cross-CPU cores. So we bind
- * all ptlrpcd threads to a CPT, in the expectation that CPTs
- * will be defined in a way that matches these boundaries. Within
- * a CPT a ptlrpcd thread can be scheduled on any available core.
- *
- * Each ptlrpcd thread has its own request queue. This can cause
- * response delay if the thread is already busy. To help with
- * this we define partner threads: these are other threads bound
- * to the same CPT which will check for work in each other's
- * request queues if they have no work to do.
- *
- * The desired number of partner threads can be tuned by setting
- * ptlrpcd_partner_group_size. The default is to create pairs of
- * partner threads.
- */
-static int ptlrpcd_partners(struct ptlrpcd *pd, int index)
-{
- struct ptlrpcd_ctl *pc;
- struct ptlrpcd_ctl **ppc;
- int first;
- int i;
- int rc = 0;
- int size;
-
- LASSERT(index >= 0 && index < pd->pd_nthreads);
- pc = &pd->pd_threads[index];
- pc->pc_npartners = pd->pd_groupsize - 1;
-
- if (pc->pc_npartners <= 0)
- goto out;
-
- size = sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners;
- pc->pc_partners = kzalloc_node(size, GFP_NOFS,
- cfs_cpt_spread_node(cfs_cpt_table,
- pc->pc_cpt));
- if (!pc->pc_partners) {
- pc->pc_npartners = 0;
- rc = -ENOMEM;
- goto out;
- }
-
- first = index - index % pd->pd_groupsize;
- ppc = pc->pc_partners;
- for (i = first; i < first + pd->pd_groupsize; i++) {
- if (i != index)
- *ppc++ = &pd->pd_threads[i];
- }
-out:
- return rc;
-}
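[Editor's note: a quick trace of the grouping arithmetic above, with assumed values: for pd_groupsize = 2 and index = 5, first = 5 - (5 % 2) = 4, so thread 5's only partner is thread 4. For pd_groupsize = 4 and index = 5, first = 4 and the entries filled into pc_partners are threads 4, 6, and 7.]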
-
-int ptlrpcd_start(struct ptlrpcd_ctl *pc)
-{
- struct task_struct *task;
- int rc = 0;
-
- /*
- * Do not allow starting a second thread for the same pc.
- */
- if (test_and_set_bit(LIOD_START, &pc->pc_flags)) {
- CWARN("Starting second thread (%s) for same pc %p\n",
- pc->pc_name, pc);
- return 0;
- }
-
- task = kthread_run(ptlrpcd, pc, "%s", pc->pc_name);
- if (IS_ERR(task)) {
- rc = PTR_ERR(task);
- goto out_set;
- }
-
- wait_for_completion(&pc->pc_starting);
- rc = pc->pc_error;
- if (rc != 0)
- goto out_set;
-
- return 0;
-
-out_set:
- if (pc->pc_set) {
- struct ptlrpc_request_set *set = pc->pc_set;
-
- spin_lock(&pc->pc_lock);
- pc->pc_set = NULL;
- spin_unlock(&pc->pc_lock);
- ptlrpc_set_destroy(set);
- }
- clear_bit(LIOD_START, &pc->pc_flags);
- return rc;
-}
-
-void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force)
-{
- if (!test_bit(LIOD_START, &pc->pc_flags)) {
- CWARN("Thread for pc %p was not started\n", pc);
- return;
- }
-
- set_bit(LIOD_STOP, &pc->pc_flags);
- if (force)
- set_bit(LIOD_FORCE, &pc->pc_flags);
- wake_up(&pc->pc_set->set_waitq);
-}
-
-void ptlrpcd_free(struct ptlrpcd_ctl *pc)
-{
- struct ptlrpc_request_set *set = pc->pc_set;
-
- if (!test_bit(LIOD_START, &pc->pc_flags)) {
- CWARN("Thread for pc %p was not started\n", pc);
- goto out;
- }
-
- wait_for_completion(&pc->pc_finishing);
-
- spin_lock(&pc->pc_lock);
- pc->pc_set = NULL;
- spin_unlock(&pc->pc_lock);
- ptlrpc_set_destroy(set);
-
- clear_bit(LIOD_START, &pc->pc_flags);
- clear_bit(LIOD_STOP, &pc->pc_flags);
- clear_bit(LIOD_FORCE, &pc->pc_flags);
-
-out:
- if (pc->pc_npartners > 0) {
- LASSERT(pc->pc_partners);
-
- kfree(pc->pc_partners);
- pc->pc_partners = NULL;
- }
- pc->pc_npartners = 0;
- pc->pc_error = 0;
-}
-
-static void ptlrpcd_fini(void)
-{
- int i;
- int j;
-
- if (ptlrpcds) {
- for (i = 0; i < ptlrpcds_num; i++) {
- if (!ptlrpcds[i])
- break;
- for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
- ptlrpcd_stop(&ptlrpcds[i]->pd_threads[j], 0);
- for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
- ptlrpcd_free(&ptlrpcds[i]->pd_threads[j]);
- kfree(ptlrpcds[i]);
- ptlrpcds[i] = NULL;
- }
- kfree(ptlrpcds);
- }
- ptlrpcds_num = 0;
-
- ptlrpcd_stop(&ptlrpcd_rcv, 0);
- ptlrpcd_free(&ptlrpcd_rcv);
-
- kfree(ptlrpcds_cpt_idx);
- ptlrpcds_cpt_idx = NULL;
-}
-
-static int ptlrpcd_init(void)
-{
- int nthreads;
- int groupsize;
- int size;
- int i;
- int j;
- int rc = 0;
- struct cfs_cpt_table *cptable;
- __u32 *cpts = NULL;
- int ncpts;
- int cpt;
- struct ptlrpcd *pd;
-
- /*
- * Determine the CPTs that ptlrpcd threads will run on.
- */
- cptable = cfs_cpt_table;
- ncpts = cfs_cpt_number(cptable);
- if (ptlrpcd_cpts) {
- struct cfs_expr_list *el;
-
- size = ncpts * sizeof(ptlrpcds_cpt_idx[0]);
- ptlrpcds_cpt_idx = kzalloc(size, GFP_KERNEL);
- if (!ptlrpcds_cpt_idx) {
- rc = -ENOMEM;
- goto out;
- }
-
- rc = cfs_expr_list_parse(ptlrpcd_cpts,
- strlen(ptlrpcd_cpts),
- 0, ncpts - 1, &el);
-
- if (rc != 0) {
- CERROR("ptlrpcd_cpts: invalid CPT pattern string: %s",
- ptlrpcd_cpts);
- rc = -EINVAL;
- goto out;
- }
-
- rc = cfs_expr_list_values(el, ncpts, &cpts);
- cfs_expr_list_free(el);
- if (rc <= 0) {
- CERROR("ptlrpcd_cpts: failed to parse CPT array %s: %d\n",
- ptlrpcd_cpts, rc);
- if (rc == 0)
- rc = -EINVAL;
- goto out;
- }
-
- /*
- * Create the cpt-to-index map. When a cpt has no match
- * in the list, map it to one of the listed cpts by taking
- * cpt % count. This could be changed to take the topology
- * of the system into account.
- */
- for (cpt = 0; cpt < ncpts; cpt++) {
- for (i = 0; i < rc; i++)
- if (cpts[i] == cpt)
- break;
- if (i >= rc)
- i = cpt % rc;
- ptlrpcds_cpt_idx[cpt] = i;
- }
-
- cfs_expr_list_values_free(cpts, rc);
- ncpts = rc;
- }
- ptlrpcds_num = ncpts;
-
- size = ncpts * sizeof(ptlrpcds[0]);
- ptlrpcds = kzalloc(size, GFP_KERNEL);
- if (!ptlrpcds) {
- rc = -ENOMEM;
- goto out;
- }
-
- /*
- * The max_ptlrpcds parameter is obsolete, but do something
- * sane if it has been tuned, and complain if
- * ptlrpcd_per_cpt_max has also been tuned.
- */
- if (max_ptlrpcds != 0) {
- CWARN("max_ptlrpcds is obsolete.\n");
- if (ptlrpcd_per_cpt_max == 0) {
- ptlrpcd_per_cpt_max = max_ptlrpcds / ncpts;
- /* Round up if there is a remainder. */
- if (max_ptlrpcds % ncpts != 0)
- ptlrpcd_per_cpt_max++;
- CWARN("Setting ptlrpcd_per_cpt_max = %d\n",
- ptlrpcd_per_cpt_max);
- } else {
- CWARN("ptlrpd_per_cpt_max is also set!\n");
- }
- }
-
- /*
- * The ptlrpcd_bind_policy parameter is obsolete, but do
- * something sane if it has been tuned, and complain if
- * ptlrpcd_partner_group_size is also tuned.
- */
- if (ptlrpcd_bind_policy != 0) {
- CWARN("ptlrpcd_bind_policy is obsolete.\n");
- if (ptlrpcd_partner_group_size == 0) {
- switch (ptlrpcd_bind_policy) {
- case 1: /* PDB_POLICY_NONE */
- case 2: /* PDB_POLICY_FULL */
- ptlrpcd_partner_group_size = 1;
- break;
- case 3: /* PDB_POLICY_PAIR */
- ptlrpcd_partner_group_size = 2;
- break;
- case 4: /* PDB_POLICY_NEIGHBOR */
-#ifdef CONFIG_NUMA
- ptlrpcd_partner_group_size = -1; /* CPT */
-#else
- ptlrpcd_partner_group_size = 3; /* Triplets */
-#endif
- break;
- default: /* Illegal value, use the default. */
- ptlrpcd_partner_group_size = 2;
- break;
- }
- CWARN("Setting ptlrpcd_partner_group_size = %d\n",
- ptlrpcd_partner_group_size);
- } else {
- CWARN("ptlrpcd_partner_group_size is also set!\n");
- }
- }
-
- if (ptlrpcd_partner_group_size == 0)
- ptlrpcd_partner_group_size = 2;
- else if (ptlrpcd_partner_group_size < 0)
- ptlrpcd_partner_group_size = -1;
- else if (ptlrpcd_per_cpt_max > 0 &&
- ptlrpcd_partner_group_size > ptlrpcd_per_cpt_max)
- ptlrpcd_partner_group_size = ptlrpcd_per_cpt_max;
-
- /*
- * Start the recovery thread first.
- */
- set_bit(LIOD_RECOVERY, &ptlrpcd_rcv.pc_flags);
- ptlrpcd_ctl_init(&ptlrpcd_rcv, -1, CFS_CPT_ANY);
- rc = ptlrpcd_start(&ptlrpcd_rcv);
- if (rc < 0)
- goto out;
-
- for (i = 0; i < ncpts; i++) {
- if (!cpts)
- cpt = i;
- else
- cpt = cpts[i];
-
- nthreads = cfs_cpt_weight(cptable, cpt);
- if (ptlrpcd_per_cpt_max > 0 && ptlrpcd_per_cpt_max < nthreads)
- nthreads = ptlrpcd_per_cpt_max;
- if (nthreads < 2)
- nthreads = 2;
-
- if (ptlrpcd_partner_group_size <= 0) {
- groupsize = nthreads;
- } else if (nthreads <= ptlrpcd_partner_group_size) {
- groupsize = nthreads;
- } else {
- groupsize = ptlrpcd_partner_group_size;
- if (nthreads % groupsize != 0)
- nthreads += groupsize - (nthreads % groupsize);
- }
-
- size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
- pd = kzalloc_node(size, GFP_NOFS,
- cfs_cpt_spread_node(cfs_cpt_table, cpt));
- if (!pd) {
- rc = -ENOMEM;
- goto out;
- }
- pd->pd_size = size;
- pd->pd_index = i;
- pd->pd_cpt = cpt;
- pd->pd_cursor = 0;
- pd->pd_nthreads = nthreads;
- pd->pd_groupsize = groupsize;
- ptlrpcds[i] = pd;
-
- /*
- * The ptlrpcd threads in a partner group can access
- * each other's struct ptlrpcd_ctl, so these must be
- * initialized before any thread is started.
- */
- for (j = 0; j < nthreads; j++) {
- ptlrpcd_ctl_init(&pd->pd_threads[j], j, cpt);
- rc = ptlrpcd_partners(pd, j);
- if (rc < 0)
- goto out;
- }
-
- /* XXX: We start nthreads ptlrpc daemons.
- * Each of them can process any non-recovery
- * async RPC, to improve overall async RPC
- * efficiency.
- *
- * But in some cases there are issues when async
- * I/O RPCs and async non-I/O RPCs are processed
- * in the same set: a ptlrpcd may be blocked by
- * some async I/O RPC(s), which then prevents
- * other async non-I/O RPC(s) from being
- * processed in time.
- *
- * Maybe we should distinguish blocking async
- * RPCs from non-blocking ones and process them
- * in different ptlrpcd sets to avoid this
- * unnecessary dependency. But how to distribute
- * the async RPC load among all the ptlrpc
- * daemons then becomes another problem.
- */
- for (j = 0; j < nthreads; j++) {
- rc = ptlrpcd_start(&pd->pd_threads[j]);
- if (rc < 0)
- goto out;
- }
- }
-out:
- if (rc != 0)
- ptlrpcd_fini();
-
- return rc;
-}
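[Editor's note: a worked example of the sizing logic in ptlrpcd_init() above, numbers assumed: on a CPT whose cfs_cpt_weight() is 7, with ptlrpcd_per_cpt_max = 0 and ptlrpcd_partner_group_size = 2, nthreads starts at 7 and is rounded up by nthreads += groupsize - (nthreads % groupsize) to 8, so the CPT runs four partner pairs {0,1}, {2,3}, {4,5}, {6,7}.]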
-
-int ptlrpcd_addref(void)
-{
- int rc = 0;
-
- mutex_lock(&ptlrpcd_mutex);
- if (++ptlrpcd_users == 1) {
- rc = ptlrpcd_init();
- if (rc < 0)
- ptlrpcd_users--;
- }
- mutex_unlock(&ptlrpcd_mutex);
- return rc;
-}
-EXPORT_SYMBOL(ptlrpcd_addref);
-
-void ptlrpcd_decref(void)
-{
- mutex_lock(&ptlrpcd_mutex);
- if (--ptlrpcd_users == 0)
- ptlrpcd_fini();
- mutex_unlock(&ptlrpcd_mutex);
-}
-EXPORT_SYMBOL(ptlrpcd_decref);
-/** @} ptlrpcd */
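[Editor's note: a minimal consumer sketch of the reference counting, hedged; my_req is a placeholder for a fully prepared request. The first ptlrpcd_addref() call creates the thread pool via ptlrpcd_init(), and the last ptlrpcd_decref() tears it down via ptlrpcd_fini():

	if (ptlrpcd_addref() == 0) {
		ptlrpcd_add_req(my_req);	/* queued for async processing */
		/* ... later, once this user is done with ptlrpcd ... */
		ptlrpcd_decref();
	}
]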