diff options
Diffstat (limited to 'usr.sbin/smtpd/scheduler.c')
-rw-r--r-- | usr.sbin/smtpd/scheduler.c | 618 |
1 file changed, 618 insertions, 0 deletions
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c new file mode 100644 index 00000000..ea70a83d --- /dev/null +++ b/usr.sbin/smtpd/scheduler.c @@ -0,0 +1,618 @@ +/* $OpenBSD: scheduler.c,v 1.60 2018/12/30 23:09:58 guenther Exp $ */ + +/* + * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> + * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> + * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net> + * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+ */ + +#include "includes.h" + +#include <sys/types.h> +#include <sys/queue.h> +#include <sys/tree.h> +#include <sys/socket.h> +#include <sys/stat.h> + +#include <ctype.h> +#include <dirent.h> +#include <err.h> +#include <errno.h> +#include <event.h> +#include <grp.h> /* needed for setgroups */ +#include <imsg.h> +#include <inttypes.h> +#include <pwd.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> +#include <limits.h> + +#include "smtpd.h" +#include "log.h" + +static void scheduler_imsg(struct mproc *, struct imsg *); +static void scheduler_shutdown(void); +static void scheduler_reset_events(void); +static void scheduler_timeout(int, short, void *); + +static struct scheduler_backend *backend = NULL; +static struct event ev; +static size_t ninflight = 0; +static int *types; +static uint64_t *evpids; +static uint32_t *msgids; +static struct evpstate *state; + +extern const char *backend_scheduler; + +void +scheduler_imsg(struct mproc *p, struct imsg *imsg) +{ + struct bounce_req_msg req; + struct envelope evp; + struct scheduler_info si; + struct msg m; + uint64_t evpid, id, holdq; + uint32_t msgid; + uint32_t inflight; + size_t n, i; + time_t timestamp; + int v, r, type; + + if (imsg == NULL) + scheduler_shutdown(); + + switch (imsg->hdr.type) { + + case IMSG_QUEUE_ENVELOPE_SUBMIT: + m_msg(&m, imsg); + m_get_envelope(&m, &evp); + m_end(&m); + log_trace(TRACE_SCHEDULER, + "scheduler: inserting evp:%016" PRIx64, evp.id); + scheduler_info(&si, &evp); + stat_increment("scheduler.envelope.incoming", 1); + backend->insert(&si); + return; + + case IMSG_QUEUE_MESSAGE_COMMIT: + m_msg(&m, imsg); + m_get_msgid(&m, &msgid); + m_end(&m); + log_trace(TRACE_SCHEDULER, + "scheduler: committing msg:%08" PRIx32, msgid); + n = backend->commit(msgid); + stat_decrement("scheduler.envelope.incoming", n); + stat_increment("scheduler.envelope", n); + scheduler_reset_events(); + return; + + case 
IMSG_QUEUE_DISCOVER_EVPID: + m_msg(&m, imsg); + m_get_envelope(&m, &evp); + m_end(&m); + r = backend->query(evp.id); + if (r) { + log_debug("debug: scheduler: evp:%016" PRIx64 + " already scheduled", evp.id); + return; + } + log_trace(TRACE_SCHEDULER, + "scheduler: discovering evp:%016" PRIx64, evp.id); + scheduler_info(&si, &evp); + stat_increment("scheduler.envelope.incoming", 1); + backend->insert(&si); + return; + + case IMSG_QUEUE_DISCOVER_MSGID: + m_msg(&m, imsg); + m_get_msgid(&m, &msgid); + m_end(&m); + r = backend->query(msgid); + if (r) { + log_debug("debug: scheduler: msgid:%08" PRIx32 + " already scheduled", msgid); + return; + } + log_trace(TRACE_SCHEDULER, + "scheduler: committing msg:%08" PRIx32, msgid); + n = backend->commit(msgid); + stat_decrement("scheduler.envelope.incoming", n); + stat_increment("scheduler.envelope", n); + scheduler_reset_events(); + return; + + case IMSG_QUEUE_MESSAGE_ROLLBACK: + m_msg(&m, imsg); + m_get_msgid(&m, &msgid); + m_end(&m); + log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32, + msgid); + n = backend->rollback(msgid); + stat_decrement("scheduler.envelope.incoming", n); + scheduler_reset_events(); + return; + + case IMSG_QUEUE_ENVELOPE_REMOVE: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_get_u32(&m, &inflight); + m_end(&m); + log_trace(TRACE_SCHEDULER, + "scheduler: queue requested removal of evp:%016" PRIx64, + evpid); + stat_decrement("scheduler.envelope", 1); + if (!inflight) + backend->remove(evpid); + else { + backend->delete(evpid); + ninflight -= 1; + stat_decrement("scheduler.envelope.inflight", 1); + } + + scheduler_reset_events(); + return; + + case IMSG_QUEUE_ENVELOPE_ACK: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_end(&m); + log_trace(TRACE_SCHEDULER, + "scheduler: queue ack removal of evp:%016" PRIx64, + evpid); + ninflight -= 1; + stat_decrement("scheduler.envelope.inflight", 1); + scheduler_reset_events(); + return; + + case IMSG_QUEUE_DELIVERY_OK: + m_msg(&m, imsg); + 
m_get_evpid(&m, &evpid); + m_end(&m); + log_trace(TRACE_SCHEDULER, + "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid); + backend->delete(evpid); + ninflight -= 1; + stat_increment("scheduler.delivery.ok", 1); + stat_decrement("scheduler.envelope.inflight", 1); + stat_decrement("scheduler.envelope", 1); + scheduler_reset_events(); + return; + + case IMSG_QUEUE_DELIVERY_TEMPFAIL: + m_msg(&m, imsg); + m_get_envelope(&m, &evp); + m_end(&m); + log_trace(TRACE_SCHEDULER, + "scheduler: updating evp:%016" PRIx64, evp.id); + scheduler_info(&si, &evp); + backend->update(&si); + ninflight -= 1; + stat_increment("scheduler.delivery.tempfail", 1); + stat_decrement("scheduler.envelope.inflight", 1); + + for (i = 0; i < MAX_BOUNCE_WARN; i++) { + if (env->sc_bounce_warn[i] == 0) + break; + timestamp = si.creation + env->sc_bounce_warn[i]; + if (si.nexttry >= timestamp && + si.lastbounce < timestamp) { + req.evpid = evp.id; + req.timestamp = timestamp; + req.bounce.type = B_DELAYED; + req.bounce.delay = env->sc_bounce_warn[i]; + req.bounce.ttl = si.ttl; + m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1, + &req, sizeof req); + break; + } + } + scheduler_reset_events(); + return; + + case IMSG_QUEUE_DELIVERY_PERMFAIL: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_end(&m); + log_trace(TRACE_SCHEDULER, + "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid); + backend->delete(evpid); + ninflight -= 1; + stat_increment("scheduler.delivery.permfail", 1); + stat_decrement("scheduler.envelope.inflight", 1); + stat_decrement("scheduler.envelope", 1); + scheduler_reset_events(); + return; + + case IMSG_QUEUE_DELIVERY_LOOP: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_end(&m); + log_trace(TRACE_SCHEDULER, + "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid); + backend->delete(evpid); + ninflight -= 1; + stat_increment("scheduler.delivery.loop", 1); + stat_decrement("scheduler.envelope.inflight", 1); + stat_decrement("scheduler.envelope", 1); + scheduler_reset_events(); 
+ return; + + case IMSG_QUEUE_HOLDQ_HOLD: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_get_id(&m, &holdq); + m_end(&m); + log_trace(TRACE_SCHEDULER, + "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64, + evpid, holdq); + backend->hold(evpid, holdq); + ninflight -= 1; + stat_decrement("scheduler.envelope.inflight", 1); + scheduler_reset_events(); + return; + + case IMSG_QUEUE_HOLDQ_RELEASE: + m_msg(&m, imsg); + m_get_int(&m, &type); + m_get_id(&m, &holdq); + m_get_int(&m, &r); + m_end(&m); + log_trace(TRACE_SCHEDULER, + "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")", + r, type, holdq); + backend->release(type, holdq, r); + scheduler_reset_events(); + return; + + case IMSG_CTL_PAUSE_MDA: + log_trace(TRACE_SCHEDULER, "scheduler: pausing mda"); + env->sc_flags |= SMTPD_MDA_PAUSED; + return; + + case IMSG_CTL_RESUME_MDA: + log_trace(TRACE_SCHEDULER, "scheduler: resuming mda"); + env->sc_flags &= ~SMTPD_MDA_PAUSED; + scheduler_reset_events(); + return; + + case IMSG_CTL_PAUSE_MTA: + log_trace(TRACE_SCHEDULER, "scheduler: pausing mta"); + env->sc_flags |= SMTPD_MTA_PAUSED; + return; + + case IMSG_CTL_RESUME_MTA: + log_trace(TRACE_SCHEDULER, "scheduler: resuming mta"); + env->sc_flags &= ~SMTPD_MTA_PAUSED; + scheduler_reset_events(); + return; + + case IMSG_CTL_VERBOSE: + m_msg(&m, imsg); + m_get_int(&m, &v); + m_end(&m); + log_setverbose(v); + return; + + case IMSG_CTL_PROFILE: + m_msg(&m, imsg); + m_get_int(&m, &v); + m_end(&m); + profiling = v; + return; + + case IMSG_CTL_LIST_MESSAGES: + msgid = *(uint32_t *)(imsg->data); + n = backend->messages(msgid, msgids, env->sc_scheduler_max_msg_batch_size); + m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1, + msgids, n * sizeof (*msgids)); + return; + + case IMSG_CTL_LIST_ENVELOPES: + id = *(uint64_t *)(imsg->data); + n = backend->envelopes(id, state, env->sc_scheduler_max_evp_batch_size); + for (i = 0; i < n; i++) { + m_create(p_queue, IMSG_CTL_LIST_ENVELOPES, + imsg->hdr.peerid, 0, -1); + 
m_add_evpid(p_queue, state[i].evpid); + m_add_int(p_queue, state[i].flags); + m_add_time(p_queue, state[i].time); + m_close(p_queue); + } + m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES, + imsg->hdr.peerid, 0, -1, NULL, 0); + return; + + case IMSG_CTL_SCHEDULE: + id = *(uint64_t *)(imsg->data); + if (id <= 0xffffffffL) + log_debug("debug: scheduler: " + "scheduling msg:%08" PRIx64, id); + else + log_debug("debug: scheduler: " + "scheduling evp:%016" PRIx64, id); + r = backend->schedule(id); + scheduler_reset_events(); + m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid, + 0, -1, NULL, 0); + return; + + case IMSG_QUEUE_ENVELOPE_SCHEDULE: + id = *(uint64_t *)(imsg->data); + backend->schedule(id); + scheduler_reset_events(); + return; + + case IMSG_CTL_REMOVE: + id = *(uint64_t *)(imsg->data); + if (id <= 0xffffffffL) + log_debug("debug: scheduler: " + "removing msg:%08" PRIx64, id); + else + log_debug("debug: scheduler: " + "removing evp:%016" PRIx64, id); + r = backend->remove(id); + scheduler_reset_events(); + m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid, + 0, -1, NULL, 0); + return; + + case IMSG_CTL_PAUSE_EVP: + id = *(uint64_t *)(imsg->data); + if (id <= 0xffffffffL) + log_debug("debug: scheduler: " + "suspending msg:%08" PRIx64, id); + else + log_debug("debug: scheduler: " + "suspending evp:%016" PRIx64, id); + r = backend->suspend(id); + scheduler_reset_events(); + m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid, + 0, -1, NULL, 0); + return; + + case IMSG_CTL_RESUME_EVP: + id = *(uint64_t *)(imsg->data); + if (id <= 0xffffffffL) + log_debug("debug: scheduler: " + "resuming msg:%08" PRIx64, id); + else + log_debug("debug: scheduler: " + "resuming evp:%016" PRIx64, id); + r = backend->resume(id); + scheduler_reset_events(); + m_compose(p, r ? 
IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid, + 0, -1, NULL, 0); + return; + } + + errx(1, "scheduler_imsg: unexpected %s imsg", + imsg_to_str(imsg->hdr.type)); +} + +static void +scheduler_shutdown(void) +{ + log_debug("debug: scheduler agent exiting"); + _exit(0); +} + +static void +scheduler_reset_events(void) +{ + struct timeval tv; + + evtimer_del(&ev); + tv.tv_sec = 0; + tv.tv_usec = 0; + evtimer_add(&ev, &tv); +} + +int +scheduler(void) +{ + struct passwd *pw; + + backend = scheduler_backend_lookup(backend_scheduler); + if (backend == NULL) + errx(1, "cannot find scheduler backend \"%s\"", + backend_scheduler); + + purge_config(PURGE_EVERYTHING & ~PURGE_DISPATCHERS); + + if ((pw = getpwnam(SMTPD_USER)) == NULL) + fatalx("unknown user " SMTPD_USER); + + config_process(PROC_SCHEDULER); + + backend->init(backend_scheduler); + + if (chroot(PATH_CHROOT) == -1) + fatal("scheduler: chroot"); + if (chdir("/") == -1) + fatal("scheduler: chdir(\"/\")"); + + if (setgroups(1, &pw->pw_gid) || + setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || + setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) + fatal("scheduler: cannot drop privileges"); + + evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids); + types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types); + msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids); + state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state); + + imsg_callback = scheduler_imsg; + event_init(); + + signal(SIGINT, SIG_IGN); + signal(SIGTERM, SIG_IGN); + signal(SIGPIPE, SIG_IGN); + signal(SIGHUP, SIG_IGN); + + config_peer(PROC_CONTROL); + config_peer(PROC_QUEUE); + + evtimer_set(&ev, scheduler_timeout, NULL); + scheduler_reset_events(); + +#if HAVE_PLEDGE + if (pledge("stdio", NULL) == -1) + err(1, "pledge"); +#endif + + event_dispatch(); + fatalx("exited event loop"); + + return (0); +} + +static void +scheduler_timeout(int fd, short event, void *p) +{ + struct timeval tv; + size_t i; + size_t 
d_inflight; + size_t d_envelope; + size_t d_removed; + size_t d_expired; + size_t d_updated; + size_t count; + int mask, r, delay; + + tv.tv_sec = 0; + tv.tv_usec = 0; + + mask = SCHED_UPDATE; + + if (ninflight < env->sc_scheduler_max_inflight) { + mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE; + if (!(env->sc_flags & SMTPD_MDA_PAUSED)) + mask |= SCHED_MDA; + if (!(env->sc_flags & SMTPD_MTA_PAUSED)) + mask |= SCHED_MTA; + } + + count = env->sc_scheduler_max_schedule; + + log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, count=%zu", mask, count); + + r = backend->batch(mask, &delay, &count, evpids, types); + + log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count); + + if (r < 0) + fatalx("scheduler: error in batch handler"); + + if (r == 0) { + + if (delay < -1) + fatalx("scheduler: invalid delay %d", delay); + + if (delay == -1) { + log_trace(TRACE_SCHEDULER, "scheduler: sleeping"); + return; + } + + tv.tv_sec = delay; + tv.tv_usec = 0; + log_trace(TRACE_SCHEDULER, + "scheduler: waiting for %s", duration_to_text(tv.tv_sec)); + evtimer_add(&ev, &tv); + return; + } + + d_inflight = 0; + d_envelope = 0; + d_removed = 0; + d_expired = 0; + d_updated = 0; + + for (i = 0; i < count; i++) { + switch(types[i]) { + case SCHED_REMOVE: + log_debug("debug: scheduler: evp:%016" PRIx64 + " removed", evpids[i]); + m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1); + m_add_evpid(p_queue, evpids[i]); + m_close(p_queue); + d_envelope += 1; + d_removed += 1; + d_inflight += 1; + break; + + case SCHED_EXPIRE: + log_debug("debug: scheduler: evp:%016" PRIx64 + " expired", evpids[i]); + m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1); + m_add_evpid(p_queue, evpids[i]); + m_close(p_queue); + d_envelope += 1; + d_expired += 1; + d_inflight += 1; + break; + + case SCHED_UPDATE: + log_debug("debug: scheduler: evp:%016" PRIx64 + " scheduled (update)", evpids[i]); + d_updated += 1; + break; + + case SCHED_BOUNCE: + 
log_debug("debug: scheduler: evp:%016" PRIx64 + " scheduled (bounce)", evpids[i]); + m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1); + m_add_evpid(p_queue, evpids[i]); + m_close(p_queue); + d_inflight += 1; + break; + + case SCHED_MDA: + log_debug("debug: scheduler: evp:%016" PRIx64 + " scheduled (mda)", evpids[i]); + m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1); + m_add_evpid(p_queue, evpids[i]); + m_close(p_queue); + d_inflight += 1; + break; + + case SCHED_MTA: + log_debug("debug: scheduler: evp:%016" PRIx64 + " scheduled (mta)", evpids[i]); + m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1); + m_add_evpid(p_queue, evpids[i]); + m_close(p_queue); + d_inflight += 1; + break; + } + } + + stat_decrement("scheduler.envelope", d_envelope); + stat_increment("scheduler.envelope.inflight", d_inflight); + stat_increment("scheduler.envelope.expired", d_expired); + stat_increment("scheduler.envelope.removed", d_removed); + stat_increment("scheduler.envelope.updated", d_updated); + + ninflight += d_inflight; + + tv.tv_sec = 0; + tv.tv_usec = 0; + evtimer_add(&ev, &tv); +} |