aboutsummaryrefslogtreecommitdiffstats
path: root/smtpd/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'smtpd/queue.c')
-rw-r--r--smtpd/queue.c750
1 files changed, 750 insertions, 0 deletions
diff --git a/smtpd/queue.c b/smtpd/queue.c
new file mode 100644
index 00000000..434e3647
--- /dev/null
+++ b/smtpd/queue.c
@@ -0,0 +1,750 @@
+/* $OpenBSD: queue.c,v 1.190 2020/04/22 11:35:34 eric Exp $ */
+
+/*
+ * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
+ * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
+ * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include "includes.h"
+
+#include <sys/types.h>
+#include <sys/queue.h>
+#include <sys/tree.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+
+#include <err.h>
+#include <event.h>
+#include <fcntl.h>
+#include <grp.h> /* needed for setgroups */
+#include <imsg.h>
+#include <inttypes.h>
+#include <pwd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <limits.h>
+
+#include "smtpd.h"
+#include "log.h"
+
+static void queue_imsg(struct mproc *, struct imsg *);
+static void queue_timeout(int, short, void *);
+static void queue_bounce(struct envelope *, struct delivery_bounce *);
+static void queue_shutdown(void);
+static void queue_log(const struct envelope *, const char *, const char *);
+static void queue_msgid_walk(int, short, void *);
+
+
+static void
+queue_imsg(struct mproc *p, struct imsg *imsg)
+{
+ struct delivery_bounce bounce;
+ struct msg_walkinfo *wi;
+ struct timeval tv;
+ struct bounce_req_msg *req_bounce;
+ struct envelope evp;
+ struct msg m;
+ const char *reason;
+ uint64_t reqid, evpid, holdq;
+ uint32_t msgid;
+ time_t nexttry;
+ size_t n_evp;
+ int fd, mta_ext, ret, v, flags, code;
+
+ if (imsg == NULL)
+ queue_shutdown();
+
+ memset(&bounce, 0, sizeof(struct delivery_bounce));
+
+ switch (imsg->hdr.type) {
+ case IMSG_SMTP_MESSAGE_CREATE:
+ m_msg(&m, imsg);
+ m_get_id(&m, &reqid);
+ m_end(&m);
+
+ ret = queue_message_create(&msgid);
+
+ m_create(p, IMSG_SMTP_MESSAGE_CREATE, 0, 0, -1);
+ m_add_id(p, reqid);
+ if (ret == 0)
+ m_add_int(p, 0);
+ else {
+ m_add_int(p, 1);
+ m_add_msgid(p, msgid);
+ }
+ m_close(p);
+ return;
+
+ case IMSG_SMTP_MESSAGE_ROLLBACK:
+ m_msg(&m, imsg);
+ m_get_msgid(&m, &msgid);
+ m_end(&m);
+
+ queue_message_delete(msgid);
+
+ m_create(p_scheduler, IMSG_QUEUE_MESSAGE_ROLLBACK,
+ 0, 0, -1);
+ m_add_msgid(p_scheduler, msgid);
+ m_close(p_scheduler);
+ return;
+
+ case IMSG_SMTP_MESSAGE_COMMIT:
+ m_msg(&m, imsg);
+ m_get_id(&m, &reqid);
+ m_get_msgid(&m, &msgid);
+ m_end(&m);
+
+ ret = queue_message_commit(msgid);
+
+ m_create(p, IMSG_SMTP_MESSAGE_COMMIT, 0, 0, -1);
+ m_add_id(p, reqid);
+ m_add_int(p, (ret == 0) ? 0 : 1);
+ m_close(p);
+
+ if (ret) {
+ m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
+ 0, 0, -1);
+ m_add_msgid(p_scheduler, msgid);
+ m_close(p_scheduler);
+ }
+ return;
+
+ case IMSG_SMTP_MESSAGE_OPEN:
+ m_msg(&m, imsg);
+ m_get_id(&m, &reqid);
+ m_get_msgid(&m, &msgid);
+ m_end(&m);
+
+ fd = queue_message_fd_rw(msgid);
+
+ m_create(p, IMSG_SMTP_MESSAGE_OPEN, 0, 0, fd);
+ m_add_id(p, reqid);
+ m_add_int(p, (fd == -1) ? 0 : 1);
+ m_close(p);
+ return;
+
+ case IMSG_QUEUE_SMTP_SESSION:
+ bounce_fd(imsg->fd);
+ return;
+
+ case IMSG_LKA_ENVELOPE_SUBMIT:
+ m_msg(&m, imsg);
+ m_get_id(&m, &reqid);
+ m_get_envelope(&m, &evp);
+ m_end(&m);
+
+ if (evp.id == 0)
+ log_warnx("warn: imsg_queue_submit_envelope: evpid=0");
+ if (evpid_to_msgid(evp.id) == 0)
+ log_warnx("warn: imsg_queue_submit_envelope: msgid=0, "
+ "evpid=%016"PRIx64, evp.id);
+ ret = queue_envelope_create(&evp);
+ m_create(p_pony, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
+ m_add_id(p_pony, reqid);
+ if (ret == 0)
+ m_add_int(p_pony, 0);
+ else {
+ m_add_int(p_pony, 1);
+ m_add_evpid(p_pony, evp.id);
+ }
+ m_close(p_pony);
+ if (ret) {
+ m_create(p_scheduler,
+ IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
+ m_add_envelope(p_scheduler, &evp);
+ m_close(p_scheduler);
+ }
+ return;
+
+ case IMSG_LKA_ENVELOPE_COMMIT:
+ m_msg(&m, imsg);
+ m_get_id(&m, &reqid);
+ m_end(&m);
+ m_create(p_pony, IMSG_QUEUE_ENVELOPE_COMMIT, 0, 0, -1);
+ m_add_id(p_pony, reqid);
+ m_add_int(p_pony, 1);
+ m_close(p_pony);
+ return;
+
+ case IMSG_SCHED_ENVELOPE_REMOVE:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+
+ m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1);
+ m_add_evpid(p_scheduler, evpid);
+ m_close(p_scheduler);
+
+ /* already removed by scheduler */
+ if (queue_envelope_load(evpid, &evp) == 0)
+ return;
+
+ queue_log(&evp, "Remove", "Removed by administrator");
+ queue_envelope_delete(evpid);
+ return;
+
+ case IMSG_SCHED_ENVELOPE_EXPIRE:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+
+ m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1);
+ m_add_evpid(p_scheduler, evpid);
+ m_close(p_scheduler);
+
+ /* already removed by scheduler*/
+ if (queue_envelope_load(evpid, &evp) == 0)
+ return;
+
+ bounce.type = B_FAILED;
+ envelope_set_errormsg(&evp, "Envelope expired");
+ envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
+ envelope_set_esc_code(&evp, ESC_DELIVERY_TIME_EXPIRED);
+ queue_bounce(&evp, &bounce);
+ queue_log(&evp, "Expire", "Envelope expired");
+ queue_envelope_delete(evpid);
+ return;
+
+ case IMSG_SCHED_ENVELOPE_BOUNCE:
+ CHECK_IMSG_DATA_SIZE(imsg, sizeof *req_bounce);
+ req_bounce = imsg->data;
+ evpid = req_bounce->evpid;
+
+ if (queue_envelope_load(evpid, &evp) == 0) {
+ log_warnx("queue: bounce: failed to load envelope");
+ m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
+ m_add_evpid(p_scheduler, evpid);
+ m_add_u32(p_scheduler, 0); /* not in-flight */
+ m_close(p_scheduler);
+ return;
+ }
+ queue_bounce(&evp, &req_bounce->bounce);
+ evp.lastbounce = req_bounce->timestamp;
+ if (!queue_envelope_update(&evp))
+ log_warnx("warn: could not update envelope %016"PRIx64, evpid);
+ return;
+
+ case IMSG_SCHED_ENVELOPE_DELIVER:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0) {
+ log_warnx("queue: deliver: failed to load envelope");
+ m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
+ m_add_evpid(p_scheduler, evpid);
+ m_add_u32(p_scheduler, 1); /* in-flight */
+ m_close(p_scheduler);
+ return;
+ }
+ evp.lasttry = time(NULL);
+ m_create(p_pony, IMSG_QUEUE_DELIVER, 0, 0, -1);
+ m_add_envelope(p_pony, &evp);
+ m_close(p_pony);
+ return;
+
+ case IMSG_SCHED_ENVELOPE_INJECT:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ bounce_add(evpid);
+ return;
+
+ case IMSG_SCHED_ENVELOPE_TRANSFER:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0) {
+ log_warnx("queue: failed to load envelope");
+ m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
+ m_add_evpid(p_scheduler, evpid);
+ m_add_u32(p_scheduler, 1); /* in-flight */
+ m_close(p_scheduler);
+ return;
+ }
+ evp.lasttry = time(NULL);
+ m_create(p_pony, IMSG_QUEUE_TRANSFER, 0, 0, -1);
+ m_add_envelope(p_pony, &evp);
+ m_close(p_pony);
+ return;
+
+ case IMSG_CTL_LIST_ENVELOPES:
+ if (imsg->hdr.len == sizeof imsg->hdr) {
+ m_forward(p_control, imsg);
+ return;
+ }
+
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_get_int(&m, &flags);
+ m_get_time(&m, &nexttry);
+ m_end(&m);
+
+ if (queue_envelope_load(evpid, &evp) == 0)
+ return; /* Envelope is gone, drop it */
+
+ /*
+ * XXX consistency: The envelope might already be on
+ * its way back to the scheduler. We need to detect
+ * this properly and report that state.
+ */
+ if (flags & EF_INFLIGHT) {
+ /*
+ * Not exactly correct but pretty close: The
+ * value is not recorded on the envelope unless
+ * a tempfail occurs.
+ */
+ evp.lasttry = nexttry;
+ }
+
+ m_create(p_control, IMSG_CTL_LIST_ENVELOPES,
+ imsg->hdr.peerid, 0, -1);
+ m_add_int(p_control, flags);
+ m_add_time(p_control, nexttry);
+ m_add_envelope(p_control, &evp);
+ m_close(p_control);
+ return;
+
+ case IMSG_MDA_OPEN_MESSAGE:
+ case IMSG_MTA_OPEN_MESSAGE:
+ m_msg(&m, imsg);
+ m_get_id(&m, &reqid);
+ m_get_msgid(&m, &msgid);
+ m_end(&m);
+ fd = queue_message_fd_r(msgid);
+ m_create(p, imsg->hdr.type, 0, 0, fd);
+ m_add_id(p, reqid);
+ m_close(p);
+ return;
+
+ case IMSG_MDA_DELIVERY_OK:
+ case IMSG_MTA_DELIVERY_OK:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK)
+ m_get_int(&m, &mta_ext);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0) {
+ log_warn("queue: dsn: failed to load envelope");
+ return;
+ }
+ if (evp.dsn_notify & DSN_SUCCESS) {
+ bounce.type = B_DELIVERED;
+ bounce.dsn_ret = evp.dsn_ret;
+ envelope_set_esc_class(&evp, ESC_STATUS_OK);
+ if (imsg->hdr.type == IMSG_MDA_DELIVERY_OK)
+ queue_bounce(&evp, &bounce);
+ else if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK &&
+ (mta_ext & MTA_EXT_DSN) == 0) {
+ bounce.mta_without_dsn = 1;
+ queue_bounce(&evp, &bounce);
+ }
+ }
+ queue_envelope_delete(evpid);
+ m_create(p_scheduler, IMSG_QUEUE_DELIVERY_OK, 0, 0, -1);
+ m_add_evpid(p_scheduler, evpid);
+ m_close(p_scheduler);
+ return;
+
+ case IMSG_MDA_DELIVERY_TEMPFAIL:
+ case IMSG_MTA_DELIVERY_TEMPFAIL:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_get_string(&m, &reason);
+ m_get_int(&m, &code);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0) {
+ log_warnx("queue: tempfail: failed to load envelope");
+ m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
+ m_add_evpid(p_scheduler, evpid);
+ m_add_u32(p_scheduler, 1); /* in-flight */
+ m_close(p_scheduler);
+ return;
+ }
+ envelope_set_errormsg(&evp, "%s", reason);
+ envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
+ envelope_set_esc_code(&evp, code);
+ evp.retry++;
+ if (!queue_envelope_update(&evp))
+ log_warnx("warn: could not update envelope %016"PRIx64, evpid);
+ m_create(p_scheduler, IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1);
+ m_add_envelope(p_scheduler, &evp);
+ m_close(p_scheduler);
+ return;
+
+ case IMSG_MDA_DELIVERY_PERMFAIL:
+ case IMSG_MTA_DELIVERY_PERMFAIL:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_get_string(&m, &reason);
+ m_get_int(&m, &code);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0) {
+ log_warnx("queue: permfail: failed to load envelope");
+ m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
+ m_add_evpid(p_scheduler, evpid);
+ m_add_u32(p_scheduler, 1); /* in-flight */
+ m_close(p_scheduler);
+ return;
+ }
+ bounce.type = B_FAILED;
+ envelope_set_errormsg(&evp, "%s", reason);
+ envelope_set_esc_class(&evp, ESC_STATUS_PERMFAIL);
+ envelope_set_esc_code(&evp, code);
+ queue_bounce(&evp, &bounce);
+ queue_envelope_delete(evpid);
+ m_create(p_scheduler, IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1);
+ m_add_evpid(p_scheduler, evpid);
+ m_close(p_scheduler);
+ return;
+
+ case IMSG_MDA_DELIVERY_LOOP:
+ case IMSG_MTA_DELIVERY_LOOP:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0) {
+ log_warnx("queue: loop: failed to load envelope");
+ m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
+ m_add_evpid(p_scheduler, evpid);
+ m_add_u32(p_scheduler, 1); /* in-flight */
+ m_close(p_scheduler);
+ return;
+ }
+ envelope_set_errormsg(&evp, "%s", "Loop detected");
+ envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
+ envelope_set_esc_code(&evp, ESC_ROUTING_LOOP_DETECTED);
+ bounce.type = B_FAILED;
+ queue_bounce(&evp, &bounce);
+ queue_envelope_delete(evp.id);
+ m_create(p_scheduler, IMSG_QUEUE_DELIVERY_LOOP, 0, 0, -1);
+ m_add_evpid(p_scheduler, evp.id);
+ m_close(p_scheduler);
+ return;
+
+ case IMSG_MTA_DELIVERY_HOLD:
+ case IMSG_MDA_DELIVERY_HOLD:
+ imsg->hdr.type = IMSG_QUEUE_HOLDQ_HOLD;
+ m_forward(p_scheduler, imsg);
+ return;
+
+ case IMSG_MTA_SCHEDULE:
+ imsg->hdr.type = IMSG_QUEUE_ENVELOPE_SCHEDULE;
+ m_forward(p_scheduler, imsg);
+ return;
+
+ case IMSG_MTA_HOLDQ_RELEASE:
+ case IMSG_MDA_HOLDQ_RELEASE:
+ m_msg(&m, imsg);
+ m_get_id(&m, &holdq);
+ m_get_int(&m, &v);
+ m_end(&m);
+ m_create(p_scheduler, IMSG_QUEUE_HOLDQ_RELEASE, 0, 0, -1);
+ if (imsg->hdr.type == IMSG_MTA_HOLDQ_RELEASE)
+ m_add_int(p_scheduler, D_MTA);
+ else
+ m_add_int(p_scheduler, D_MDA);
+ m_add_id(p_scheduler, holdq);
+ m_add_int(p_scheduler, v);
+ m_close(p_scheduler);
+ return;
+
+ case IMSG_CTL_PAUSE_MDA:
+ case IMSG_CTL_PAUSE_MTA:
+ case IMSG_CTL_RESUME_MDA:
+ case IMSG_CTL_RESUME_MTA:
+ m_forward(p_scheduler, imsg);
+ return;
+
+ case IMSG_CTL_VERBOSE:
+ m_msg(&m, imsg);
+ m_get_int(&m, &v);
+ m_end(&m);
+ log_trace_verbose(v);
+ return;
+
+ case IMSG_CTL_PROFILE:
+ m_msg(&m, imsg);
+ m_get_int(&m, &v);
+ m_end(&m);
+ profiling = v;
+ return;
+
+ case IMSG_CTL_DISCOVER_EVPID:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0) {
+ log_warnx("queue: discover: failed to load "
+ "envelope %016" PRIx64, evpid);
+ n_evp = 0;
+ m_compose(p_control, imsg->hdr.type,
+ imsg->hdr.peerid, 0, -1,
+ &n_evp, sizeof n_evp);
+ return;
+ }
+
+ m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID,
+ 0, 0, -1);
+ m_add_envelope(p_scheduler, &evp);
+ m_close(p_scheduler);
+
+ m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID,
+ 0, 0, -1);
+ m_add_msgid(p_scheduler, evpid_to_msgid(evpid));
+ m_close(p_scheduler);
+ n_evp = 1;
+ m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid,
+ 0, -1, &n_evp, sizeof n_evp);
+ return;
+
+ case IMSG_CTL_DISCOVER_MSGID:
+ m_msg(&m, imsg);
+ m_get_msgid(&m, &msgid);
+ m_end(&m);
+ /* handle concurrent walk requests */
+ wi = xcalloc(1, sizeof *wi);
+ wi->msgid = msgid;
+ wi->peerid = imsg->hdr.peerid;
+ evtimer_set(&wi->ev, queue_msgid_walk, wi);
+ tv.tv_sec = 0;
+ tv.tv_usec = 10;
+ evtimer_add(&wi->ev, &tv);
+ return;
+ }
+
+ errx(1, "queue_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
+}
+
+static void
+queue_msgid_walk(int fd, short event, void *arg)
+{
+ struct envelope evp;
+ struct timeval tv;
+ struct msg_walkinfo *wi = arg;
+ int r;
+
+ r = queue_message_walk(&evp, wi->msgid, &wi->done, &wi->data);
+ if (r == -1) {
+ if (wi->n_evp) {
+ m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID,
+ 0, 0, -1);
+ m_add_msgid(p_scheduler, wi->msgid);
+ m_close(p_scheduler);
+ }
+
+ m_compose(p_control, IMSG_CTL_DISCOVER_MSGID, wi->peerid, 0, -1,
+ &wi->n_evp, sizeof wi->n_evp);
+ evtimer_del(&wi->ev);
+ free(wi);
+ return;
+ }
+
+ if (r) {
+ m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, 0, 0, -1);
+ m_add_envelope(p_scheduler, &evp);
+ m_close(p_scheduler);
+ wi->n_evp += 1;
+ }
+
+ tv.tv_sec = 0;
+ tv.tv_usec = 10;
+ evtimer_set(&wi->ev, queue_msgid_walk, wi);
+ evtimer_add(&wi->ev, &tv);
+}
+
+static void
+queue_bounce(struct envelope *e, struct delivery_bounce *d)
+{
+ struct envelope b;
+
+ b = *e;
+ b.type = D_BOUNCE;
+ b.agent.bounce = *d;
+ b.retry = 0;
+ b.lasttry = 0;
+ b.creation = time(NULL);
+ b.ttl = 3600 * 24 * 7;
+
+ if (e->dsn_notify & DSN_NEVER)
+ return;
+
+ if (b.id == 0)
+ log_warnx("warn: queue_bounce: evpid=0");
+ if (evpid_to_msgid(b.id) == 0)
+ log_warnx("warn: queue_bounce: msgid=0, evpid=%016"PRIx64,
+ b.id);
+ if (e->type == D_BOUNCE) {
+ log_warnx("warn: queue: double bounce!");
+ } else if (e->sender.user[0] == '\0') {
+ log_warnx("warn: queue: no return path!");
+ } else if (!queue_envelope_create(&b)) {
+ log_warnx("warn: queue: cannot bounce!");
+ } else {
+ log_debug("debug: queue: bouncing evp:%016" PRIx64
+ " as evp:%016" PRIx64, e->id, b.id);
+
+ m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
+ m_add_envelope(p_scheduler, &b);
+ m_close(p_scheduler);
+
+ m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 0, 0, -1);
+ m_add_msgid(p_scheduler, evpid_to_msgid(b.id));
+ m_close(p_scheduler);
+
+ stat_increment("queue.bounce", 1);
+ }
+}
+
+static void
+queue_shutdown(void)
+{
+ log_debug("debug: queue agent exiting");
+ queue_close();
+ _exit(0);
+}
+
+int
+queue(void)
+{
+ struct passwd *pw;
+ struct timeval tv;
+ struct event ev_qload;
+
+ purge_config(PURGE_EVERYTHING & ~PURGE_DISPATCHERS);
+
+ if ((pw = getpwnam(SMTPD_QUEUE_USER)) == NULL)
+ if ((pw = getpwnam(SMTPD_USER)) == NULL)
+ fatalx("unknown user " SMTPD_USER);
+
+ env->sc_queue_flags |= QUEUE_EVPCACHE;
+ env->sc_queue_evpcache_size = 1024;
+
+ if (chroot(PATH_SPOOL) == -1)
+ fatal("queue: chroot");
+ if (chdir("/") == -1)
+ fatal("queue: chdir(\"/\")");
+
+ config_process(PROC_QUEUE);
+
+ if (env->sc_queue_flags & QUEUE_COMPRESSION)
+ log_info("queue: queue compression enabled");
+
+ if (env->sc_queue_key) {
+ if (!crypto_setup(env->sc_queue_key, strlen(env->sc_queue_key)))
+ fatalx("crypto_setup: invalid key for queue encryption");
+ log_info("queue: queue encryption enabled");
+ }
+
+ if (setgroups(1, &pw->pw_gid) ||
+ setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
+ setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
+ fatal("queue: cannot drop privileges");
+
+ imsg_callback = queue_imsg;
+ event_init();
+
+ signal(SIGINT, SIG_IGN);
+ signal(SIGTERM, SIG_IGN);
+ signal(SIGPIPE, SIG_IGN);
+ signal(SIGHUP, SIG_IGN);
+
+ config_peer(PROC_PARENT);
+ config_peer(PROC_CONTROL);
+ config_peer(PROC_LKA);
+ config_peer(PROC_SCHEDULER);
+ config_peer(PROC_PONY);
+
+ /* setup queue loading task */
+ evtimer_set(&ev_qload, queue_timeout, &ev_qload);
+ tv.tv_sec = 0;
+ tv.tv_usec = 10;
+ evtimer_add(&ev_qload, &tv);
+
+#if HAVE_PLEDGE
+ if (pledge("stdio rpath wpath cpath flock recvfd sendfd", NULL) == -1)
+ err(1, "pledge");
+#endif
+
+ event_dispatch();
+ fatalx("exited event loop");
+
+ return (0);
+}
+
+static void
+queue_timeout(int fd, short event, void *p)
+{
+ static uint32_t msgid = 0;
+ struct envelope evp;
+ struct event *ev = p;
+ struct timeval tv;
+ int r;
+
+ r = queue_envelope_walk(&evp);
+ if (r == -1) {
+ if (msgid) {
+ m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
+ 0, 0, -1);
+ m_add_msgid(p_scheduler, msgid);
+ m_close(p_scheduler);
+ }
+ log_debug("debug: queue: done loading queue into scheduler");
+ return;
+ }
+
+ if (r) {
+ if (msgid && evpid_to_msgid(evp.id) != msgid) {
+ m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
+ 0, 0, -1);
+ m_add_msgid(p_scheduler, msgid);
+ m_close(p_scheduler);
+ }
+ msgid = evpid_to_msgid(evp.id);
+ m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
+ m_add_envelope(p_scheduler, &evp);
+ m_close(p_scheduler);
+ }
+
+ tv.tv_sec = 0;
+ tv.tv_usec = 10;
+ evtimer_add(ev, &tv);
+}
+
+static void
+queue_log(const struct envelope *e, const char *prefix, const char *status)
+{
+ char rcpt[LINE_MAX];
+
+ (void)strlcpy(rcpt, "-", sizeof rcpt);
+ if (strcmp(e->rcpt.user, e->dest.user) ||
+ strcmp(e->rcpt.domain, e->dest.domain))
+ (void)snprintf(rcpt, sizeof rcpt, "%s@%s",
+ e->rcpt.user, e->rcpt.domain);
+
+ log_info("%s: %s for %016" PRIx64 ": from=<%s@%s>, to=<%s@%s>, "
+ "rcpt=<%s>, delay=%s, stat=%s",
+ e->type == D_MDA ? "delivery" : "relay",
+ prefix,
+ e->id, e->sender.user, e->sender.domain,
+ e->dest.user, e->dest.domain,
+ rcpt,
+ duration_to_text(time(NULL) - e->creation),
+ status);
+}