diff options
| author | 2013-07-19 20:37:07 +0000 | |
|---|---|---|
| committer | 2013-07-19 20:37:07 +0000 | |
| commit | 3f70ecaf32e0036341b31d79061304c29c1b28b1 (patch) | |
| tree | c4d23ddda913b5df216ac48cbbbb67b6528bdd09 /usr.sbin/smtpd/queue_backend.c | |
| parent | Prep for WARNINGS=yes: add the prototypes that were missing, silence (diff) | |
| download | wireguard-openbsd-3f70ecaf32e0036341b31d79061304c29c1b28b1.tar.xz wireguard-openbsd-3f70ecaf32e0036341b31d79061304c29c1b28b1.zip | |
Assorted queue improvements:
- cleanup the internal queue backend API and get rid of the QOP_* thing.
- implement a queue_proc backend
- rename queue_fsqueue.c to queue_fs
- enable support for queue encryption
- add an envelope cache
- better logging and error reporting
Diffstat (limited to 'usr.sbin/smtpd/queue_backend.c')
| -rw-r--r-- | usr.sbin/smtpd/queue_backend.c | 278 |
1 files changed, 248 insertions, 30 deletions
diff --git a/usr.sbin/smtpd/queue_backend.c b/usr.sbin/smtpd/queue_backend.c index bed293ef237..e52d6a07afc 100644 --- a/usr.sbin/smtpd/queue_backend.c +++ b/usr.sbin/smtpd/queue_backend.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue_backend.c,v 1.45 2013/07/19 11:14:08 eric Exp $ */ +/* $OpenBSD: queue_backend.c,v 1.46 2013/07/19 20:37:07 eric Exp $ */ /* * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> @@ -44,10 +44,30 @@ static const char* envelope_validate(struct envelope *); extern struct queue_backend queue_backend_fs; extern struct queue_backend queue_backend_null; +extern struct queue_backend queue_backend_proc; extern struct queue_backend queue_backend_ram; +static void queue_envelope_cache_add(struct envelope *); +static void queue_envelope_cache_update(struct envelope *); +static void queue_envelope_cache_del(uint64_t evpid); + +TAILQ_HEAD(evplst, envelope); + +static struct tree evpcache_tree; +static struct evplst evpcache_list; static struct queue_backend *backend; +static int (*handler_message_create)(uint32_t *); +static int (*handler_message_commit)(uint32_t, const char*); +static int (*handler_message_delete)(uint32_t); +static int (*handler_message_fd_r)(uint32_t); +static int (*handler_message_corrupt)(uint32_t); +static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); +static int (*handler_envelope_delete)(uint64_t); +static int (*handler_envelope_update)(uint64_t, const char *, size_t); +static int (*handler_envelope_load)(uint64_t, char *, size_t); +static int (*handler_envelope_walk)(uint64_t *, char *, size_t); + #ifdef QUEUE_PROFILING static struct { @@ -73,8 +93,8 @@ static inline void profile_leave(void) clock_gettime(CLOCK_MONOTONIC, &t1); timespecsub(&t1, &profile.t0, &dt); - log_debug("profile-queue: %s %li.%06li", profile.name, - dt.tv_sec * 1000000 + dt.tv_nsec / 1000000, + log_debug("profile-queue: %s %lld.%06li", profile.name, + (long long)dt.tv_sec * 1000000 + dt.tv_nsec / 1000000, dt.tv_nsec % 1000000); } #else @@ -82,12 +102,10 @@ static inline void profile_leave(void) #define profile_leave() do {} while (0) #endif -int -queue_message_incoming_path(uint32_t msgid, char *buf, size_t len) +static int +queue_message_path(uint32_t msgid, char *buf, size_t len) { - return bsnprintf(buf, len, "%s/%08x", - PATH_INCOMING, - msgid); + return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); } int @@ -102,10 +120,15 @@ queue_init(const char *name, int server) if (pwq == NULL) errx(1, "unknown user %s", SMTPD_USER); + tree_init(&evpcache_tree); + TAILQ_INIT(&evpcache_list); + if (!strcmp(name, "fs")) backend = &queue_backend_fs; if (!strcmp(name, "null")) backend = &queue_backend_null; + if (!strcmp(name, "proc")) + backend = &queue_backend_proc; if (!strcmp(name, "ram")) backend = &queue_backend_ram; @@ -141,7 +164,7 @@ queue_message_create(uint32_t *msgid) int r; profile_enter("queue_message_create"); - r = backend->message(QOP_CREATE, msgid); + r = handler_message_create(msgid); profile_leave(); log_trace(TRACE_QUEUE, @@ -154,12 +177,17 @@ queue_message_create(uint32_t *msgid) int queue_message_delete(uint32_t msgid) { + char msgpath[MAXPATHLEN]; int r; profile_enter("queue_message_delete"); - r = backend->message(QOP_DELETE, &msgid); + r = handler_message_delete(msgid); profile_leave(); + /* in case the message is incoming */ + queue_message_path(msgid, msgpath, sizeof(msgpath)); + unlink(msgpath); + log_trace(TRACE_QUEUE, "queue-backend: queue_message_delete(%08"PRIx32") -> %i", msgid, r); @@ -176,11 +204,10 @@ queue_message_commit(uint32_t msgid) FILE *ofp = NULL; profile_enter("queue_message_commit"); - queue_message_incoming_path(msgid, msgpath, sizeof msgpath); - strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath)); - if (env->sc_queue_flags & QUEUE_COMPRESSION) { + queue_message_path(msgid, msgpath, sizeof(msgpath)); + if (env->sc_queue_flags & QUEUE_COMPRESSION) { bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); ifp = fopen(msgpath, "r"); ofp = fopen(tmppath, "w+"); @@ -196,13 +223,40 @@ queue_message_commit(uint32_t msgid) if (rename(tmppath, msgpath) == -1) { if (errno == ENOSPC) return (0); - fatal("queue_message_commit: rename"); + unlink(tmppath); + log_warn("rename"); + return (0); + } + } + + if (env->sc_queue_flags & QUEUE_ENCRYPTION) { + bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); + ifp = fopen(msgpath, "r"); + ofp = fopen(tmppath, "w+"); + if (ifp == NULL || ofp == NULL) + goto err; + if (! crypto_encrypt_file(ifp, ofp)) + goto err; + fclose(ifp); + fclose(ofp); + ifp = NULL; + ofp = NULL; + + if (rename(tmppath, msgpath) == -1) { + if (errno == ENOSPC) + return (0); + unlink(tmppath); + log_warn("rename"); + return (0); } } - r = backend->message(QOP_COMMIT, &msgid); + r = handler_message_commit(msgid, msgpath); profile_leave(); + /* in case it's not done by the backend */ + unlink(msgpath); + log_trace(TRACE_QUEUE, "queue-backend: queue_message_commit(%08"PRIx32") -> %i", msgid, r); @@ -223,7 +277,7 @@ queue_message_corrupt(uint32_t msgid) int r; profile_enter("queue_message_corrupt"); - r = backend->message(QOP_CORRUPT, &msgid); + r = handler_message_corrupt(msgid); profile_leave(); log_trace(TRACE_QUEUE, @@ -240,7 +294,7 @@ queue_message_fd_r(uint32_t msgid) FILE *ofp = NULL; profile_enter("queue_message_fd_r"); - fdin = backend->message(QOP_FD_R, &msgid); + fdin = handler_message_fd_r(msgid); profile_leave(); log_trace(TRACE_QUEUE, @@ -249,6 +303,26 @@ queue_message_fd_r(uint32_t msgid) if (fdin == -1) return (-1); + if (env->sc_queue_flags & QUEUE_ENCRYPTION) { + if ((fdout = mktmpfile()) == -1) + goto err; + if ((fd = dup(fdout)) == -1) + goto err; + if ((ifp = fdopen(fdin, "r")) == NULL) + goto err; + fdin = fd; + fd = -1; + if ((ofp = fdopen(fdout, "w+")) == NULL) + goto err; + + if (! crypto_decrypt_file(ifp, ofp)) + goto err; + + fclose(ifp); + fclose(ofp); + lseek(fdin, SEEK_SET, 0); + } + if (env->sc_queue_flags & QUEUE_COMPRESSION) { if ((fdout = mktmpfile()) == -1) goto err; @@ -288,16 +362,11 @@ err: int queue_message_fd_rw(uint32_t msgid) { - int r; + char buf[SMTPD_MAXPATHLEN]; - profile_enter("queue_message_fd_rw"); - r = backend->message(QOP_FD_RW, &msgid); - profile_leave(); - - log_trace(TRACE_QUEUE, - "queue-backend: queue_message_fd_rw(%08"PRIx32") -> %i", msgid, r); + queue_message_path(msgid, buf, sizeof(buf)); - return (r); + return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); } static int @@ -307,6 +376,8 @@ queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) size_t evplen; size_t complen; char compbuf[sizeof(struct envelope)]; + size_t enclen; + char encbuf[sizeof(struct envelope)]; evp = evpbuf; evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); @@ -321,6 +392,14 @@ queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) evplen = complen; } + if (env->sc_queue_flags & QUEUE_ENCRYPTION) { + enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); + if (enclen == 0) + return (0); + evp = encbuf; + evplen = enclen; + } + memmove(evpbuf, evp, evplen); return (evplen); @@ -333,10 +412,20 @@ queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) size_t evplen; char compbuf[sizeof(struct envelope)]; size_t complen; + char encbuf[sizeof(struct envelope)]; + size_t enclen; evp = evpbuf; evplen = evpbufsize; + if (env->sc_queue_flags & QUEUE_ENCRYPTION) { + enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); + if (enclen == 0) + return (0); + evp = encbuf; + evplen = enclen; + } + if (env->sc_queue_flags & QUEUE_COMPRESSION) { complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); if (complen == 0) @@ -348,6 +437,50 @@ queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) return (envelope_load_buffer(ep, evp, evplen)); } +static void +queue_envelope_cache_add(struct envelope *e) +{ + struct envelope *cached; + + while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) + queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); + + cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add"); + *cached = *e; + TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); + tree_xset(&evpcache_tree, e->id, cached); + stat_increment("queue.evpcache.size", 1); +} + +static void +queue_envelope_cache_update(struct envelope *e) +{ + struct envelope *cached; + + if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { + queue_envelope_cache_add(e); + stat_increment("queue.evpcache.update.missed", 1); + } else { + TAILQ_REMOVE(&evpcache_list, cached, entry); + *cached = *e; + TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); + stat_increment("queue.evpcache.update.hit", 1); + } +} + +static void +queue_envelope_cache_del(uint64_t evpid) +{ + struct envelope *cached; + + if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) + return; + + TAILQ_REMOVE(&evpcache_list, cached, entry); + free(cached); + stat_decrement("queue.evpcache.size", 1); +} + int queue_envelope_create(struct envelope *ep) { @@ -355,6 +488,7 @@ queue_envelope_create(struct envelope *ep) char evpbuf[sizeof(struct envelope)]; size_t evplen; uint64_t evpid; + uint32_t msgid; ep->creation = time(NULL); evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); @@ -362,9 +496,10 @@ queue_envelope_create(struct envelope *ep) return (0); evpid = ep->id; + msgid = evpid_to_msgid(evpid); profile_enter("queue_envelope_create"); - r = backend->envelope(QOP_CREATE, &ep->id, evpbuf, evplen); + r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); profile_leave(); log_trace(TRACE_QUEUE, @@ -376,6 +511,9 @@ queue_envelope_create(struct envelope *ep) ep->id = 0; } + if (r && env->sc_queue_flags & QUEUE_EVPCACHE) + queue_envelope_cache_add(ep); + return (r); } @@ -384,8 +522,11 @@ queue_envelope_delete(uint64_t evpid) { int r; + if (env->sc_queue_flags & QUEUE_EVPCACHE) + queue_envelope_cache_del(evpid); + profile_enter("queue_envelope_delete"); - r = backend->envelope(QOP_DELETE, &evpid, NULL, 0); + r = handler_envelope_delete(evpid); profile_leave(); log_trace(TRACE_QUEUE, @@ -401,10 +542,18 @@ queue_envelope_load(uint64_t evpid, struct envelope *ep) const char *e; char evpbuf[sizeof(struct envelope)]; size_t evplen; + struct envelope *cached; + + if ((env->sc_queue_flags & QUEUE_EVPCACHE) && + (cached = tree_get(&evpcache_tree, evpid))) { + *ep = *cached; + stat_increment("queue.evpcache.load.hit", 1); + return (1); + } ep->id = evpid; profile_enter("queue_envelope_load"); - evplen = backend->envelope(QOP_LOAD, &ep->id, evpbuf, sizeof evpbuf); + evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); profile_leave(); log_trace(TRACE_QUEUE, @@ -417,6 +566,10 @@ queue_envelope_load(uint64_t evpid, struct envelope *ep) if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { if ((e = envelope_validate(ep)) == NULL) { ep->id = evpid; + if (env->sc_queue_flags & QUEUE_EVPCACHE) { + queue_envelope_cache_add(ep); + stat_increment("queue.evpcache.load.missed", 1); + } return (1); } log_debug("debug: invalid envelope %016" PRIx64 ": %s", @@ -439,9 +592,12 @@ queue_envelope_update(struct envelope *ep) return (0); profile_enter("queue_envelope_update"); - r = backend->envelope(QOP_UPDATE, &ep->id, evpbuf, evplen); + r = handler_envelope_update(ep->id, evpbuf, evplen); profile_leave(); + if (r && env->sc_queue_flags & QUEUE_EVPCACHE) + queue_envelope_cache_update(ep); + log_trace(TRACE_QUEUE, "queue-backend: queue_envelope_update(%016"PRIx64") -> %i", ep->id, r); @@ -458,7 +614,7 @@ queue_envelope_walk(struct envelope *ep) int r; profile_enter("queue_envelope_walk"); - r = backend->envelope(QOP_WALK, &evpid, evpbuf, sizeof evpbuf); + r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); profile_leave(); log_trace(TRACE_QUEUE, @@ -471,6 +627,8 @@ queue_envelope_walk(struct envelope *ep) if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { if ((e = envelope_validate(ep)) == NULL) { ep->id = evpid; + if (env->sc_queue_flags & QUEUE_EVPCACHE) + queue_envelope_cache_add(ep); return (1); } log_debug("debug: invalid envelope %016" PRIx64 ": %s", @@ -529,3 +687,63 @@ envelope_validate(struct envelope *ep) return NULL; } + +void +queue_api_on_message_create(int(*cb)(uint32_t *)) +{ + handler_message_create = cb; +} + +void +queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) +{ + handler_message_commit = cb; +} + +void +queue_api_on_message_delete(int(*cb)(uint32_t)) +{ + handler_message_delete = cb; +} + +void +queue_api_on_message_fd_r(int(*cb)(uint32_t)) +{ + handler_message_fd_r = cb; +} + +void +queue_api_on_message_corrupt(int(*cb)(uint32_t)) +{ + handler_message_corrupt = cb; +} + +void +queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) +{ + handler_envelope_create = cb; +} + +void +queue_api_on_envelope_delete(int(*cb)(uint64_t)) +{ + handler_envelope_delete = cb; +} + +void +queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) +{ + handler_envelope_update = cb; +} + +void +queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) +{ + handler_envelope_load = cb; +} + +void +queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) +{ + handler_envelope_walk = cb; +} |
