summaryrefslogtreecommitdiffstats
path: root/usr.sbin/smtpd/queue_backend.c
diff options
context:
space:
mode:
authoreric <eric@openbsd.org>2013-07-19 20:37:07 +0000
committereric <eric@openbsd.org>2013-07-19 20:37:07 +0000
commit3f70ecaf32e0036341b31d79061304c29c1b28b1 (patch)
treec4d23ddda913b5df216ac48cbbbb67b6528bdd09 /usr.sbin/smtpd/queue_backend.c
parentPrep for WARNINGS=yes: add the prototypes that were missing, silence (diff)
downloadwireguard-openbsd-3f70ecaf32e0036341b31d79061304c29c1b28b1.tar.xz
wireguard-openbsd-3f70ecaf32e0036341b31d79061304c29c1b28b1.zip
Assorted queue improvements:
- cleanup the internal queue backend API and get rid of the QOP_* thing. - implement a queue_proc backend - rename queue_fsqueue.c to queue_fs - enable support for queue encryption - add an envelope cache - better logging and error reporting
Diffstat (limited to 'usr.sbin/smtpd/queue_backend.c')
-rw-r--r--usr.sbin/smtpd/queue_backend.c278
1 files changed, 248 insertions, 30 deletions
diff --git a/usr.sbin/smtpd/queue_backend.c b/usr.sbin/smtpd/queue_backend.c
index bed293ef237..e52d6a07afc 100644
--- a/usr.sbin/smtpd/queue_backend.c
+++ b/usr.sbin/smtpd/queue_backend.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: queue_backend.c,v 1.45 2013/07/19 11:14:08 eric Exp $ */
+/* $OpenBSD: queue_backend.c,v 1.46 2013/07/19 20:37:07 eric Exp $ */
/*
* Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
@@ -44,10 +44,30 @@ static const char* envelope_validate(struct envelope *);
extern struct queue_backend queue_backend_fs;
extern struct queue_backend queue_backend_null;
+extern struct queue_backend queue_backend_proc;
extern struct queue_backend queue_backend_ram;
+static void queue_envelope_cache_add(struct envelope *);
+static void queue_envelope_cache_update(struct envelope *);
+static void queue_envelope_cache_del(uint64_t evpid);
+
+TAILQ_HEAD(evplst, envelope);
+
+static struct tree evpcache_tree;
+static struct evplst evpcache_list;
static struct queue_backend *backend;
+static int (*handler_message_create)(uint32_t *);
+static int (*handler_message_commit)(uint32_t, const char*);
+static int (*handler_message_delete)(uint32_t);
+static int (*handler_message_fd_r)(uint32_t);
+static int (*handler_message_corrupt)(uint32_t);
+static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
+static int (*handler_envelope_delete)(uint64_t);
+static int (*handler_envelope_update)(uint64_t, const char *, size_t);
+static int (*handler_envelope_load)(uint64_t, char *, size_t);
+static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
+
#ifdef QUEUE_PROFILING
static struct {
@@ -73,8 +93,8 @@ static inline void profile_leave(void)
clock_gettime(CLOCK_MONOTONIC, &t1);
timespecsub(&t1, &profile.t0, &dt);
- log_debug("profile-queue: %s %li.%06li", profile.name,
- dt.tv_sec * 1000000 + dt.tv_nsec / 1000000,
+ log_debug("profile-queue: %s %lld.%06li", profile.name,
+ (long long)dt.tv_sec * 1000000 + dt.tv_nsec / 1000000,
dt.tv_nsec % 1000000);
}
#else
@@ -82,12 +102,10 @@ static inline void profile_leave(void)
#define profile_leave() do {} while (0)
#endif
-int
-queue_message_incoming_path(uint32_t msgid, char *buf, size_t len)
+static int
+queue_message_path(uint32_t msgid, char *buf, size_t len)
{
- return bsnprintf(buf, len, "%s/%08x",
- PATH_INCOMING,
- msgid);
+ return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
}
int
@@ -102,10 +120,15 @@ queue_init(const char *name, int server)
if (pwq == NULL)
errx(1, "unknown user %s", SMTPD_USER);
+ tree_init(&evpcache_tree);
+ TAILQ_INIT(&evpcache_list);
+
if (!strcmp(name, "fs"))
backend = &queue_backend_fs;
if (!strcmp(name, "null"))
backend = &queue_backend_null;
+ if (!strcmp(name, "proc"))
+ backend = &queue_backend_proc;
if (!strcmp(name, "ram"))
backend = &queue_backend_ram;
@@ -141,7 +164,7 @@ queue_message_create(uint32_t *msgid)
int r;
profile_enter("queue_message_create");
- r = backend->message(QOP_CREATE, msgid);
+ r = handler_message_create(msgid);
profile_leave();
log_trace(TRACE_QUEUE,
@@ -154,12 +177,17 @@ queue_message_create(uint32_t *msgid)
int
queue_message_delete(uint32_t msgid)
{
+ char msgpath[MAXPATHLEN];
int r;
profile_enter("queue_message_delete");
- r = backend->message(QOP_DELETE, &msgid);
+ r = handler_message_delete(msgid);
profile_leave();
+ /* in case the message is incoming */
+ queue_message_path(msgid, msgpath, sizeof(msgpath));
+ unlink(msgpath);
+
log_trace(TRACE_QUEUE,
"queue-backend: queue_message_delete(%08"PRIx32") -> %i", msgid, r);
@@ -176,11 +204,10 @@ queue_message_commit(uint32_t msgid)
FILE *ofp = NULL;
profile_enter("queue_message_commit");
- queue_message_incoming_path(msgid, msgpath, sizeof msgpath);
- strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath));
- if (env->sc_queue_flags & QUEUE_COMPRESSION) {
+ queue_message_path(msgid, msgpath, sizeof(msgpath));
+ if (env->sc_queue_flags & QUEUE_COMPRESSION) {
bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
ifp = fopen(msgpath, "r");
ofp = fopen(tmppath, "w+");
@@ -196,13 +223,40 @@ queue_message_commit(uint32_t msgid)
if (rename(tmppath, msgpath) == -1) {
if (errno == ENOSPC)
return (0);
- fatal("queue_message_commit: rename");
+ unlink(tmppath);
+ log_warn("rename");
+ return (0);
+ }
+ }
+
+ if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
+ bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
+ ifp = fopen(msgpath, "r");
+ ofp = fopen(tmppath, "w+");
+ if (ifp == NULL || ofp == NULL)
+ goto err;
+ if (! crypto_encrypt_file(ifp, ofp))
+ goto err;
+ fclose(ifp);
+ fclose(ofp);
+ ifp = NULL;
+ ofp = NULL;
+
+ if (rename(tmppath, msgpath) == -1) {
+ if (errno == ENOSPC)
+ return (0);
+ unlink(tmppath);
+ log_warn("rename");
+ return (0);
}
}
- r = backend->message(QOP_COMMIT, &msgid);
+ r = handler_message_commit(msgid, msgpath);
profile_leave();
+ /* in case it's not done by the backend */
+ unlink(msgpath);
+
log_trace(TRACE_QUEUE,
"queue-backend: queue_message_commit(%08"PRIx32") -> %i",
msgid, r);
@@ -223,7 +277,7 @@ queue_message_corrupt(uint32_t msgid)
int r;
profile_enter("queue_message_corrupt");
- r = backend->message(QOP_CORRUPT, &msgid);
+ r = handler_message_corrupt(msgid);
profile_leave();
log_trace(TRACE_QUEUE,
@@ -240,7 +294,7 @@ queue_message_fd_r(uint32_t msgid)
FILE *ofp = NULL;
profile_enter("queue_message_fd_r");
- fdin = backend->message(QOP_FD_R, &msgid);
+ fdin = handler_message_fd_r(msgid);
profile_leave();
log_trace(TRACE_QUEUE,
@@ -249,6 +303,26 @@ queue_message_fd_r(uint32_t msgid)
if (fdin == -1)
return (-1);
+ if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
+ if ((fdout = mktmpfile()) == -1)
+ goto err;
+ if ((fd = dup(fdout)) == -1)
+ goto err;
+ if ((ifp = fdopen(fdin, "r")) == NULL)
+ goto err;
+ fdin = fd;
+ fd = -1;
+ if ((ofp = fdopen(fdout, "w+")) == NULL)
+ goto err;
+
+ if (! crypto_decrypt_file(ifp, ofp))
+ goto err;
+
+ fclose(ifp);
+ fclose(ofp);
+ lseek(fdin, SEEK_SET, 0);
+ }
+
if (env->sc_queue_flags & QUEUE_COMPRESSION) {
if ((fdout = mktmpfile()) == -1)
goto err;
@@ -288,16 +362,11 @@ err:
int
queue_message_fd_rw(uint32_t msgid)
{
- int r;
+ char buf[SMTPD_MAXPATHLEN];
- profile_enter("queue_message_fd_rw");
- r = backend->message(QOP_FD_RW, &msgid);
- profile_leave();
-
- log_trace(TRACE_QUEUE,
- "queue-backend: queue_message_fd_rw(%08"PRIx32") -> %i", msgid, r);
+ queue_message_path(msgid, buf, sizeof(buf));
- return (r);
+ return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
}
static int
@@ -307,6 +376,8 @@ queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
size_t evplen;
size_t complen;
char compbuf[sizeof(struct envelope)];
+ size_t enclen;
+ char encbuf[sizeof(struct envelope)];
evp = evpbuf;
evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
@@ -321,6 +392,14 @@ queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
evplen = complen;
}
+ if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
+ enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
+ if (enclen == 0)
+ return (0);
+ evp = encbuf;
+ evplen = enclen;
+ }
+
memmove(evpbuf, evp, evplen);
return (evplen);
@@ -333,10 +412,20 @@ queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
size_t evplen;
char compbuf[sizeof(struct envelope)];
size_t complen;
+ char encbuf[sizeof(struct envelope)];
+ size_t enclen;
evp = evpbuf;
evplen = evpbufsize;
+ if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
+ enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
+ if (enclen == 0)
+ return (0);
+ evp = encbuf;
+ evplen = enclen;
+ }
+
if (env->sc_queue_flags & QUEUE_COMPRESSION) {
complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
if (complen == 0)
@@ -348,6 +437,50 @@ queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
return (envelope_load_buffer(ep, evp, evplen));
}
+static void
+queue_envelope_cache_add(struct envelope *e)
+{
+ struct envelope *cached;
+
+ while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
+ queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
+
+ cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add");
+ *cached = *e;
+ TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
+ tree_xset(&evpcache_tree, e->id, cached);
+ stat_increment("queue.evpcache.size", 1);
+}
+
+static void
+queue_envelope_cache_update(struct envelope *e)
+{
+ struct envelope *cached;
+
+ if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
+ queue_envelope_cache_add(e);
+ stat_increment("queue.evpcache.update.missed", 1);
+ } else {
+ TAILQ_REMOVE(&evpcache_list, cached, entry);
+ *cached = *e;
+ TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
+ stat_increment("queue.evpcache.update.hit", 1);
+ }
+}
+
+static void
+queue_envelope_cache_del(uint64_t evpid)
+{
+ struct envelope *cached;
+
+ if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
+ return;
+
+ TAILQ_REMOVE(&evpcache_list, cached, entry);
+ free(cached);
+ stat_decrement("queue.evpcache.size", 1);
+}
+
int
queue_envelope_create(struct envelope *ep)
{
@@ -355,6 +488,7 @@ queue_envelope_create(struct envelope *ep)
char evpbuf[sizeof(struct envelope)];
size_t evplen;
uint64_t evpid;
+ uint32_t msgid;
ep->creation = time(NULL);
evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
@@ -362,9 +496,10 @@ queue_envelope_create(struct envelope *ep)
return (0);
evpid = ep->id;
+ msgid = evpid_to_msgid(evpid);
profile_enter("queue_envelope_create");
- r = backend->envelope(QOP_CREATE, &ep->id, evpbuf, evplen);
+ r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
profile_leave();
log_trace(TRACE_QUEUE,
@@ -376,6 +511,9 @@ queue_envelope_create(struct envelope *ep)
ep->id = 0;
}
+ if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
+ queue_envelope_cache_add(ep);
+
return (r);
}
@@ -384,8 +522,11 @@ queue_envelope_delete(uint64_t evpid)
{
int r;
+ if (env->sc_queue_flags & QUEUE_EVPCACHE)
+ queue_envelope_cache_del(evpid);
+
profile_enter("queue_envelope_delete");
- r = backend->envelope(QOP_DELETE, &evpid, NULL, 0);
+ r = handler_envelope_delete(evpid);
profile_leave();
log_trace(TRACE_QUEUE,
@@ -401,10 +542,18 @@ queue_envelope_load(uint64_t evpid, struct envelope *ep)
const char *e;
char evpbuf[sizeof(struct envelope)];
size_t evplen;
+ struct envelope *cached;
+
+ if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
+ (cached = tree_get(&evpcache_tree, evpid))) {
+ *ep = *cached;
+ stat_increment("queue.evpcache.load.hit", 1);
+ return (1);
+ }
ep->id = evpid;
profile_enter("queue_envelope_load");
- evplen = backend->envelope(QOP_LOAD, &ep->id, evpbuf, sizeof evpbuf);
+ evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
profile_leave();
log_trace(TRACE_QUEUE,
@@ -417,6 +566,10 @@ queue_envelope_load(uint64_t evpid, struct envelope *ep)
if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
if ((e = envelope_validate(ep)) == NULL) {
ep->id = evpid;
+ if (env->sc_queue_flags & QUEUE_EVPCACHE) {
+ queue_envelope_cache_add(ep);
+ stat_increment("queue.evpcache.load.missed", 1);
+ }
return (1);
}
log_debug("debug: invalid envelope %016" PRIx64 ": %s",
@@ -439,9 +592,12 @@ queue_envelope_update(struct envelope *ep)
return (0);
profile_enter("queue_envelope_update");
- r = backend->envelope(QOP_UPDATE, &ep->id, evpbuf, evplen);
+ r = handler_envelope_update(ep->id, evpbuf, evplen);
profile_leave();
+ if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
+ queue_envelope_cache_update(ep);
+
log_trace(TRACE_QUEUE,
"queue-backend: queue_envelope_update(%016"PRIx64") -> %i",
ep->id, r);
@@ -458,7 +614,7 @@ queue_envelope_walk(struct envelope *ep)
int r;
profile_enter("queue_envelope_walk");
- r = backend->envelope(QOP_WALK, &evpid, evpbuf, sizeof evpbuf);
+ r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
profile_leave();
log_trace(TRACE_QUEUE,
@@ -471,6 +627,8 @@ queue_envelope_walk(struct envelope *ep)
if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
if ((e = envelope_validate(ep)) == NULL) {
ep->id = evpid;
+ if (env->sc_queue_flags & QUEUE_EVPCACHE)
+ queue_envelope_cache_add(ep);
return (1);
}
log_debug("debug: invalid envelope %016" PRIx64 ": %s",
@@ -529,3 +687,63 @@ envelope_validate(struct envelope *ep)
return NULL;
}
+
+void
+queue_api_on_message_create(int(*cb)(uint32_t *))
+{
+ handler_message_create = cb;
+}
+
+void
+queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
+{
+ handler_message_commit = cb;
+}
+
+void
+queue_api_on_message_delete(int(*cb)(uint32_t))
+{
+ handler_message_delete = cb;
+}
+
+void
+queue_api_on_message_fd_r(int(*cb)(uint32_t))
+{
+ handler_message_fd_r = cb;
+}
+
+void
+queue_api_on_message_corrupt(int(*cb)(uint32_t))
+{
+ handler_message_corrupt = cb;
+}
+
+void
+queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
+{
+ handler_envelope_create = cb;
+}
+
+void
+queue_api_on_envelope_delete(int(*cb)(uint64_t))
+{
+ handler_envelope_delete = cb;
+}
+
+void
+queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
+{
+ handler_envelope_update = cb;
+}
+
+void
+queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
+{
+ handler_envelope_load = cb;
+}
+
+void
+queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
+{
+ handler_envelope_walk = cb;
+}