summaryrefslogtreecommitdiffstats
path: root/sys/kern/kern_task.c
diff options
context:
space:
mode:
authordlg <dlg@openbsd.org>2013-10-29 04:23:16 +0000
committerdlg <dlg@openbsd.org>2013-10-29 04:23:16 +0000
commite72c42a1e4b08d4d98285e3eb4931823e8b00c81 (patch)
tree1a58c1afe8441c646547275e1e25d817762f6cb2 /sys/kern/kern_task.c
parentUnlock the vnode while calling a device's d_close routine, except when (diff)
downloadwireguard-openbsd-e72c42a1e4b08d4d98285e3eb4931823e8b00c81.tar.xz
wireguard-openbsd-e72c42a1e4b08d4d98285e3eb4931823e8b00c81.zip
introduce tasks and taskqs as an alternative to workqs.
tasks are modelled on the timeout api, so users familiar with timeout_set, timeout_add, and timeout_del will already know what to expect from task_set, task_add, and task_del. i wrote this because workq_add_task can fail in the place you actually need it, and there arent any good ways of recovering at that point. workq_queue_task was added to try and help, but required external state to be stored for users of that api to know whether something was already queued or not. workqs also didnt provide a way to cancel or remove work. this has been percolating with a bunch of people. putting it in as i wrote it so i can apply their feedback to the code with the history kept in cvs.
Diffstat (limited to 'sys/kern/kern_task.c')
-rw-r--r--sys/kern/kern_task.c258
1 files changed, 258 insertions, 0 deletions
diff --git a/sys/kern/kern_task.c b/sys/kern/kern_task.c
new file mode 100644
index 00000000000..f4d18b54b55
--- /dev/null
+++ b/sys/kern/kern_task.c
@@ -0,0 +1,258 @@
+/* $OpenBSD: kern_task.c,v 1.1 2013/10/29 04:23:16 dlg Exp $ */
+
+/*
+ * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/param.h>
+#include <sys/systm.h>
+#include <sys/malloc.h>
+#include <sys/pool.h>
+#include <sys/queue.h>
+#include <sys/mutex.h>
+#include <sys/kthread.h>
+#include <sys/task.h>
+
+#define TASK_ONQUEUE 1
+
+struct taskq {
+ enum {
+ TQ_S_CREATED,
+ TQ_S_RUNNING,
+ TQ_S_DESTROYED
+ } tq_state;
+ u_int tq_running;
+ u_int tq_nthreads;
+ const char *tq_name;
+
+ struct mutex tq_mtx;
+ TAILQ_HEAD(, task) tq_worklist;
+};
+
+struct taskq taskq_sys = {
+ TQ_S_CREATED,
+ 0,
+ 1,
+ "systq",
+ MUTEX_INITIALIZER(IPL_HIGH),
+ TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist)
+};
+
+void taskq_init(void); /* called in init_main.c */
+void taskq_create_thread(void *);
+int taskq_next_work(struct taskq *, struct task *);
+void taskq_thread(void *);
+
+void
+taskq_init(void)
+{
+ kthread_create_deferred(taskq_create_thread, &taskq_sys);
+}
+
+struct taskq *
+taskq_systq(void)
+{
+ return (&taskq_sys);
+}
+
+struct taskq *
+taskq_create(const char *name, u_int nthreads, int ipl)
+{
+ struct taskq *tq;
+
+ tq = malloc(sizeof(*tq), M_DEVBUF, M_NOWAIT);
+ if (tq == NULL)
+ return (NULL);
+
+ tq->tq_state = TQ_S_CREATED;
+ tq->tq_running = 0;
+ tq->tq_nthreads = nthreads;
+ tq->tq_name = name;
+
+ mtx_init(&tq->tq_mtx, ipl);
+ TAILQ_INIT(&tq->tq_worklist);
+
+ /* try to create a thread to guarantee that tasks will be serviced */
+ kthread_create_deferred(taskq_create_thread, tq);
+
+ return (tq);
+}
+
+void
+taskq_destroy(struct taskq *tq)
+{
+ mtx_enter(&tq->tq_mtx);
+ switch (tq->tq_state) {
+ case TQ_S_CREATED:
+ /* tq is still referenced by taskq_create_thread */
+ tq->tq_state = TQ_S_DESTROYED;
+ mtx_leave(&tq->tq_mtx);
+ return;
+
+ case TQ_S_RUNNING:
+ tq->tq_state = TQ_S_DESTROYED;
+ break;
+
+ default:
+ panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
+ }
+
+ while (tq->tq_running > 0) {
+ wakeup(tq);
+ msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0);
+ }
+ mtx_leave(&tq->tq_mtx);
+
+ free(tq, M_DEVBUF);
+}
+
+void
+taskq_create_thread(void *arg)
+{
+ struct taskq *tq = arg;
+ int rv;
+
+ mtx_enter(&tq->tq_mtx);
+
+ switch (tq->tq_state) {
+ case TQ_S_DESTROYED:
+ mtx_leave(&tq->tq_mtx);
+ free(tq, M_DEVBUF);
+ return;
+
+ case TQ_S_CREATED:
+ tq->tq_state = TQ_S_RUNNING;
+ break;
+
+ default:
+ panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
+ }
+
+ do {
+ tq->tq_running++;
+ mtx_leave(&tq->tq_mtx);
+
+ rv = kthread_create(taskq_thread, tq, NULL, "%s", tq->tq_name);
+
+ mtx_enter(&tq->tq_mtx);
+ if (rv != 0) {
+ printf("unable to create thread for \"%s\" taskq\n",
+ tq->tq_name);
+
+ tq->tq_running--;
+ /* could have been destroyed during kthread_create */
+ if (tq->tq_state == TQ_S_DESTROYED &&
+ tq->tq_running == 0)
+ wakeup_one(&tq->tq_running);
+ break;
+ }
+ } while (tq->tq_running < tq->tq_nthreads);
+
+ mtx_leave(&tq->tq_mtx);
+}
+
+void
+task_set(struct task *t, void (*fn)(void *, void *), void *arg1, void *arg2)
+{
+ t->t_func = fn;
+ t->t_arg1 = arg1;
+ t->t_arg2 = arg2;
+
+ t->t_flags = 0;
+}
+
+int
+task_add(struct taskq *tq, struct task *w)
+{
+ int rv = 0;
+
+ mtx_enter(&tq->tq_mtx);
+ if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
+ rv = 1;
+ SET(w->t_flags, TASK_ONQUEUE);
+ TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
+ }
+ mtx_leave(&tq->tq_mtx);
+
+ if (rv)
+ wakeup_one(tq);
+
+ return (rv);
+}
+
+int
+task_del(struct taskq *tq, struct task *w)
+{
+ int rv = 0;
+
+ mtx_enter(&tq->tq_mtx);
+ if (ISSET(w->t_flags, TASK_ONQUEUE)) {
+ rv = 1;
+ CLR(w->t_flags, TASK_ONQUEUE);
+ TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
+ }
+ mtx_leave(&tq->tq_mtx);
+
+ return (rv);
+}
+
+int
+taskq_next_work(struct taskq *tq, struct task *work)
+{
+ struct task *next;
+
+ mtx_enter(&tq->tq_mtx);
+ while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
+ if (tq->tq_state != TQ_S_RUNNING) {
+ mtx_leave(&tq->tq_mtx);
+ return (0);
+ }
+
+ msleep(tq, &tq->tq_mtx, PWAIT, "bored", 0);
+ }
+
+ TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
+ CLR(next->t_flags, TASK_ONQUEUE);
+
+ *work = *next; /* copy to caller to avoid races */
+
+ next = TAILQ_FIRST(&tq->tq_worklist);
+ mtx_leave(&tq->tq_mtx);
+
+ if (next != NULL)
+ wakeup_one(tq);
+
+ return (1);
+}
+
+void
+taskq_thread(void *xtq)
+{
+ struct taskq *tq = xtq;
+ struct task work;
+ int last;
+
+ while (taskq_next_work(tq, &work))
+ (*work.t_func)(work.t_arg1, work.t_arg2);
+
+ mtx_enter(&tq->tq_mtx);
+ last = (--tq->tq_running == 0);
+ mtx_leave(&tq->tq_mtx);
+
+ if (last)
+ wakeup_one(&tq->tq_running);
+
+ kthread_exit(0);
+}