diff options
Diffstat (limited to '')
-rw-r--r-- | net/sunrpc/sched.c | 125 |
1 files changed, 85 insertions, 40 deletions
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index e2c835482791..be587a308e05 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -57,6 +57,21 @@ struct workqueue_struct *rpciod_workqueue __read_mostly; struct workqueue_struct *xprtiod_workqueue __read_mostly; EXPORT_SYMBOL_GPL(xprtiod_workqueue); +gfp_t rpc_task_gfp_mask(void) +{ + if (current->flags & PF_WQ_WORKER) + return GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN; + return GFP_KERNEL; +} +EXPORT_SYMBOL_GPL(rpc_task_gfp_mask); + +bool rpc_task_set_rpc_status(struct rpc_task *task, int rpc_status) +{ + if (cmpxchg(&task->tk_rpc_status, 0, rpc_status) == 0) + return true; + return false; +} + unsigned long rpc_task_timeout(const struct rpc_task *task) { @@ -186,11 +201,6 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, /* * Add new request to wait queue. - * - * Swapper tasks always get inserted at the head of the queue. - * This should avoid many nasty memory deadlocks and hopefully - * improve overall performance. - * Everyone else gets appended to the queue to ensure proper FIFO behavior. */ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task, @@ -199,8 +209,6 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, INIT_LIST_HEAD(&task->u.tk_wait.timer_list); if (RPC_IS_PRIORITY(queue)) __rpc_add_wait_queue_priority(queue, task, queue_priority); - else if (RPC_IS_SWAPPER(task)) - list_add(&task->u.tk_wait.list, &queue->tasks[0]); else list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]); task->tk_waitqueue = queue; @@ -268,7 +276,7 @@ EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue); static int rpc_wait_bit_killable(struct wait_bit_key *key, int mode) { - freezable_schedule_unsafe(); + schedule(); if (signal_pending_state(mode, current)) return -ERESTARTSYS; return 0; @@ -332,14 +340,12 @@ static int rpc_complete_task(struct rpc_task *task) * to enforce taking of the wq->lock and hence avoid races with * rpc_complete_task(). */ -int __rpc_wait_for_completion_task(struct rpc_task *task, wait_bit_action_f *action) +int rpc_wait_for_completion_task(struct rpc_task *task) { - if (action == NULL) - action = rpc_wait_bit_killable; return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE, - action, TASK_KILLABLE); + rpc_wait_bit_killable, TASK_KILLABLE|TASK_FREEZABLE_UNSAFE); } -EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task); +EXPORT_SYMBOL_GPL(rpc_wait_for_completion_task); /* * Make an RPC task runnable. @@ -854,12 +860,25 @@ void rpc_signal_task(struct rpc_task *task) if (!RPC_IS_ACTIVATED(task)) return; + if (!rpc_task_set_rpc_status(task, -ERESTARTSYS)) + return; trace_rpc_task_signalled(task, task->tk_action); set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate); smp_mb__after_atomic(); queue = READ_ONCE(task->tk_waitqueue); if (queue) - rpc_wake_up_queued_task_set_status(queue, task, -ERESTARTSYS); + rpc_wake_up_queued_task(queue, task); +} + +void rpc_task_try_cancel(struct rpc_task *task, int error) +{ + struct rpc_wait_queue *queue; + + if (!rpc_task_set_rpc_status(task, error)) + return; + queue = READ_ONCE(task->tk_waitqueue); + if (queue) + rpc_wake_up_queued_task(queue, task); } void rpc_exit(struct rpc_task *task, int status) @@ -876,6 +895,15 @@ void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata) ops->rpc_release(calldata); } +static bool xprt_needs_memalloc(struct rpc_xprt *xprt, struct rpc_task *tk) +{ + if (!xprt) + return false; + if (!atomic_read(&xprt->swapper)) + return false; + return test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == tk; +} + /* * This is the RPC `scheduler' (or rather, the finite state machine). */ @@ -884,6 +912,7 @@ static void __rpc_execute(struct rpc_task *task) struct rpc_wait_queue *queue; int task_is_async = RPC_IS_ASYNC(task); int status = 0; + unsigned long pflags = current->flags; WARN_ON_ONCE(RPC_IS_QUEUED(task)); if (RPC_IS_QUEUED(task)) @@ -896,16 +925,26 @@ static void __rpc_execute(struct rpc_task *task) * Perform the next FSM step or a pending callback. * * tk_action may be NULL if the task has been killed. - * In particular, note that rpc_killall_tasks may - * do this at any time, so beware when dereferencing. */ do_action = task->tk_action; + /* Tasks with an RPC error status should exit */ + if (do_action != rpc_exit_task && + (status = READ_ONCE(task->tk_rpc_status)) != 0) { + task->tk_status = status; + if (do_action != NULL) + do_action = rpc_exit_task; + } + /* Callbacks override all actions */ if (task->tk_callback) { do_action = task->tk_callback; task->tk_callback = NULL; } if (!do_action) break; + if (RPC_IS_SWAPPER(task) || + xprt_needs_memalloc(task->tk_xprt, task)) + current->flags |= PF_MEMALLOC; + trace_rpc_task_run_action(task, do_action); do_action(task); @@ -918,14 +957,6 @@ static void __rpc_execute(struct rpc_task *task) } /* - * Signalled tasks should exit rather than sleep. - */ - if (RPC_SIGNALLED(task)) { - task->tk_rpc_status = -ERESTARTSYS; - rpc_exit(task, -ERESTARTSYS); - } - - /* * The queue->lock protects against races with * rpc_make_runnable(). * @@ -940,16 +971,22 @@ static void __rpc_execute(struct rpc_task *task) spin_unlock(&queue->lock); continue; } + /* Wake up any task that has an exit status */ + if (READ_ONCE(task->tk_rpc_status) != 0) { + rpc_wake_up_task_queue_locked(queue, task); + spin_unlock(&queue->lock); + continue; + } rpc_clear_running(task); spin_unlock(&queue->lock); if (task_is_async) - return; + goto out; /* sync task: sleep here */ trace_rpc_task_sync_sleep(task, task->tk_action); status = out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_QUEUED, rpc_wait_bit_killable, - TASK_KILLABLE); + TASK_KILLABLE|TASK_FREEZABLE); if (status < 0) { /* * When a sync task receives a signal, it exits with @@ -957,16 +994,15 @@ static void __rpc_execute(struct rpc_task *task) * clean up after sleeping on some queue, we don't * break the loop here, but go around once more. */ - trace_rpc_task_signalled(task, task->tk_action); - set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate); - task->tk_rpc_status = -ERESTARTSYS; - rpc_exit(task, -ERESTARTSYS); + rpc_signal_task(task); } trace_rpc_task_sync_wake(task, task->tk_action); } /* Release all resources associated with the task */ rpc_release_task(task); +out: + current_restore_flags(pflags, PF_MEMALLOC); } /* @@ -1021,15 +1057,15 @@ int rpc_malloc(struct rpc_task *task) struct rpc_rqst *rqst = task->tk_rqstp; size_t size = rqst->rq_callsize + rqst->rq_rcvsize; struct rpc_buffer *buf; - gfp_t gfp = GFP_NOFS; - - if (RPC_IS_SWAPPER(task)) - gfp = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN; + gfp_t gfp = rpc_task_gfp_mask(); size += sizeof(struct rpc_buffer); - if (size <= RPC_BUFFER_MAXSIZE) - buf = mempool_alloc(rpc_buffer_mempool, gfp); - else + if (size <= RPC_BUFFER_MAXSIZE) { + buf = kmem_cache_alloc(rpc_buffer_slabp, gfp); + /* Reach for the mempool if dynamic allocation fails */ + if (!buf && RPC_IS_ASYNC(task)) + buf = mempool_alloc(rpc_buffer_mempool, GFP_NOWAIT); + } else buf = kmalloc(size, gfp); if (!buf) @@ -1092,10 +1128,14 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta rpc_init_task_statistics(task); } -static struct rpc_task * -rpc_alloc_task(void) +static struct rpc_task *rpc_alloc_task(void) { - return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS); + struct rpc_task *task; + + task = kmem_cache_alloc(rpc_task_slabp, rpc_task_gfp_mask()); + if (task) + return task; + return mempool_alloc(rpc_task_mempool, GFP_NOWAIT); } /* @@ -1108,6 +1148,11 @@ struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data) if (task == NULL) { task = rpc_alloc_task(); + if (task == NULL) { + rpc_release_calldata(setup_data->callback_ops, + setup_data->callback_data); + return ERR_PTR(-ENOMEM); + } flags = RPC_TASK_DYNAMIC; } |