diff options
Diffstat (limited to 'usr.sbin/smtpd/queue_backend.c')
-rw-r--r-- | usr.sbin/smtpd/queue_backend.c | 806 |
1 files changed, 806 insertions, 0 deletions
diff --git a/usr.sbin/smtpd/queue_backend.c b/usr.sbin/smtpd/queue_backend.c new file mode 100644 index 00000000..fa945f47 --- /dev/null +++ b/usr.sbin/smtpd/queue_backend.c @@ -0,0 +1,806 @@ +/* $OpenBSD: queue_backend.c,v 1.66 2020/04/22 11:35:34 eric Exp $ */ + +/* + * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include "includes.h" + +#include <sys/types.h> +#include <sys/queue.h> +#include <sys/tree.h> +#include <sys/socket.h> +#include <sys/stat.h> + +#include <ctype.h> +#include <err.h> +#include <errno.h> +#include <event.h> +#include <fcntl.h> +#include <grp.h> +#include <imsg.h> +#include <limits.h> +#include <inttypes.h> +#include <pwd.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> + +#include "smtpd.h" +#include "log.h" + +static const char* envelope_validate(struct envelope *); + +extern struct queue_backend queue_backend_fs; +extern struct queue_backend queue_backend_null; +extern struct queue_backend queue_backend_proc; +extern struct queue_backend queue_backend_ram; + +static void queue_envelope_cache_add(struct envelope *); +static void queue_envelope_cache_update(struct envelope *); +static void queue_envelope_cache_del(uint64_t evpid); + +TAILQ_HEAD(evplst, envelope); + +static struct tree evpcache_tree; +static struct evplst evpcache_list; +static struct queue_backend *backend; + +static int (*handler_close)(void); +static int (*handler_message_create)(uint32_t *); +static int (*handler_message_commit)(uint32_t, const char*); +static int (*handler_message_delete)(uint32_t); +static int (*handler_message_fd_r)(uint32_t); +static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); +static int (*handler_envelope_delete)(uint64_t); +static int (*handler_envelope_update)(uint64_t, const char *, size_t); +static int (*handler_envelope_load)(uint64_t, char *, size_t); +static int (*handler_envelope_walk)(uint64_t *, char *, size_t); +static int (*handler_message_walk)(uint64_t *, char *, size_t, + uint32_t, int *, void **); + +#ifdef QUEUE_PROFILING + +static struct { + struct timespec t0; + const char *name; +} profile; + +static inline void profile_enter(const char *name) +{ + if ((profiling & PROFILE_QUEUE) == 0) + return; + + profile.name = name; + clock_gettime(CLOCK_MONOTONIC, &profile.t0); +} + +static inline void profile_leave(void) +{ + struct timespec t1, dt; + + if ((profiling & PROFILE_QUEUE) == 0) + return; + + clock_gettime(CLOCK_MONOTONIC, &t1); + timespecsub(&t1, &profile.t0, &dt); + log_debug("profile-queue: %s %lld.%09ld", profile.name, + (long long)dt.tv_sec, dt.tv_nsec); +} +#else +#define profile_enter(x) do {} while (0) +#define profile_leave() do {} while (0) +#endif + +static int +queue_message_path(uint32_t msgid, char *buf, size_t len) +{ + return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); +} + +int +queue_init(const char *name, int server) +{ + struct passwd *pwq; + struct group *gr; + int r; + + pwq = getpwnam(SMTPD_QUEUE_USER); + if (pwq == NULL) + errx(1, "unknown user %s", SMTPD_QUEUE_USER); + + gr = getgrnam(SMTPD_QUEUE_GROUP); + if (gr == NULL) + errx(1, "unknown group %s", SMTPD_QUEUE_GROUP); + + tree_init(&evpcache_tree); + TAILQ_INIT(&evpcache_list); + + if (!strcmp(name, "fs")) + backend = &queue_backend_fs; + else if (!strcmp(name, "null")) + backend = &queue_backend_null; + else if (!strcmp(name, "ram")) + backend = &queue_backend_ram; + else + backend = &queue_backend_proc; + + if (server) { + if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) + errx(1, "error in spool directory setup"); + if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0) + errx(1, "error in offline directory setup"); + if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) + errx(1, "error in purge directory setup"); + + mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); + + if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) + errx(1, "error in purge directory setup"); + } + + r = backend->init(pwq, server, name); + + log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); + + return (r); +} + +int +queue_close(void) +{ + if (handler_close) + return (handler_close()); + + return (1); +} + +int +queue_message_create(uint32_t *msgid) +{ + int r; + + profile_enter("queue_message_create"); + r = handler_message_create(msgid); + profile_leave(); + + log_trace(TRACE_QUEUE, + "queue-backend: queue_message_create() -> %d (%08"PRIx32")", + r, *msgid); + + return (r); +} + +int +queue_message_delete(uint32_t msgid) +{ + char msgpath[PATH_MAX]; + uint64_t evpid; + void *iter; + int r; + + profile_enter("queue_message_delete"); + r = handler_message_delete(msgid); + profile_leave(); + + /* in case the message is incoming */ + queue_message_path(msgid, msgpath, sizeof(msgpath)); + unlink(msgpath); + + /* remove remaining envelopes from the cache if any (on rollback) */ + evpid = msgid_to_evpid(msgid); + for (;;) { + iter = NULL; + if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL)) + break; + if (evpid_to_msgid(evpid) != msgid) + break; + queue_envelope_cache_del(evpid); + } + + log_trace(TRACE_QUEUE, + "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); + + return (r); +} + +int +queue_message_commit(uint32_t msgid) +{ + int r; + char msgpath[PATH_MAX]; + char tmppath[PATH_MAX]; + FILE *ifp = NULL; + FILE *ofp = NULL; + + profile_enter("queue_message_commit"); + + queue_message_path(msgid, msgpath, sizeof(msgpath)); + + if (env->sc_queue_flags & QUEUE_COMPRESSION) { + bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); + ifp = fopen(msgpath, "r"); + ofp = fopen(tmppath, "w+"); + if (ifp == NULL || ofp == NULL) + goto err; + if (!compress_file(ifp, ofp)) + goto err; + fclose(ifp); + fclose(ofp); + ifp = NULL; + ofp = NULL; + + if (rename(tmppath, msgpath) == -1) { + if (errno == ENOSPC) + return (0); + unlink(tmppath); + log_warn("rename"); + return (0); + } + } + + if (env->sc_queue_flags & QUEUE_ENCRYPTION) { + bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); + ifp = fopen(msgpath, "r"); + ofp = fopen(tmppath, "w+"); + if (ifp == NULL || ofp == NULL) + goto err; + if (!crypto_encrypt_file(ifp, ofp)) + goto err; + fclose(ifp); + fclose(ofp); + ifp = NULL; + ofp = NULL; + + if (rename(tmppath, msgpath) == -1) { + if (errno == ENOSPC) + return (0); + unlink(tmppath); + log_warn("rename"); + return (0); + } + } + + r = handler_message_commit(msgid, msgpath); + profile_leave(); + + /* in case it's not done by the backend */ + unlink(msgpath); + + log_trace(TRACE_QUEUE, + "queue-backend: queue_message_commit(%08"PRIx32") -> %d", + msgid, r); + + return (r); + +err: + if (ifp) + fclose(ifp); + if (ofp) + fclose(ofp); + return 0; +} + +int +queue_message_fd_r(uint32_t msgid) +{ + int fdin = -1, fdout = -1, fd = -1; + FILE *ifp = NULL; + FILE *ofp = NULL; + + profile_enter("queue_message_fd_r"); + fdin = handler_message_fd_r(msgid); + profile_leave(); + + log_trace(TRACE_QUEUE, + "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); + + if (fdin == -1) + return (-1); + + if (env->sc_queue_flags & QUEUE_ENCRYPTION) { + if ((fdout = mktmpfile()) == -1) + goto err; + if ((fd = dup(fdout)) == -1) + goto err; + if ((ifp = fdopen(fdin, "r")) == NULL) + goto err; + fdin = fd; + fd = -1; + if ((ofp = fdopen(fdout, "w+")) == NULL) + goto err; + + if (!crypto_decrypt_file(ifp, ofp)) + goto err; + + fclose(ifp); + ifp = NULL; + fclose(ofp); + ofp = NULL; + lseek(fdin, SEEK_SET, 0); + } + + if (env->sc_queue_flags & QUEUE_COMPRESSION) { + if ((fdout = mktmpfile()) == -1) + goto err; + if ((fd = dup(fdout)) == -1) + goto err; + if ((ifp = fdopen(fdin, "r")) == NULL) + goto err; + fdin = fd; + fd = -1; + if ((ofp = fdopen(fdout, "w+")) == NULL) + goto err; + + if (!uncompress_file(ifp, ofp)) + goto err; + + fclose(ifp); + ifp = NULL; + fclose(ofp); + ofp = NULL; + lseek(fdin, SEEK_SET, 0); + } + + return (fdin); + +err: + if (fd != -1) + close(fd); + if (fdin != -1) + close(fdin); + if (fdout != -1) + close(fdout); + if (ifp) + fclose(ifp); + if (ofp) + fclose(ofp); + return -1; +} + +int +queue_message_fd_rw(uint32_t msgid) +{ + char buf[PATH_MAX]; + + queue_message_path(msgid, buf, sizeof(buf)); + + return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); +} + +static int +queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) +{ + char *evp; + size_t evplen; + size_t complen; + char compbuf[sizeof(struct envelope)]; + size_t enclen; + char encbuf[sizeof(struct envelope)]; + + evp = evpbuf; + evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); + if (evplen == 0) + return (0); + + if (env->sc_queue_flags & QUEUE_COMPRESSION) { + complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); + if (complen == 0) + return (0); + evp = compbuf; + evplen = complen; + } + + if (env->sc_queue_flags & QUEUE_ENCRYPTION) { + enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); + if (enclen == 0) + return (0); + evp = encbuf; + evplen = enclen; + } + + memmove(evpbuf, evp, evplen); + + return (evplen); +} + +static int +queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) +{ + char *evp; + size_t evplen; + char compbuf[sizeof(struct envelope)]; + size_t complen; + char encbuf[sizeof(struct envelope)]; + size_t enclen; + + evp = evpbuf; + evplen = evpbufsize; + + if (env->sc_queue_flags & QUEUE_ENCRYPTION) { + enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); + if (enclen == 0) + return (0); + evp = encbuf; + evplen = enclen; + } + + if (env->sc_queue_flags & QUEUE_COMPRESSION) { + complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); + if (complen == 0) + return (0); + evp = compbuf; + evplen = complen; + } + + return (envelope_load_buffer(ep, evp, evplen)); +} + +static void +queue_envelope_cache_add(struct envelope *e) +{ + struct envelope *cached; + + while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) + queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); + + cached = xcalloc(1, sizeof *cached); + *cached = *e; + TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); + tree_xset(&evpcache_tree, e->id, cached); + stat_increment("queue.evpcache.size", 1); +} + +static void +queue_envelope_cache_update(struct envelope *e) +{ + struct envelope *cached; + + if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { + queue_envelope_cache_add(e); + stat_increment("queue.evpcache.update.missed", 1); + } else { + TAILQ_REMOVE(&evpcache_list, cached, entry); + *cached = *e; + TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); + stat_increment("queue.evpcache.update.hit", 1); + } +} + +static void +queue_envelope_cache_del(uint64_t evpid) +{ + struct envelope *cached; + + if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) + return; + + TAILQ_REMOVE(&evpcache_list, cached, entry); + free(cached); + stat_decrement("queue.evpcache.size", 1); +} + +int +queue_envelope_create(struct envelope *ep) +{ + int r; + char evpbuf[sizeof(struct envelope)]; + size_t evplen; + uint64_t evpid; + uint32_t msgid; + + ep->creation = time(NULL); + evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); + if (evplen == 0) + return (0); + + evpid = ep->id; + msgid = evpid_to_msgid(evpid); + + profile_enter("queue_envelope_create"); + r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); + profile_leave(); + + log_trace(TRACE_QUEUE, + "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", + evpid, evplen, r, ep->id); + + if (!r) { + ep->creation = 0; + ep->id = 0; + } + + if (r && env->sc_queue_flags & QUEUE_EVPCACHE) + queue_envelope_cache_add(ep); + + return (r); +} + +int +queue_envelope_delete(uint64_t evpid) +{ + int r; + + if (env->sc_queue_flags & QUEUE_EVPCACHE) + queue_envelope_cache_del(evpid); + + profile_enter("queue_envelope_delete"); + r = handler_envelope_delete(evpid); + profile_leave(); + + log_trace(TRACE_QUEUE, + "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", + evpid, r); + + return (r); +} + +int +queue_envelope_load(uint64_t evpid, struct envelope *ep) +{ + const char *e; + char evpbuf[sizeof(struct envelope)]; + size_t evplen; + struct envelope *cached; + + if ((env->sc_queue_flags & QUEUE_EVPCACHE) && + (cached = tree_get(&evpcache_tree, evpid))) { + *ep = *cached; + stat_increment("queue.evpcache.load.hit", 1); + return (1); + } + + ep->id = evpid; + profile_enter("queue_envelope_load"); + evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); + profile_leave(); + + log_trace(TRACE_QUEUE, + "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", + evpid, evplen); + + if (evplen == 0) + return (0); + + if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { + if ((e = envelope_validate(ep)) == NULL) { + ep->id = evpid; + if (env->sc_queue_flags & QUEUE_EVPCACHE) { + queue_envelope_cache_add(ep); + stat_increment("queue.evpcache.load.missed", 1); + } + return (1); + } + log_warnx("warn: invalid envelope %016" PRIx64 ": %s", + evpid, e); + } + return (0); +} + +int +queue_envelope_update(struct envelope *ep) +{ + char evpbuf[sizeof(struct envelope)]; + size_t evplen; + int r; + + evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); + if (evplen == 0) + return (0); + + profile_enter("queue_envelope_update"); + r = handler_envelope_update(ep->id, evpbuf, evplen); + profile_leave(); + + if (r && env->sc_queue_flags & QUEUE_EVPCACHE) + queue_envelope_cache_update(ep); + + log_trace(TRACE_QUEUE, + "queue-backend: queue_envelope_update(%016"PRIx64") -> %d", + ep->id, r); + + return (r); +} + +int +queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data) +{ + char evpbuf[sizeof(struct envelope)]; + uint64_t evpid; + int r; + const char *e; + + profile_enter("queue_message_walk"); + r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf, + msgid, done, data); + profile_leave(); + + log_trace(TRACE_QUEUE, + "queue-backend: queue_message_walk() -> %d (%016"PRIx64")", + r, evpid); + + if (r == -1) + return (r); + + if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { + if ((e = envelope_validate(ep)) == NULL) { + ep->id = evpid; + /* + * do not cache the envelope here, while discovering + * envelopes one could re-run discover on already + * scheduled envelopes which leads to triggering of + * strict checks in caching. Envelopes could anyway + * be loaded from backend if it isn't cached. + */ + return (1); + } + log_warnx("warn: invalid envelope %016" PRIx64 ": %s", + evpid, e); + } + return (0); +} + +int +queue_envelope_walk(struct envelope *ep) +{ + const char *e; + uint64_t evpid; + char evpbuf[sizeof(struct envelope)]; + int r; + + profile_enter("queue_envelope_walk"); + r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); + profile_leave(); + + log_trace(TRACE_QUEUE, + "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", + r, evpid); + + if (r == -1) + return (r); + + if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { + if ((e = envelope_validate(ep)) == NULL) { + ep->id = evpid; + if (env->sc_queue_flags & QUEUE_EVPCACHE) + queue_envelope_cache_add(ep); + return (1); + } + log_warnx("warn: invalid envelope %016" PRIx64 ": %s", + evpid, e); + } + return (0); +} + +uint32_t +queue_generate_msgid(void) +{ + uint32_t msgid; + + while ((msgid = arc4random()) == 0) + ; + + return msgid; +} + +uint64_t +queue_generate_evpid(uint32_t msgid) +{ + uint32_t rnd; + uint64_t evpid; + + while ((rnd = arc4random()) == 0) + ; + + evpid = msgid; + evpid <<= 32; + evpid |= rnd; + + return evpid; +} + +static const char* +envelope_validate(struct envelope *ep) +{ + if (ep->version != SMTPD_ENVELOPE_VERSION) + return "version mismatch"; + + if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) + return "invalid helo"; + if (ep->helo[0] == '\0') + return "empty helo"; + + if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) + return "invalid hostname"; + if (ep->hostname[0] == '\0') + return "empty hostname"; + + if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) + return "invalid error line"; + + if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL) + return "unknown dispatcher"; + + return NULL; +} + +void +queue_api_on_close(int(*cb)(void)) +{ + handler_close = cb; +} + +void +queue_api_on_message_create(int(*cb)(uint32_t *)) +{ + handler_message_create = cb; +} + +void +queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) +{ + handler_message_commit = cb; +} + +void +queue_api_on_message_delete(int(*cb)(uint32_t)) +{ + handler_message_delete = cb; +} + +void +queue_api_on_message_fd_r(int(*cb)(uint32_t)) +{ + handler_message_fd_r = cb; +} + +void +queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) +{ + handler_envelope_create = cb; +} + +void +queue_api_on_envelope_delete(int(*cb)(uint64_t)) +{ + handler_envelope_delete = cb; +} + +void +queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) +{ + handler_envelope_update = cb; +} + +void +queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) +{ + handler_envelope_load = cb; +} + +void +queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) +{ + handler_envelope_walk = cb; +} + +void +queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t, + uint32_t, int *, void **)) +{ + handler_message_walk = cb; +} |