aboutsummaryrefslogtreecommitdiffstats
path: root/queue_backend.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue_backend.c')
-rw-r--r--queue_backend.c804
1 files changed, 0 insertions, 804 deletions
diff --git a/queue_backend.c b/queue_backend.c
deleted file mode 100644
index a33f91f4..00000000
--- a/queue_backend.c
+++ /dev/null
@@ -1,804 +0,0 @@
-/* $OpenBSD: queue_backend.c,v 1.66 2020/04/22 11:35:34 eric Exp $ */
-
-/*
- * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
- *
- * Permission to use, copy, modify, and distribute this software for any
- * purpose with or without fee is hereby granted, provided that the above
- * copyright notice and this permission notice appear in all copies.
- *
- * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- */
-
-#include <sys/types.h>
-#include <sys/queue.h>
-#include <sys/tree.h>
-#include <sys/socket.h>
-#include <sys/stat.h>
-
-#include <ctype.h>
-#include <err.h>
-#include <errno.h>
-#include <event.h>
-#include <fcntl.h>
-#include <grp.h>
-#include <imsg.h>
-#include <limits.h>
-#include <inttypes.h>
-#include <pwd.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <time.h>
-#include <unistd.h>
-
-#include "smtpd.h"
-#include "log.h"
-
-static const char* envelope_validate(struct envelope *);
-
-extern struct queue_backend queue_backend_fs;
-extern struct queue_backend queue_backend_null;
-extern struct queue_backend queue_backend_proc;
-extern struct queue_backend queue_backend_ram;
-
-static void queue_envelope_cache_add(struct envelope *);
-static void queue_envelope_cache_update(struct envelope *);
-static void queue_envelope_cache_del(uint64_t evpid);
-
-TAILQ_HEAD(evplst, envelope);
-
-static struct tree evpcache_tree;
-static struct evplst evpcache_list;
-static struct queue_backend *backend;
-
-static int (*handler_close)(void);
-static int (*handler_message_create)(uint32_t *);
-static int (*handler_message_commit)(uint32_t, const char*);
-static int (*handler_message_delete)(uint32_t);
-static int (*handler_message_fd_r)(uint32_t);
-static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
-static int (*handler_envelope_delete)(uint64_t);
-static int (*handler_envelope_update)(uint64_t, const char *, size_t);
-static int (*handler_envelope_load)(uint64_t, char *, size_t);
-static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
-static int (*handler_message_walk)(uint64_t *, char *, size_t,
- uint32_t, int *, void **);
-
-#ifdef QUEUE_PROFILING
-
-static struct {
- struct timespec t0;
- const char *name;
-} profile;
-
-static inline void profile_enter(const char *name)
-{
- if ((profiling & PROFILE_QUEUE) == 0)
- return;
-
- profile.name = name;
- clock_gettime(CLOCK_MONOTONIC, &profile.t0);
-}
-
-static inline void profile_leave(void)
-{
- struct timespec t1, dt;
-
- if ((profiling & PROFILE_QUEUE) == 0)
- return;
-
- clock_gettime(CLOCK_MONOTONIC, &t1);
- timespecsub(&t1, &profile.t0, &dt);
- log_debug("profile-queue: %s %lld.%09ld", profile.name,
- (long long)dt.tv_sec, dt.tv_nsec);
-}
-#else
-#define profile_enter(x) do {} while (0)
-#define profile_leave() do {} while (0)
-#endif
-
-static int
-queue_message_path(uint32_t msgid, char *buf, size_t len)
-{
- return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
-}
-
-int
-queue_init(const char *name, int server)
-{
- struct passwd *pwq;
- struct group *gr;
- int r;
-
- pwq = getpwnam(SMTPD_QUEUE_USER);
- if (pwq == NULL)
- errx(1, "unknown user %s", SMTPD_QUEUE_USER);
-
- gr = getgrnam(SMTPD_QUEUE_GROUP);
- if (gr == NULL)
- errx(1, "unknown group %s", SMTPD_QUEUE_GROUP);
-
- tree_init(&evpcache_tree);
- TAILQ_INIT(&evpcache_list);
-
- if (!strcmp(name, "fs"))
- backend = &queue_backend_fs;
- else if (!strcmp(name, "null"))
- backend = &queue_backend_null;
- else if (!strcmp(name, "ram"))
- backend = &queue_backend_ram;
- else
- backend = &queue_backend_proc;
-
- if (server) {
- if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
- errx(1, "error in spool directory setup");
- if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
- errx(1, "error in offline directory setup");
- if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
- errx(1, "error in purge directory setup");
-
- mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
-
- if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
- errx(1, "error in purge directory setup");
- }
-
- r = backend->init(pwq, server, name);
-
- log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
-
- return (r);
-}
-
-int
-queue_close(void)
-{
- if (handler_close)
- return (handler_close());
-
- return (1);
-}
-
-int
-queue_message_create(uint32_t *msgid)
-{
- int r;
-
- profile_enter("queue_message_create");
- r = handler_message_create(msgid);
- profile_leave();
-
- log_trace(TRACE_QUEUE,
- "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
- r, *msgid);
-
- return (r);
-}
-
-int
-queue_message_delete(uint32_t msgid)
-{
- char msgpath[PATH_MAX];
- uint64_t evpid;
- void *iter;
- int r;
-
- profile_enter("queue_message_delete");
- r = handler_message_delete(msgid);
- profile_leave();
-
- /* in case the message is incoming */
- queue_message_path(msgid, msgpath, sizeof(msgpath));
- unlink(msgpath);
-
- /* remove remaining envelopes from the cache if any (on rollback) */
- evpid = msgid_to_evpid(msgid);
- for (;;) {
- iter = NULL;
- if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
- break;
- if (evpid_to_msgid(evpid) != msgid)
- break;
- queue_envelope_cache_del(evpid);
- }
-
- log_trace(TRACE_QUEUE,
- "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
-
- return (r);
-}
-
-int
-queue_message_commit(uint32_t msgid)
-{
- int r;
- char msgpath[PATH_MAX];
- char tmppath[PATH_MAX];
- FILE *ifp = NULL;
- FILE *ofp = NULL;
-
- profile_enter("queue_message_commit");
-
- queue_message_path(msgid, msgpath, sizeof(msgpath));
-
- if (env->sc_queue_flags & QUEUE_COMPRESSION) {
- bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
- ifp = fopen(msgpath, "r");
- ofp = fopen(tmppath, "w+");
- if (ifp == NULL || ofp == NULL)
- goto err;
- if (!compress_file(ifp, ofp))
- goto err;
- fclose(ifp);
- fclose(ofp);
- ifp = NULL;
- ofp = NULL;
-
- if (rename(tmppath, msgpath) == -1) {
- if (errno == ENOSPC)
- return (0);
- unlink(tmppath);
- log_warn("rename");
- return (0);
- }
- }
-
- if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
- bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
- ifp = fopen(msgpath, "r");
- ofp = fopen(tmppath, "w+");
- if (ifp == NULL || ofp == NULL)
- goto err;
- if (!crypto_encrypt_file(ifp, ofp))
- goto err;
- fclose(ifp);
- fclose(ofp);
- ifp = NULL;
- ofp = NULL;
-
- if (rename(tmppath, msgpath) == -1) {
- if (errno == ENOSPC)
- return (0);
- unlink(tmppath);
- log_warn("rename");
- return (0);
- }
- }
-
- r = handler_message_commit(msgid, msgpath);
- profile_leave();
-
- /* in case it's not done by the backend */
- unlink(msgpath);
-
- log_trace(TRACE_QUEUE,
- "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
- msgid, r);
-
- return (r);
-
-err:
- if (ifp)
- fclose(ifp);
- if (ofp)
- fclose(ofp);
- return 0;
-}
-
-int
-queue_message_fd_r(uint32_t msgid)
-{
- int fdin = -1, fdout = -1, fd = -1;
- FILE *ifp = NULL;
- FILE *ofp = NULL;
-
- profile_enter("queue_message_fd_r");
- fdin = handler_message_fd_r(msgid);
- profile_leave();
-
- log_trace(TRACE_QUEUE,
- "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
-
- if (fdin == -1)
- return (-1);
-
- if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
- if ((fdout = mktmpfile()) == -1)
- goto err;
- if ((fd = dup(fdout)) == -1)
- goto err;
- if ((ifp = fdopen(fdin, "r")) == NULL)
- goto err;
- fdin = fd;
- fd = -1;
- if ((ofp = fdopen(fdout, "w+")) == NULL)
- goto err;
-
- if (!crypto_decrypt_file(ifp, ofp))
- goto err;
-
- fclose(ifp);
- ifp = NULL;
- fclose(ofp);
- ofp = NULL;
- lseek(fdin, SEEK_SET, 0);
- }
-
- if (env->sc_queue_flags & QUEUE_COMPRESSION) {
- if ((fdout = mktmpfile()) == -1)
- goto err;
- if ((fd = dup(fdout)) == -1)
- goto err;
- if ((ifp = fdopen(fdin, "r")) == NULL)
- goto err;
- fdin = fd;
- fd = -1;
- if ((ofp = fdopen(fdout, "w+")) == NULL)
- goto err;
-
- if (!uncompress_file(ifp, ofp))
- goto err;
-
- fclose(ifp);
- ifp = NULL;
- fclose(ofp);
- ofp = NULL;
- lseek(fdin, SEEK_SET, 0);
- }
-
- return (fdin);
-
-err:
- if (fd != -1)
- close(fd);
- if (fdin != -1)
- close(fdin);
- if (fdout != -1)
- close(fdout);
- if (ifp)
- fclose(ifp);
- if (ofp)
- fclose(ofp);
- return -1;
-}
-
-int
-queue_message_fd_rw(uint32_t msgid)
-{
- char buf[PATH_MAX];
-
- queue_message_path(msgid, buf, sizeof(buf));
-
- return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
-}
-
-static int
-queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
-{
- char *evp;
- size_t evplen;
- size_t complen;
- char compbuf[sizeof(struct envelope)];
- size_t enclen;
- char encbuf[sizeof(struct envelope)];
-
- evp = evpbuf;
- evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
- if (evplen == 0)
- return (0);
-
- if (env->sc_queue_flags & QUEUE_COMPRESSION) {
- complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
- if (complen == 0)
- return (0);
- evp = compbuf;
- evplen = complen;
- }
-
- if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
- enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
- if (enclen == 0)
- return (0);
- evp = encbuf;
- evplen = enclen;
- }
-
- memmove(evpbuf, evp, evplen);
-
- return (evplen);
-}
-
-static int
-queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
-{
- char *evp;
- size_t evplen;
- char compbuf[sizeof(struct envelope)];
- size_t complen;
- char encbuf[sizeof(struct envelope)];
- size_t enclen;
-
- evp = evpbuf;
- evplen = evpbufsize;
-
- if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
- enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
- if (enclen == 0)
- return (0);
- evp = encbuf;
- evplen = enclen;
- }
-
- if (env->sc_queue_flags & QUEUE_COMPRESSION) {
- complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
- if (complen == 0)
- return (0);
- evp = compbuf;
- evplen = complen;
- }
-
- return (envelope_load_buffer(ep, evp, evplen));
-}
-
-static void
-queue_envelope_cache_add(struct envelope *e)
-{
- struct envelope *cached;
-
- while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
- queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
-
- cached = xcalloc(1, sizeof *cached);
- *cached = *e;
- TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
- tree_xset(&evpcache_tree, e->id, cached);
- stat_increment("queue.evpcache.size", 1);
-}
-
-static void
-queue_envelope_cache_update(struct envelope *e)
-{
- struct envelope *cached;
-
- if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
- queue_envelope_cache_add(e);
- stat_increment("queue.evpcache.update.missed", 1);
- } else {
- TAILQ_REMOVE(&evpcache_list, cached, entry);
- *cached = *e;
- TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
- stat_increment("queue.evpcache.update.hit", 1);
- }
-}
-
-static void
-queue_envelope_cache_del(uint64_t evpid)
-{
- struct envelope *cached;
-
- if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
- return;
-
- TAILQ_REMOVE(&evpcache_list, cached, entry);
- free(cached);
- stat_decrement("queue.evpcache.size", 1);
-}
-
-int
-queue_envelope_create(struct envelope *ep)
-{
- int r;
- char evpbuf[sizeof(struct envelope)];
- size_t evplen;
- uint64_t evpid;
- uint32_t msgid;
-
- ep->creation = time(NULL);
- evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
- if (evplen == 0)
- return (0);
-
- evpid = ep->id;
- msgid = evpid_to_msgid(evpid);
-
- profile_enter("queue_envelope_create");
- r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
- profile_leave();
-
- log_trace(TRACE_QUEUE,
- "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
- evpid, evplen, r, ep->id);
-
- if (!r) {
- ep->creation = 0;
- ep->id = 0;
- }
-
- if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
- queue_envelope_cache_add(ep);
-
- return (r);
-}
-
-int
-queue_envelope_delete(uint64_t evpid)
-{
- int r;
-
- if (env->sc_queue_flags & QUEUE_EVPCACHE)
- queue_envelope_cache_del(evpid);
-
- profile_enter("queue_envelope_delete");
- r = handler_envelope_delete(evpid);
- profile_leave();
-
- log_trace(TRACE_QUEUE,
- "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
- evpid, r);
-
- return (r);
-}
-
-int
-queue_envelope_load(uint64_t evpid, struct envelope *ep)
-{
- const char *e;
- char evpbuf[sizeof(struct envelope)];
- size_t evplen;
- struct envelope *cached;
-
- if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
- (cached = tree_get(&evpcache_tree, evpid))) {
- *ep = *cached;
- stat_increment("queue.evpcache.load.hit", 1);
- return (1);
- }
-
- ep->id = evpid;
- profile_enter("queue_envelope_load");
- evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
- profile_leave();
-
- log_trace(TRACE_QUEUE,
- "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
- evpid, evplen);
-
- if (evplen == 0)
- return (0);
-
- if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
- if ((e = envelope_validate(ep)) == NULL) {
- ep->id = evpid;
- if (env->sc_queue_flags & QUEUE_EVPCACHE) {
- queue_envelope_cache_add(ep);
- stat_increment("queue.evpcache.load.missed", 1);
- }
- return (1);
- }
- log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
- evpid, e);
- }
- return (0);
-}
-
-int
-queue_envelope_update(struct envelope *ep)
-{
- char evpbuf[sizeof(struct envelope)];
- size_t evplen;
- int r;
-
- evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
- if (evplen == 0)
- return (0);
-
- profile_enter("queue_envelope_update");
- r = handler_envelope_update(ep->id, evpbuf, evplen);
- profile_leave();
-
- if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
- queue_envelope_cache_update(ep);
-
- log_trace(TRACE_QUEUE,
- "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
- ep->id, r);
-
- return (r);
-}
-
-int
-queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
-{
- char evpbuf[sizeof(struct envelope)];
- uint64_t evpid;
- int r;
- const char *e;
-
- profile_enter("queue_message_walk");
- r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
- msgid, done, data);
- profile_leave();
-
- log_trace(TRACE_QUEUE,
- "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
- r, evpid);
-
- if (r == -1)
- return (r);
-
- if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
- if ((e = envelope_validate(ep)) == NULL) {
- ep->id = evpid;
- /*
- * do not cache the envelope here, while discovering
- * envelopes one could re-run discover on already
- * scheduled envelopes which leads to triggering of
- * strict checks in caching. Envelopes could anyway
- * be loaded from backend if it isn't cached.
- */
- return (1);
- }
- log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
- evpid, e);
- }
- return (0);
-}
-
-int
-queue_envelope_walk(struct envelope *ep)
-{
- const char *e;
- uint64_t evpid;
- char evpbuf[sizeof(struct envelope)];
- int r;
-
- profile_enter("queue_envelope_walk");
- r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
- profile_leave();
-
- log_trace(TRACE_QUEUE,
- "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
- r, evpid);
-
- if (r == -1)
- return (r);
-
- if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
- if ((e = envelope_validate(ep)) == NULL) {
- ep->id = evpid;
- if (env->sc_queue_flags & QUEUE_EVPCACHE)
- queue_envelope_cache_add(ep);
- return (1);
- }
- log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
- evpid, e);
- }
- return (0);
-}
-
-uint32_t
-queue_generate_msgid(void)
-{
- uint32_t msgid;
-
- while ((msgid = arc4random()) == 0)
- ;
-
- return msgid;
-}
-
-uint64_t
-queue_generate_evpid(uint32_t msgid)
-{
- uint32_t rnd;
- uint64_t evpid;
-
- while ((rnd = arc4random()) == 0)
- ;
-
- evpid = msgid;
- evpid <<= 32;
- evpid |= rnd;
-
- return evpid;
-}
-
-static const char*
-envelope_validate(struct envelope *ep)
-{
- if (ep->version != SMTPD_ENVELOPE_VERSION)
- return "version mismatch";
-
- if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
- return "invalid helo";
- if (ep->helo[0] == '\0')
- return "empty helo";
-
- if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
- return "invalid hostname";
- if (ep->hostname[0] == '\0')
- return "empty hostname";
-
- if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
- return "invalid error line";
-
- if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL)
- return "unknown dispatcher";
-
- return NULL;
-}
-
-void
-queue_api_on_close(int(*cb)(void))
-{
- handler_close = cb;
-}
-
-void
-queue_api_on_message_create(int(*cb)(uint32_t *))
-{
- handler_message_create = cb;
-}
-
-void
-queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
-{
- handler_message_commit = cb;
-}
-
-void
-queue_api_on_message_delete(int(*cb)(uint32_t))
-{
- handler_message_delete = cb;
-}
-
-void
-queue_api_on_message_fd_r(int(*cb)(uint32_t))
-{
- handler_message_fd_r = cb;
-}
-
-void
-queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
-{
- handler_envelope_create = cb;
-}
-
-void
-queue_api_on_envelope_delete(int(*cb)(uint64_t))
-{
- handler_envelope_delete = cb;
-}
-
-void
-queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
-{
- handler_envelope_update = cb;
-}
-
-void
-queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
-{
- handler_envelope_load = cb;
-}
-
-void
-queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
-{
- handler_envelope_walk = cb;
-}
-
-void
-queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
- uint32_t, int *, void **))
-{
- handler_message_walk = cb;
-}