diff options
Diffstat (limited to 'net/sunrpc/sched.c')
-rw-r--r-- | net/sunrpc/sched.c | 297 |
1 files changed, 167 insertions, 130 deletions
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 55e900255b0c..be587a308e05 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -27,10 +27,6 @@ #include "sunrpc.h" -#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) -#define RPCDBG_FACILITY RPCDBG_SCHED -#endif - #define CREATE_TRACE_POINTS #include <trace/events/sunrpc.h> @@ -61,6 +57,21 @@ struct workqueue_struct *rpciod_workqueue __read_mostly; struct workqueue_struct *xprtiod_workqueue __read_mostly; EXPORT_SYMBOL_GPL(xprtiod_workqueue); +gfp_t rpc_task_gfp_mask(void) +{ + if (current->flags & PF_WQ_WORKER) + return GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN; + return GFP_KERNEL; +} +EXPORT_SYMBOL_GPL(rpc_task_gfp_mask); + +bool rpc_task_set_rpc_status(struct rpc_task *task, int rpc_status) +{ + if (cmpxchg(&task->tk_rpc_status, 0, rpc_status) == 0) + return true; + return false; +} + unsigned long rpc_task_timeout(const struct rpc_task *task) { @@ -85,7 +96,6 @@ __rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task) { if (list_empty(&task->u.tk_wait.timer_list)) return; - dprintk("RPC: %5u disabling timer\n", task->tk_pid); task->tk_timeout = 0; list_del(&task->u.tk_wait.timer_list); if (list_empty(&queue->timer_list.list)) @@ -111,9 +121,6 @@ static void __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task, unsigned long timeout) { - dprintk("RPC: %5u setting alarm for %u ms\n", - task->tk_pid, jiffies_to_msecs(timeout - jiffies)); - task->tk_timeout = timeout; if (list_empty(&queue->timer_list.list) || time_before(timeout, queue->timer_list.expires)) rpc_set_queue_timer(queue, timeout); @@ -194,25 +201,14 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, /* * Add new request to wait queue. - * - * Swapper tasks always get inserted at the head of the queue. - * This should avoid many nasty memory deadlocks and hopefully - * improve overall performance. - * Everyone else gets appended to the queue to ensure proper FIFO behavior. */ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task, unsigned char queue_priority) { - WARN_ON_ONCE(RPC_IS_QUEUED(task)); - if (RPC_IS_QUEUED(task)) - return; - INIT_LIST_HEAD(&task->u.tk_wait.timer_list); if (RPC_IS_PRIORITY(queue)) __rpc_add_wait_queue_priority(queue, task, queue_priority); - else if (RPC_IS_SWAPPER(task)) - list_add(&task->u.tk_wait.list, &queue->tasks[0]); else list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]); task->tk_waitqueue = queue; @@ -220,9 +216,6 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, /* barrier matches the read in rpc_wake_up_task_queue_locked() */ smp_wmb(); rpc_set_queued(task); - - dprintk("RPC: %5u added to queue %p \"%s\"\n", - task->tk_pid, queue, rpc_qname(queue)); } /* @@ -245,8 +238,6 @@ static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_tas else list_del(&task->u.tk_wait.list); queue->qlen--; - dprintk("RPC: %5u removed from queue %p \"%s\"\n", - task->tk_pid, queue, rpc_qname(queue)); } static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues) @@ -285,7 +276,7 @@ EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue); static int rpc_wait_bit_killable(struct wait_bit_key *key, int mode) { - freezable_schedule_unsafe(); + schedule(); if (signal_pending_state(mode, current)) return -ERESTARTSYS; return 0; @@ -294,9 +285,17 @@ static int rpc_wait_bit_killable(struct wait_bit_key *key, int mode) #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) || IS_ENABLED(CONFIG_TRACEPOINTS) static void rpc_task_set_debuginfo(struct rpc_task *task) { - static atomic_t rpc_pid; + struct rpc_clnt *clnt = task->tk_client; + + /* Might be a task carrying a reverse-direction operation */ + if (!clnt) { + static atomic_t rpc_pid; - task->tk_pid = atomic_inc_return(&rpc_pid); + task->tk_pid = atomic_inc_return(&rpc_pid); + return; + } + + task->tk_pid = atomic_inc_return(&clnt->cl_pid); } #else static inline void rpc_task_set_debuginfo(struct rpc_task *task) @@ -341,14 +340,12 @@ static int rpc_complete_task(struct rpc_task *task) * to enforce taking of the wq->lock and hence avoid races with * rpc_complete_task(). */ -int __rpc_wait_for_completion_task(struct rpc_task *task, wait_bit_action_f *action) +int rpc_wait_for_completion_task(struct rpc_task *task) { - if (action == NULL) - action = rpc_wait_bit_killable; return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE, - action, TASK_KILLABLE); + rpc_wait_bit_killable, TASK_KILLABLE|TASK_FREEZABLE_UNSAFE); } -EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task); +EXPORT_SYMBOL_GPL(rpc_wait_for_completion_task); /* * Make an RPC task runnable. @@ -382,25 +379,32 @@ static void rpc_make_runnable(struct workqueue_struct *wq, * NB: An RPC task will only receive interrupt-driven events as long * as it's on a wait queue. */ -static void __rpc_sleep_on_priority(struct rpc_wait_queue *q, +static void __rpc_do_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task, unsigned char queue_priority) { - dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n", - task->tk_pid, rpc_qname(q), jiffies); - trace_rpc_task_sleep(task, q); __rpc_add_wait_queue(q, task, queue_priority); +} +static void __rpc_sleep_on_priority(struct rpc_wait_queue *q, + struct rpc_task *task, + unsigned char queue_priority) +{ + if (WARN_ON_ONCE(RPC_IS_QUEUED(task))) + return; + __rpc_do_sleep_on_priority(q, task, queue_priority); } static void __rpc_sleep_on_priority_timeout(struct rpc_wait_queue *q, struct rpc_task *task, unsigned long timeout, unsigned char queue_priority) { + if (WARN_ON_ONCE(RPC_IS_QUEUED(task))) + return; if (time_is_after_jiffies(timeout)) { - __rpc_sleep_on_priority(q, task, queue_priority); + __rpc_do_sleep_on_priority(q, task, queue_priority); __rpc_add_timer(q, task, timeout); } else task->tk_status = -ETIMEDOUT; @@ -503,9 +507,6 @@ static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq, struct rpc_wait_queue *queue, struct rpc_task *task) { - dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n", - task->tk_pid, jiffies); - /* Has the task been executed yet? If not, we cannot wake it up! */ if (!RPC_IS_ACTIVATED(task)) { printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task); @@ -517,8 +518,6 @@ static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq, __rpc_remove_wait_queue(queue, task); rpc_make_runnable(wq, task); - - dprintk("RPC: __rpc_wake_up_task done\n"); } /* @@ -607,10 +606,20 @@ static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *q struct rpc_task *task; /* + * Service the privileged queue. + */ + q = &queue->tasks[RPC_NR_PRIORITY - 1]; + if (queue->maxpriority > RPC_PRIORITY_PRIVILEGED && !list_empty(q)) { + task = list_first_entry(q, struct rpc_task, u.tk_wait.list); + goto out; + } + + /* * Service a batch of tasks from a single owner. */ q = &queue->tasks[queue->priority]; - if (!list_empty(q) && --queue->nr) { + if (!list_empty(q) && queue->nr) { + queue->nr--; task = list_first_entry(q, struct rpc_task, u.tk_wait.list); goto out; } @@ -656,8 +665,6 @@ struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq, { struct rpc_task *task = NULL; - dprintk("RPC: wake_up_first(%p \"%s\")\n", - queue, rpc_qname(queue)); spin_lock(&queue->lock); task = __rpc_find_next_queued(queue); if (task != NULL) @@ -693,6 +700,23 @@ struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *queue) EXPORT_SYMBOL_GPL(rpc_wake_up_next); /** + * rpc_wake_up_locked - wake up all rpc_tasks + * @queue: rpc_wait_queue on which the tasks are sleeping + * + */ +static void rpc_wake_up_locked(struct rpc_wait_queue *queue) +{ + struct rpc_task *task; + + for (;;) { + task = __rpc_find_next_queued(queue); + if (task == NULL) + break; + rpc_wake_up_task_queue_locked(queue, task); + } +} + +/** * rpc_wake_up - wake up all rpc_tasks * @queue: rpc_wait_queue on which the tasks are sleeping * @@ -700,25 +724,28 @@ EXPORT_SYMBOL_GPL(rpc_wake_up_next); */ void rpc_wake_up(struct rpc_wait_queue *queue) { - struct list_head *head; - spin_lock(&queue->lock); - head = &queue->tasks[queue->maxpriority]; + rpc_wake_up_locked(queue); + spin_unlock(&queue->lock); +} +EXPORT_SYMBOL_GPL(rpc_wake_up); + +/** + * rpc_wake_up_status_locked - wake up all rpc_tasks and set their status value. + * @queue: rpc_wait_queue on which the tasks are sleeping + * @status: status value to set + */ +static void rpc_wake_up_status_locked(struct rpc_wait_queue *queue, int status) +{ + struct rpc_task *task; + for (;;) { - while (!list_empty(head)) { - struct rpc_task *task; - task = list_first_entry(head, - struct rpc_task, - u.tk_wait.list); - rpc_wake_up_task_queue_locked(queue, task); - } - if (head == &queue->tasks[0]) + task = __rpc_find_next_queued(queue); + if (task == NULL) break; - head--; + rpc_wake_up_task_queue_set_status_locked(queue, task, status); } - spin_unlock(&queue->lock); } -EXPORT_SYMBOL_GPL(rpc_wake_up); /** * rpc_wake_up_status - wake up all rpc_tasks and set their status value. @@ -729,23 +756,8 @@ EXPORT_SYMBOL_GPL(rpc_wake_up); */ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) { - struct list_head *head; - spin_lock(&queue->lock); - head = &queue->tasks[queue->maxpriority]; - for (;;) { - while (!list_empty(head)) { - struct rpc_task *task; - task = list_first_entry(head, - struct rpc_task, - u.tk_wait.list); - task->tk_status = status; - rpc_wake_up_task_queue_locked(queue, task); - } - if (head == &queue->tasks[0]) - break; - head--; - } + rpc_wake_up_status_locked(queue, status); spin_unlock(&queue->lock); } EXPORT_SYMBOL_GPL(rpc_wake_up_status); @@ -763,7 +775,7 @@ static void __rpc_queue_timer_fn(struct work_struct *work) list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) { timeo = task->tk_timeout; if (time_after_eq(now, timeo)) { - dprintk("RPC: %5u timeout\n", task->tk_pid); + trace_rpc_task_timeout(task, task->tk_action); task->tk_status = -ETIMEDOUT; rpc_wake_up_task_queue_locked(queue, task); continue; @@ -831,6 +843,7 @@ void rpc_exit_task(struct rpc_task *task) else if (task->tk_client) rpc_count_iostats(task, task->tk_client->cl_metrics); if (task->tk_ops->rpc_call_done != NULL) { + trace_rpc_task_call_done(task, task->tk_ops->rpc_call_done); task->tk_ops->rpc_call_done(task, task->tk_calldata); if (task->tk_action != NULL) { /* Always release the RPC slot and buffer memory */ @@ -847,12 +860,25 @@ void rpc_signal_task(struct rpc_task *task) if (!RPC_IS_ACTIVATED(task)) return; + if (!rpc_task_set_rpc_status(task, -ERESTARTSYS)) + return; trace_rpc_task_signalled(task, task->tk_action); set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate); smp_mb__after_atomic(); queue = READ_ONCE(task->tk_waitqueue); if (queue) - rpc_wake_up_queued_task_set_status(queue, task, -ERESTARTSYS); + rpc_wake_up_queued_task(queue, task); +} + +void rpc_task_try_cancel(struct rpc_task *task, int error) +{ + struct rpc_wait_queue *queue; + + if (!rpc_task_set_rpc_status(task, error)) + return; + queue = READ_ONCE(task->tk_waitqueue); + if (queue) + rpc_wake_up_queued_task(queue, task); } void rpc_exit(struct rpc_task *task, int status) @@ -869,6 +895,15 @@ void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata) ops->rpc_release(calldata); } +static bool xprt_needs_memalloc(struct rpc_xprt *xprt, struct rpc_task *tk) +{ + if (!xprt) + return false; + if (!atomic_read(&xprt->swapper)) + return false; + return test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == tk; +} + /* * This is the RPC `scheduler' (or rather, the finite state machine). */ @@ -877,9 +912,7 @@ static void __rpc_execute(struct rpc_task *task) struct rpc_wait_queue *queue; int task_is_async = RPC_IS_ASYNC(task); int status = 0; - - dprintk("RPC: %5u __rpc_execute flags=0x%x\n", - task->tk_pid, task->tk_flags); + unsigned long pflags = current->flags; WARN_ON_ONCE(RPC_IS_QUEUED(task)); if (RPC_IS_QUEUED(task)) @@ -892,31 +925,35 @@ static void __rpc_execute(struct rpc_task *task) * Perform the next FSM step or a pending callback. * * tk_action may be NULL if the task has been killed. - * In particular, note that rpc_killall_tasks may - * do this at any time, so beware when dereferencing. */ do_action = task->tk_action; + /* Tasks with an RPC error status should exit */ + if (do_action != rpc_exit_task && + (status = READ_ONCE(task->tk_rpc_status)) != 0) { + task->tk_status = status; + if (do_action != NULL) + do_action = rpc_exit_task; + } + /* Callbacks override all actions */ if (task->tk_callback) { do_action = task->tk_callback; task->tk_callback = NULL; } if (!do_action) break; + if (RPC_IS_SWAPPER(task) || + xprt_needs_memalloc(task->tk_xprt, task)) + current->flags |= PF_MEMALLOC; + trace_rpc_task_run_action(task, do_action); do_action(task); /* * Lockless check for whether task is sleeping or not. */ - if (!RPC_IS_QUEUED(task)) + if (!RPC_IS_QUEUED(task)) { + cond_resched(); continue; - - /* - * Signalled tasks should exit rather than sleep. - */ - if (RPC_SIGNALLED(task)) { - task->tk_rpc_status = -ERESTARTSYS; - rpc_exit(task, -ERESTARTSYS); } /* @@ -934,16 +971,22 @@ static void __rpc_execute(struct rpc_task *task) spin_unlock(&queue->lock); continue; } + /* Wake up any task that has an exit status */ + if (READ_ONCE(task->tk_rpc_status) != 0) { + rpc_wake_up_task_queue_locked(queue, task); + spin_unlock(&queue->lock); + continue; + } rpc_clear_running(task); spin_unlock(&queue->lock); if (task_is_async) - return; + goto out; /* sync task: sleep here */ - dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid); + trace_rpc_task_sync_sleep(task, task->tk_action); status = out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_QUEUED, rpc_wait_bit_killable, - TASK_KILLABLE); + TASK_KILLABLE|TASK_FREEZABLE); if (status < 0) { /* * When a sync task receives a signal, it exits with @@ -951,18 +994,15 @@ static void __rpc_execute(struct rpc_task *task) * clean up after sleeping on some queue, we don't * break the loop here, but go around once more. */ - trace_rpc_task_signalled(task, task->tk_action); - set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate); - task->tk_rpc_status = -ERESTARTSYS; - rpc_exit(task, -ERESTARTSYS); + rpc_signal_task(task); } - dprintk("RPC: %5u sync task resuming\n", task->tk_pid); + trace_rpc_task_sync_wake(task, task->tk_action); } - dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status, - task->tk_status); /* Release all resources associated with the task */ rpc_release_task(task); +out: + current_restore_flags(pflags, PF_MEMALLOC); } /* @@ -980,8 +1020,11 @@ void rpc_execute(struct rpc_task *task) rpc_set_active(task); rpc_make_runnable(rpciod_workqueue, task); - if (!is_async) + if (!is_async) { + unsigned int pflags = memalloc_nofs_save(); __rpc_execute(task); + memalloc_nofs_restore(pflags); + } } static void rpc_async_schedule(struct work_struct *work) @@ -1014,23 +1057,21 @@ int rpc_malloc(struct rpc_task *task) struct rpc_rqst *rqst = task->tk_rqstp; size_t size = rqst->rq_callsize + rqst->rq_rcvsize; struct rpc_buffer *buf; - gfp_t gfp = GFP_NOFS; - - if (RPC_IS_SWAPPER(task)) - gfp = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN; + gfp_t gfp = rpc_task_gfp_mask(); size += sizeof(struct rpc_buffer); - if (size <= RPC_BUFFER_MAXSIZE) - buf = mempool_alloc(rpc_buffer_mempool, gfp); - else + if (size <= RPC_BUFFER_MAXSIZE) { + buf = kmem_cache_alloc(rpc_buffer_slabp, gfp); + /* Reach for the mempool if dynamic allocation fails */ + if (!buf && RPC_IS_ASYNC(task)) + buf = mempool_alloc(rpc_buffer_mempool, GFP_NOWAIT); + } else buf = kmalloc(size, gfp); if (!buf) return -ENOMEM; buf->len = size; - dprintk("RPC: %5u allocated buffer of size %zu at %p\n", - task->tk_pid, size, buf); rqst->rq_buffer = buf->data; rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize; return 0; @@ -1051,9 +1092,6 @@ void rpc_free(struct rpc_task *task) buf = container_of(buffer, struct rpc_buffer, data); size = buf->len; - dprintk("RPC: freeing buffer of size %zu at %p\n", - size, buf); - if (size <= RPC_BUFFER_MAXSIZE) mempool_free(buf, rpc_buffer_mempool); else @@ -1088,15 +1126,16 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta task->tk_action = rpc_prepare_task; rpc_init_task_statistics(task); - - dprintk("RPC: new task initialized, procpid %u\n", - task_pid_nr(current)); } -static struct rpc_task * -rpc_alloc_task(void) +static struct rpc_task *rpc_alloc_task(void) { - return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS); + struct rpc_task *task; + + task = kmem_cache_alloc(rpc_task_slabp, rpc_task_gfp_mask()); + if (task) + return task; + return mempool_alloc(rpc_task_mempool, GFP_NOWAIT); } /* @@ -1109,12 +1148,16 @@ struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data) if (task == NULL) { task = rpc_alloc_task(); + if (task == NULL) { + rpc_release_calldata(setup_data->callback_ops, + setup_data->callback_data); + return ERR_PTR(-ENOMEM); + } flags = RPC_TASK_DYNAMIC; } rpc_init_task(task, setup_data); task->tk_flags |= flags; - dprintk("RPC: allocated task %p\n", task); return task; } @@ -1144,10 +1187,8 @@ static void rpc_free_task(struct rpc_task *task) put_rpccred(task->tk_op_cred); rpc_release_calldata(task->tk_ops, task->tk_calldata); - if (tk_flags & RPC_TASK_DYNAMIC) { - dprintk("RPC: %5u freeing task\n", task->tk_pid); + if (tk_flags & RPC_TASK_DYNAMIC) mempool_free(task, rpc_task_mempool); - } } static void rpc_async_release(struct work_struct *work) @@ -1162,7 +1203,8 @@ static void rpc_release_resources_task(struct rpc_task *task) { xprt_release(task); if (task->tk_msg.rpc_cred) { - put_cred(task->tk_msg.rpc_cred); + if (!(task->tk_flags & RPC_TASK_CRED_NOREF)) + put_cred(task->tk_msg.rpc_cred); task->tk_msg.rpc_cred = NULL; } rpc_task_release_client(task); @@ -1200,8 +1242,6 @@ EXPORT_SYMBOL_GPL(rpc_put_task_async); static void rpc_release_task(struct rpc_task *task) { - dprintk("RPC: %5u release task\n", task->tk_pid); - WARN_ON_ONCE(RPC_IS_QUEUED(task)); rpc_release_resources_task(task); @@ -1242,13 +1282,11 @@ static int rpciod_start(void) /* * Create the rpciod thread and wait for it to start. */ - dprintk("RPC: creating workqueue rpciod\n"); wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_UNBOUND, 0); if (!wq) goto out_failed; rpciod_workqueue = wq; - /* Note: highpri because network receive is latency sensitive */ - wq = alloc_workqueue("xprtiod", WQ_UNBOUND|WQ_MEM_RECLAIM|WQ_HIGHPRI, 0); + wq = alloc_workqueue("xprtiod", WQ_UNBOUND | WQ_MEM_RECLAIM, 0); if (!wq) goto free_rpciod; xprtiod_workqueue = wq; @@ -1267,7 +1305,6 @@ static void rpciod_stop(void) if (rpciod_workqueue == NULL) return; - dprintk("RPC: destroying workqueue rpciod\n"); wq = rpciod_workqueue; rpciod_workqueue = NULL; |