diff options
| author | 2014-07-10 14:45:02 +0000 | |
|---|---|---|
| committer | 2014-07-10 14:45:02 +0000 | |
| commit | acfdf0da6e926b6b38c6ddfb5f9451e047ef456c (patch) | |
| tree | a5c0758ad0e2a1b0c9d3c304c684defacb946fe3 | |
| parent | repair prototypes (diff) | |
| download | wireguard-openbsd-acfdf0da6e926b6b38c6ddfb5f9451e047ef456c.tar.xz wireguard-openbsd-acfdf0da6e926b6b38c6ddfb5f9451e047ef456c.zip | |
Improve the scheduler, better and simpler.
- Get rid of the scheduler_batch structure. The scheduler can now return
envelopes of different types in a single run, interleaving them to avoid
batch effects.
- Ask for an acknowledgement from the queue when removing or expiring
an envelope to benefit from the inflight envelope limitation mechanism.
This ensures that the scheduler always keeps sending envelopes at a rate
that the queue can sustain in all cases.
- Limit the number of envelopes in a holdq. When a holdq is full,
new envelopes are put back in the pending queue instead, with a
shorter retry time.
- Plumbing for proc-ified schedulers.
This is an imsg version bump: run "smtpctl stop" before updating.
ok gilles@
| -rw-r--r-- | usr.sbin/smtpd/queue.c | 11 | ||||
| -rw-r--r-- | usr.sbin/smtpd/scheduler.c | 306 | ||||
| -rw-r--r-- | usr.sbin/smtpd/scheduler_api.c | 82 | ||||
| -rw-r--r-- | usr.sbin/smtpd/scheduler_backend.c | 23 | ||||
| -rw-r--r-- | usr.sbin/smtpd/scheduler_null.c | 13 | ||||
| -rw-r--r-- | usr.sbin/smtpd/scheduler_proc.c | 76 | ||||
| -rw-r--r-- | usr.sbin/smtpd/scheduler_ramqueue.c | 240 | ||||
| -rw-r--r-- | usr.sbin/smtpd/smtpd-api.h | 30 | ||||
| -rw-r--r-- | usr.sbin/smtpd/smtpd.c | 3 | ||||
| -rw-r--r-- | usr.sbin/smtpd/smtpd.h | 10 |
10 files changed, 390 insertions, 404 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index 4e26fe600c3..889ce104e48 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.163 2014/07/08 15:45:32 eric Exp $ */ +/* $OpenBSD: queue.c,v 1.164 2014/07/10 14:45:02 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -197,9 +197,14 @@ queue_imsg(struct mproc *p, struct imsg *imsg) m_get_evpid(&m, &evpid); m_end(&m); + m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1); + m_add_evpid(p_scheduler, evpid); + m_close(p_scheduler); + /* already removed by scheduler */ if (queue_envelope_load(evpid, &evp) == 0) return; + queue_log(&evp, "Remove", "Removed by administrator"); queue_envelope_delete(evpid); return; @@ -209,6 +214,10 @@ queue_imsg(struct mproc *p, struct imsg *imsg) m_get_evpid(&m, &evpid); m_end(&m); + m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1); + m_add_evpid(p_scheduler, evpid); + m_close(p_scheduler); + /* already removed by scheduler*/ if (queue_envelope_load(evpid, &evp) == 0) return; diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c index 9301a4259fb..874ffc2aff7 100644 --- a/usr.sbin/smtpd/scheduler.c +++ b/usr.sbin/smtpd/scheduler.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler.c,v 1.46 2014/07/08 07:59:31 sobrado Exp $ */ +/* $OpenBSD: scheduler.c,v 1.47 2014/07/10 14:45:02 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -49,16 +49,11 @@ static void scheduler_shutdown(void); static void scheduler_sig_handler(int, short, void *); static void scheduler_reset_events(void); static void scheduler_timeout(int, short, void *); -static void scheduler_process_remove(struct scheduler_batch *); -static void scheduler_process_expire(struct scheduler_batch *); -static void scheduler_process_update(struct scheduler_batch *); -static void scheduler_process_bounce(struct scheduler_batch *); -static void scheduler_process_mda(struct scheduler_batch *); -static void 
scheduler_process_mta(struct scheduler_batch *); static struct scheduler_backend *backend = NULL; static struct event ev; -static size_t ninflight; +static size_t ninflight = 0; +static int *types; static uint64_t *evpids; static uint32_t *msgids; static struct evpstate *state; @@ -135,6 +130,18 @@ scheduler_imsg(struct mproc *p, struct imsg *imsg) scheduler_reset_events(); return; + case IMSG_QUEUE_ENVELOPE_ACK: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_end(&m); + log_trace(TRACE_SCHEDULER, + "scheduler: queue ack removal of evp:%016" PRIx64, + evpid); + ninflight -= 1; + stat_decrement("scheduler.envelope.inflight", 1); + scheduler_reset_events(); + return; + case IMSG_QUEUE_DELIVERY_OK: m_msg(&m, imsg); m_get_evpid(&m, &evpid); @@ -264,6 +271,13 @@ scheduler_imsg(struct mproc *p, struct imsg *imsg) log_verbose(v); return; + case IMSG_CTL_PROFILE: + m_msg(&m, imsg); + m_get_int(&m, &v); + m_end(&m); + profiling = v; + return; + case IMSG_CTL_LIST_MESSAGES: msgid = *(uint32_t *)(imsg->data); n = backend->messages(msgid, msgids, env->sc_scheduler_max_msg_batch_size); @@ -414,7 +428,7 @@ scheduler(void) config_process(PROC_SCHEDULER); - backend->init(); + backend->init(backend_scheduler); if (chroot(PATH_CHROOT) == -1) fatal("scheduler: chroot"); @@ -427,6 +441,7 @@ scheduler(void) fatal("scheduler: cannot drop privileges"); evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids, "scheduler: init evpids"); + types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types, "scheduler: init types"); msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids, "scheduler: list msg"); state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state, "scheduler: list evp"); @@ -457,198 +472,131 @@ static void scheduler_timeout(int fd, short event, void *p) { struct timeval tv; - struct scheduler_batch batch; - int typemask, left; - - log_trace(TRACE_SCHEDULER, "scheduler: getting next batch"); + size_t i; + size_t d_inflight; + size_t 
d_envelope; + size_t d_removed; + size_t d_expired; + size_t d_updated; + size_t count; + int mask, r, delay; tv.tv_sec = 0; tv.tv_usec = 0; - typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_UPDATE | SCHED_BOUNCE; - if (ninflight < env->sc_scheduler_max_inflight && - !(env->sc_flags & SMTPD_MDA_PAUSED)) - typemask |= SCHED_MDA; - if (ninflight < env->sc_scheduler_max_inflight && - !(env->sc_flags & SMTPD_MTA_PAUSED)) - typemask |= SCHED_MTA; - - left = typemask; - - again: - - log_trace(TRACE_SCHEDULER, "scheduler: typemask=0x%x", left); - - memset(&batch, 0, sizeof (batch)); - batch.evpids = evpids; - batch.evpcount = env->sc_scheduler_max_schedule; - backend->batch(left, &batch); + mask = SCHED_UPDATE; - switch (batch.type) { - case SCHED_REMOVE: - log_trace(TRACE_SCHEDULER, "scheduler: SCHED_REMOVE %zu", - batch.evpcount); - scheduler_process_remove(&batch); - break; - - case SCHED_EXPIRE: - log_trace(TRACE_SCHEDULER, "scheduler: SCHED_EXPIRE %zu", - batch.evpcount); - scheduler_process_expire(&batch); - break; - - case SCHED_UPDATE: - log_trace(TRACE_SCHEDULER, "scheduler: SCHED_UPDATE %zu", - batch.evpcount); - scheduler_process_update(&batch); - break; + if (ninflight < env->sc_scheduler_max_inflight) { + mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE; + if (!(env->sc_flags & SMTPD_MDA_PAUSED)) + mask |= SCHED_MDA; + if (!(env->sc_flags & SMTPD_MTA_PAUSED)) + mask |= SCHED_MTA; + } - case SCHED_BOUNCE: - log_trace(TRACE_SCHEDULER, "scheduler: SCHED_BOUNCE %zu", - batch.evpcount); - scheduler_process_bounce(&batch); - break; + count = env->sc_scheduler_max_schedule; - case SCHED_MDA: - log_trace(TRACE_SCHEDULER, "scheduler: SCHED_MDA %zu", - batch.evpcount); - scheduler_process_mda(&batch); - break; + log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, count=%zu", mask, count); - case SCHED_MTA: - log_trace(TRACE_SCHEDULER, "scheduler: SCHED_MTA %zu", - batch.evpcount); - scheduler_process_mta(&batch); - break; + r = backend->batch(mask, 
&delay, &count, evpids, types); - default: - break; - } + log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count); - log_trace(TRACE_SCHEDULER, "scheduler: mask=0x%x", batch.mask); + if (r < 0) + fatalx("scheduler: error in batch handler"); - left &= batch.mask; - left &= ~batch.type; + if (r == 0) { - /* We can still schedule something immediately. */ - if (left) - goto again; + if (delay < -1) + fatalx("scheduler: invalid delay %d", delay); - /* We can schedule in the next event frame */ - if (batch.mask & typemask || - (batch.mask & SCHED_DELAY && batch.type != SCHED_DELAY)) { - tv.tv_sec = 0; - tv.tv_usec = 0; - evtimer_add(&ev, &tv); - return; - } + if (delay == -1) { + log_trace(TRACE_SCHEDULER, "scheduler: sleeping"); + return; + } - if (batch.type == SCHED_DELAY) { - tv.tv_sec = batch.delay; + tv.tv_sec = delay; tv.tv_usec = 0; log_trace(TRACE_SCHEDULER, - "scheduler: SCHED_DELAY %s", duration_to_text(tv.tv_sec)); + "scheduler: waiting for %s", duration_to_text(tv.tv_sec)); evtimer_add(&ev, &tv); return; } - log_trace(TRACE_SCHEDULER, "scheduler: SCHED_NONE"); -} - -static void -scheduler_process_remove(struct scheduler_batch *batch) -{ - size_t i; - - for (i = 0; i < batch->evpcount; i++) { - log_debug("debug: scheduler: evp:%016" PRIx64 " removed", - batch->evpids[i]); - m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1); - m_add_evpid(p_queue, batch->evpids[i]); - m_close(p_queue); - } - - stat_decrement("scheduler.envelope", batch->evpcount); - stat_increment("scheduler.envelope.removed", batch->evpcount); -} - -static void -scheduler_process_expire(struct scheduler_batch *batch) -{ - size_t i; - - for (i = 0; i < batch->evpcount; i++) { - log_debug("debug: scheduler: evp:%016" PRIx64 " expired", - batch->evpids[i]); - m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1); - m_add_evpid(p_queue, batch->evpids[i]); - m_close(p_queue); - } - - stat_decrement("scheduler.envelope", batch->evpcount); - 
stat_increment("scheduler.envelope.expired", batch->evpcount); -} - -static void -scheduler_process_update(struct scheduler_batch *batch) -{ - size_t i; - - for (i = 0; i < batch->evpcount; i++) { - log_debug("debug: scheduler: evp:%016" PRIx64 - " scheduled (update)", batch->evpids[i]); - } - - stat_increment("scheduler.envelope.update", batch->evpcount); -} - -static void -scheduler_process_bounce(struct scheduler_batch *batch) -{ - size_t i; - - for (i = 0; i < batch->evpcount; i++) { - log_debug("debug: scheduler: evp:%016" PRIx64 - " scheduled (bounce)", batch->evpids[i]); - m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1); - m_add_evpid(p_queue, batch->evpids[i]); - m_close(p_queue); - } - - ninflight += batch->evpcount; - stat_increment("scheduler.envelope.inflight", batch->evpcount); -} - -static void -scheduler_process_mda(struct scheduler_batch *batch) -{ - size_t i; - - for (i = 0; i < batch->evpcount; i++) { - log_debug("debug: scheduler: evp:%016" PRIx64 - " scheduled (mda)", batch->evpids[i]); - m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1); - m_add_evpid(p_queue, batch->evpids[i]); - m_close(p_queue); + d_inflight = 0; + d_envelope = 0; + d_removed = 0; + d_expired = 0; + d_updated = 0; + + for (i = 0; i < count; i++) { + switch(types[i]) { + case SCHED_REMOVE: + log_debug("debug: scheduler: evp:%016" PRIx64 + " removed", evpids[i]); + m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1); + m_add_evpid(p_queue, evpids[i]); + m_close(p_queue); + d_envelope += 1; + d_removed += 1; + d_inflight += 1; + break; + + case SCHED_EXPIRE: + log_debug("debug: scheduler: evp:%016" PRIx64 + " expired", evpids[i]); + m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1); + m_add_evpid(p_queue, evpids[i]); + m_close(p_queue); + d_envelope += 1; + d_expired += 1; + d_inflight += 1; + break; + + case SCHED_UPDATE: + log_debug("debug: scheduler: evp:%016" PRIx64 + " scheduled (update)", evpids[i]); + d_updated += 1; + break; + + case 
SCHED_BOUNCE: + log_debug("debug: scheduler: evp:%016" PRIx64 + " scheduled (bounce)", evpids[i]); + m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1); + m_add_evpid(p_queue, evpids[i]); + m_close(p_queue); + d_inflight += 1; + break; + + case SCHED_MDA: + log_debug("debug: scheduler: evp:%016" PRIx64 + " scheduled (mda)", evpids[i]); + m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1); + m_add_evpid(p_queue, evpids[i]); + m_close(p_queue); + d_inflight += 1; + break; + + case SCHED_MTA: + log_debug("debug: scheduler: evp:%016" PRIx64 + " scheduled (mta)", evpids[i]); + m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1); + m_add_evpid(p_queue, evpids[i]); + m_close(p_queue); + d_inflight += 1; + break; + } } - ninflight += batch->evpcount; - stat_increment("scheduler.envelope.inflight", batch->evpcount); -} + stat_decrement("scheduler.envelope", d_envelope); + stat_increment("scheduler.envelope.inflight", d_inflight); + stat_increment("scheduler.envelope.expired", d_expired); + stat_increment("scheduler.envelope.removed", d_removed); + stat_increment("scheduler.envelope.updated", d_updated); -static void -scheduler_process_mta(struct scheduler_batch *batch) -{ - size_t i; - - for (i = 0; i < batch->evpcount; i++) { - log_debug("debug: scheduler: evp:%016" PRIx64 - " scheduled (mta)", batch->evpids[i]); - m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1); - m_add_evpid(p_queue, batch->evpids[i]); - m_close(p_queue); - } + ninflight += d_inflight; - ninflight += batch->evpcount; - stat_increment("scheduler.envelope.inflight", batch->evpcount); + tv.tv_sec = 0; + tv.tv_usec = 0; + evtimer_add(&ev, &tv); } diff --git a/usr.sbin/smtpd/scheduler_api.c b/usr.sbin/smtpd/scheduler_api.c index bc3f076b390..e692d78b634 100644 --- a/usr.sbin/smtpd/scheduler_api.c +++ b/usr.sbin/smtpd/scheduler_api.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_api.c,v 1.5 2014/02/04 14:56:03 eric Exp $ */ +/* $OpenBSD: scheduler_api.c,v 1.6 2014/07/10 14:45:02 eric Exp $ 
*/ /* * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> @@ -37,7 +37,7 @@ static int (*handler_update)(struct scheduler_info *); static int (*handler_delete)(uint64_t); static int (*handler_hold)(uint64_t, uint64_t); static int (*handler_release)(int, uint64_t, int); -static int (*handler_batch)(int, struct scheduler_batch *); +static int (*handler_batch)(int, int *, size_t *, uint64_t *, int *); static size_t (*handler_messages)(uint32_t, uint32_t *, size_t); static size_t (*handler_envelopes)(uint64_t, struct evpstate *, size_t); static int (*handler_schedule)(uint64_t); @@ -52,8 +52,8 @@ static struct imsg imsg; static size_t rlen; static char *rdata; static struct ibuf *buf; -static char *rootpath = PATH_CHROOT; -static char *user = SMTPD_USER; +static const char *rootpath = PATH_CHROOT; +static const char *user = SMTPD_USER; static void scheduler_msg_get(void *dst, size_t len) @@ -108,13 +108,13 @@ scheduler_msg_close(void) static void scheduler_msg_dispatch(void) { - size_t n, sz; + size_t n, sz, count; struct evpstate evpstates[MAX_BATCH_SIZE]; uint64_t evpid, evpids[MAX_BATCH_SIZE], u64; uint32_t msgids[MAX_BATCH_SIZE], version, msgid; struct scheduler_info info; - struct scheduler_batch batch; - int typemask, r, type; + int typemask, r, type, types[MAX_BATCH_SIZE]; + int delay; switch (imsg.hdr.type) { case PROC_SCHEDULER_INIT: @@ -211,17 +211,20 @@ scheduler_msg_dispatch(void) case PROC_SCHEDULER_BATCH: log_debug("scheduler-api: PROC_SCHEDULER_BATCH"); scheduler_msg_get(&typemask, sizeof(typemask)); - scheduler_msg_get(&batch.evpcount, sizeof(batch.evpcount)); + scheduler_msg_get(&count, sizeof(count)); scheduler_msg_end(); - if (batch.evpcount > MAX_BATCH_SIZE) - batch.evpcount = MAX_BATCH_SIZE; - batch.evpids = evpids; + if (count > MAX_BATCH_SIZE) + count = MAX_BATCH_SIZE; - handler_batch(typemask, &batch); - - scheduler_msg_add(&batch, sizeof(batch)); - scheduler_msg_add(evpids, sizeof(*evpids) * batch.evpcount); + r = handler_batch(typemask, 
&delay, &count, evpids, types); + scheduler_msg_add(&r, sizeof(r)); + scheduler_msg_add(&delay, sizeof(delay)); + scheduler_msg_add(&count, sizeof(count)); + if (r > 0) { + scheduler_msg_add(evpids, sizeof(*evpids) * count); + scheduler_msg_add(types, sizeof(*types) * count); + } scheduler_msg_close(); break; @@ -338,7 +341,7 @@ scheduler_api_on_delete(int(*cb)(uint64_t)) } void -scheduler_api_on_batch(int(*cb)(int, struct scheduler_batch *)) +scheduler_api_on_batch(int(*cb)(int, int *, size_t *, uint64_t *, int *)) { handler_batch = cb; } @@ -391,34 +394,55 @@ scheduler_api_on_release(int(*cb)(int, uint64_t, int)) handler_release = cb; } +void +scheduler_api_no_chroot(void) +{ + rootpath = NULL; +} + +void +scheduler_api_set_chroot(const char *path) +{ + rootpath = path; +} + +void +scheduler_api_set_user(const char *username) +{ + user = username; +} + int scheduler_api_dispatch(void) { - struct passwd *pw; + struct passwd *pw = NULL; ssize_t n; - pw = getpwnam(user); - if (pw == NULL) { - log_warn("scheduler-api: getpwnam"); - fatalx("scheduler-api: exiting"); + if (user) { + pw = getpwnam(user); + if (pw == NULL) { + log_warn("queue-api: getpwnam"); + fatalx("queue-api: exiting"); + } } if (rootpath) { if (chroot(rootpath) == -1) { - log_warn("scheduler-api: chroot"); - fatalx("scheduler-api: exiting"); + log_warn("queue-api: chroot"); + fatalx("queue-api: exiting"); } if (chdir("/") == -1) { - log_warn("scheduler-api: chdir"); - fatalx("scheduler-api: exiting"); + log_warn("queue-api: chdir"); + fatalx("queue-api: exiting"); } } - if (setgroups(1, &pw->pw_gid) || + if (pw && + (setgroups(1, &pw->pw_gid) || setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || - setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) { - log_warn("scheduler-api: cannot drop privileges"); - fatalx("scheduler-api: exiting"); + setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))) { + log_warn("queue-api: cannot drop privileges"); + fatalx("queue-api: exiting"); } imsg_init(&ibuf, 0); diff --git 
a/usr.sbin/smtpd/scheduler_backend.c b/usr.sbin/smtpd/scheduler_backend.c index 5ee9d5b30a1..1f3c9e26c11 100644 --- a/usr.sbin/smtpd/scheduler_backend.c +++ b/usr.sbin/smtpd/scheduler_backend.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_backend.c,v 1.13 2014/02/04 14:56:03 eric Exp $ */ +/* $OpenBSD: scheduler_backend.c,v 1.14 2014/07/10 14:45:02 eric Exp $ */ /* * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org> @@ -42,12 +42,10 @@ scheduler_backend_lookup(const char *name) { if (!strcmp(name, "null")) return &scheduler_backend_null; - if (!strcmp(name, "proc")) - return &scheduler_backend_proc; if (!strcmp(name, "ramqueue")) return &scheduler_backend_ramqueue; - return NULL; + return &scheduler_backend_proc; } void @@ -62,20 +60,3 @@ scheduler_info(struct scheduler_info *sched, struct envelope *evp) sched->lastbounce = evp->lastbounce; sched->nexttry = 0; } - -time_t -scheduler_compute_schedule(struct scheduler_info *sched) -{ - time_t delay; - uint32_t retry; - - if (sched->type == D_MTA) - delay = 800; - else - delay = 10; - - retry = sched->retry; - delay = ((delay * retry) * retry) / 2; - - return (sched->creation + delay); -} diff --git a/usr.sbin/smtpd/scheduler_null.c b/usr.sbin/smtpd/scheduler_null.c index 61dd9e2ebf8..e1abe954fd2 100644 --- a/usr.sbin/smtpd/scheduler_null.c +++ b/usr.sbin/smtpd/scheduler_null.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_null.c,v 1.7 2014/02/04 14:56:03 eric Exp $ */ +/* $OpenBSD: scheduler_null.c,v 1.8 2014/07/10 14:45:02 eric Exp $ */ /* * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> @@ -30,7 +30,7 @@ #include "smtpd.h" -static int scheduler_null_init(void); +static int scheduler_null_init(const char *); static int scheduler_null_insert(struct scheduler_info *); static size_t scheduler_null_commit(uint32_t); static size_t scheduler_null_rollback(uint32_t); @@ -38,7 +38,7 @@ static int scheduler_null_update(struct scheduler_info *); static int scheduler_null_delete(uint64_t); static int scheduler_null_hold(uint64_t, 
uint64_t); static int scheduler_null_release(int, uint64_t, int); -static int scheduler_null_batch(int, struct scheduler_batch *); +static int scheduler_null_batch(int, int*, size_t*, uint64_t*, int*); static size_t scheduler_null_messages(uint32_t, uint32_t *, size_t); static size_t scheduler_null_envelopes(uint64_t, struct evpstate *, size_t); static int scheduler_null_schedule(uint64_t); @@ -69,7 +69,7 @@ struct scheduler_backend scheduler_backend_null = { }; static int -scheduler_null_init(void) +scheduler_null_init(const char *arg) { return (1); } @@ -117,10 +117,9 @@ scheduler_null_release(int type, uint64_t holdq, int n) } static int -scheduler_null_batch(int typemask, struct scheduler_batch *ret) +scheduler_null_batch(int typemask, int *delay, size_t *count, uint64_t *evpids, int *types) { - ret->type = SCHED_NONE; - ret->evpcount = 0; + *delay = 0; return (0); } diff --git a/usr.sbin/smtpd/scheduler_proc.c b/usr.sbin/smtpd/scheduler_proc.c index f90bcdde166..5dcc8ddc5d8 100644 --- a/usr.sbin/smtpd/scheduler_proc.c +++ b/usr.sbin/smtpd/scheduler_proc.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_proc.c,v 1.4 2014/02/04 14:56:03 eric Exp $ */ +/* $OpenBSD: scheduler_proc.c,v 1.5 2014/07/10 14:45:02 eric Exp $ */ /* * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> @@ -23,8 +23,6 @@ #include <sys/socket.h> #include <ctype.h> -#include <err.h> -#include <errno.h> #include <event.h> #include <fcntl.h> #include <imsg.h> @@ -36,14 +34,11 @@ #include "smtpd.h" #include "log.h" -static pid_t pid; static struct imsgbuf ibuf; static struct imsg imsg; static size_t rlen; static char *rdata; -static const char *execpath = "/usr/libexec/smtpd/backend-scheduler"; - static void scheduler_proc_call(void) { @@ -112,53 +107,25 @@ scheduler_proc_end(void) */ static int -scheduler_proc_init(void) +scheduler_proc_init(const char *conf) { - int sp[2], r; + int fd, r; uint32_t version; - errno = 0; - - if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, sp) < 0) { - log_warn("warn: 
scheduler-proc: socketpair"); - goto err; - } - - if ((pid = fork()) == -1) { - log_warn("warn: scheduler-proc: fork"); - goto err; - } - - if (pid == 0) { - /* child process */ - dup2(sp[0], STDIN_FILENO); - if (closefrom(STDERR_FILENO + 1) < 0) - exit(1); - - execl(execpath, "scheduler-proc", NULL); - err(1, "execl"); - } + fd = fork_proc_backend("scheduler", conf, "scheduler-proc"); + if (fd == -1) + fatalx("scheduler-proc: exiting"); - /* parent process */ - close(sp[0]); - imsg_init(&ibuf, sp[1]); + imsg_init(&ibuf, fd); version = PROC_SCHEDULER_API_VERSION; imsg_compose(&ibuf, PROC_SCHEDULER_INIT, 0, 0, -1, &version, sizeof(version)); - scheduler_proc_call(); scheduler_proc_read(&r, sizeof(r)); scheduler_proc_end(); - return (r); - -err: - close(sp[0]); - close(sp[1]); - fatalx("scheduler-proc: exiting"); - - return (0); + return (1); } static int @@ -301,37 +268,34 @@ scheduler_proc_release(int type, uint64_t holdq, int n) } static int -scheduler_proc_batch(int typemask, struct scheduler_batch *ret) +scheduler_proc_batch(int typemask, int *delay, size_t *count, uint64_t *evpids, int *types) { struct ibuf *buf; - uint64_t *evpids; + int r; log_debug("debug: scheduler-proc: PROC_SCHEDULER_BATCH"); buf = imsg_create(&ibuf, PROC_SCHEDULER_BATCH, 0, 0, - sizeof(typemask) + sizeof(ret->evpcount)); + sizeof(typemask) + sizeof(*count)); if (buf == NULL) return (-1); if (imsg_add(buf, &typemask, sizeof(typemask)) == -1) return (-1); - if (imsg_add(buf, &ret->evpcount, sizeof(ret->evpcount)) == -1) + if (imsg_add(buf, count, sizeof(*count)) == -1) return (-1); imsg_close(&ibuf, buf); - evpids = ret->evpids; - scheduler_proc_call(); - - scheduler_proc_read(ret, sizeof(*ret)); - scheduler_proc_read(evpids, sizeof(*evpids) * ret->evpcount); + scheduler_proc_read(&r, sizeof(r)); + scheduler_proc_read(delay, sizeof(*delay)); + scheduler_proc_read(count, sizeof(*count)); + if (r > 0) { + scheduler_proc_read(evpids, sizeof(*evpids) * (*count)); + scheduler_proc_read(types, 
sizeof(*types) * (*count)); + } scheduler_proc_end(); - ret->evpids = evpids; - - if (ret->type == SCHED_NONE) - return (0); - - return (1); + return (r); } static size_t diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c index ffb05415a9c..229e74c80d1 100644 --- a/usr.sbin/smtpd/scheduler_ramqueue.c +++ b/usr.sbin/smtpd/scheduler_ramqueue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_ramqueue.c,v 1.39 2014/07/08 07:59:31 sobrado Exp $ */ +/* $OpenBSD: scheduler_ramqueue.c,v 1.40 2014/07/10 14:45:02 eric Exp $ */ /* * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org> @@ -61,6 +61,7 @@ struct rq_envelope { #define RQ_ENVELOPE_REMOVED 0x02 #define RQ_ENVELOPE_SUSPEND 0x04 #define RQ_ENVELOPE_UPDATE 0x08 +#define RQ_ENVELOPE_OVERFLOW 0x10 uint8_t flags; time_t ctime; @@ -75,6 +76,7 @@ struct rq_envelope { struct rq_holdq { struct evplist q; + size_t count; }; struct rq_queue { @@ -96,7 +98,7 @@ struct rq_queue { static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *); SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp); -static int scheduler_ram_init(void); +static int scheduler_ram_init(const char *); static int scheduler_ram_insert(struct scheduler_info *); static size_t scheduler_ram_commit(uint32_t); static size_t scheduler_ram_rollback(uint32_t); @@ -104,7 +106,7 @@ static int scheduler_ram_update(struct scheduler_info *); static int scheduler_ram_delete(uint64_t); static int scheduler_ram_hold(uint64_t, uint64_t); static int scheduler_ram_release(int, uint64_t, int); -static int scheduler_ram_batch(int, struct scheduler_batch *); +static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *); static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t); static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t); static int scheduler_ram_schedule(uint64_t); @@ -154,8 +156,30 @@ static struct tree holdqs[3]; /* delivery type */ static time_t currtime; +#define 
BACKOFF_TRANSFER 400 +#define BACKOFF_DELIVERY 10 +#define BACKOFF_OVERFLOW 3 + +static time_t +scheduler_backoff(time_t t0, time_t base, uint32_t step) +{ + return (t0 + base * step * step); +} + +static time_t +scheduler_next(time_t t0, time_t base, uint32_t step) +{ + time_t t; + + /* XXX be more efficient */ + while ((t = scheduler_backoff(t0, base, step)) <= currtime) + step++; + + return (t); +} + static int -scheduler_ram_init(void) +scheduler_ram_init(const char *arg) { rq_queue_init(&ramqueue); tree_init(&updates); @@ -169,10 +193,10 @@ scheduler_ram_init(void) static int scheduler_ram_insert(struct scheduler_info *si) { - uint32_t msgid; struct rq_queue *update; struct rq_message *message; struct rq_envelope *envelope; + uint32_t msgid; currtime = time(NULL); @@ -202,7 +226,8 @@ scheduler_ram_insert(struct scheduler_info *si) envelope->message = message; envelope->ctime = si->creation; envelope->expire = si->creation + si->expire; - envelope->sched = scheduler_compute_schedule(si); + envelope->sched = scheduler_backoff(si->creation, + (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry); tree_xset(&message->envelopes, envelope->evpid, envelope); update->evpcount++; @@ -297,8 +322,8 @@ scheduler_ram_update(struct scheduler_info *si) return (1); } - while ((evp->sched = scheduler_compute_schedule(si)) <= currtime) - si->retry += 1; + evp->sched = scheduler_next(evp->ctime, + (si->type == D_MTA) ? 
BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry); evp->state = RQ_EVPSTATE_PENDING; if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) @@ -333,6 +358,8 @@ scheduler_ram_delete(uint64_t evpid) return (1); } +#define HOLDQ_MAXSIZE 1000 + static int scheduler_ram_hold(uint64_t evpid, uint64_t holdq) { @@ -364,6 +391,17 @@ scheduler_ram_hold(uint64_t evpid, uint64_t holdq) hq = xcalloc(1, sizeof(*hq), "scheduler_hold"); TAILQ_INIT(&hq->q); tree_xset(&holdqs[evp->type], holdq, hq); + stat_increment("scheduler.ramqueue.holdq", 1); + } + + /* If the holdq is full, just "tempfail" the envelope */ + if (hq->count >= HOLDQ_MAXSIZE) { + evp->state = RQ_EVPSTATE_PENDING; + evp->flags |= RQ_ENVELOPE_UPDATE; + evp->flags |= RQ_ENVELOPE_OVERFLOW; + sorted_insert(&ramqueue, evp); + stat_increment("scheduler.ramqueue.hold-overflow", 1); + return (0); } evp->state = RQ_EVPSTATE_HELD; @@ -375,6 +413,7 @@ scheduler_ram_hold(uint64_t evpid, uint64_t holdq) * current element. */ TAILQ_INSERT_HEAD(&hq->q, evp, entry); + hq->count += 1; stat_increment("scheduler.ramqueue.hold", 1); return (1); @@ -406,6 +445,7 @@ scheduler_ram_release(int type, uint64_t holdq, int n) break; TAILQ_REMOVE(&hq->q, evp, entry); + hq->count -= 1; evp->holdq = 0; /* When released, all envelopes are put in the pending queue @@ -421,6 +461,7 @@ scheduler_ram_release(int type, uint64_t holdq, int n) if (TAILQ_EMPTY(&hq->q)) { tree_xpop(&holdqs[type], holdq); free(hq); + stat_decrement("scheduler.ramqueue.holdq", 1); } stat_decrement("scheduler.ramqueue.hold", i); @@ -428,12 +469,11 @@ scheduler_ram_release(int type, uint64_t holdq, int n) } static int -scheduler_ram_batch(int typemask, struct scheduler_batch *ret) +scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids, int *types) { - struct evplist *q; struct rq_envelope *evp; - size_t n; - int retry; + size_t i, n; + time_t t; currtime = time(NULL); @@ -441,98 +481,115 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret) if (verbose & 
TRACE_SCHEDULER) rq_queue_dump(&ramqueue, "scheduler_ram_batch()"); - if (typemask & SCHED_REMOVE && TAILQ_FIRST(&ramqueue.q_removed)) { - q = &ramqueue.q_removed; - ret->type = SCHED_REMOVE; - } - else if (typemask & SCHED_EXPIRE && TAILQ_FIRST(&ramqueue.q_expired)) { - q = &ramqueue.q_expired; - ret->type = SCHED_EXPIRE; - } - else if (typemask & SCHED_UPDATE && TAILQ_FIRST(&ramqueue.q_update)) { - q = &ramqueue.q_update; - ret->type = SCHED_UPDATE; - } - else if (typemask & SCHED_BOUNCE && TAILQ_FIRST(&ramqueue.q_bounce)) { - q = &ramqueue.q_bounce; - ret->type = SCHED_BOUNCE; - } - else if (typemask & SCHED_MDA && TAILQ_FIRST(&ramqueue.q_mda)) { - q = &ramqueue.q_mda; - ret->type = SCHED_MDA; - } - else if (typemask & SCHED_MTA && TAILQ_FIRST(&ramqueue.q_mta)) { - q = &ramqueue.q_mta; - ret->type = SCHED_MTA; - } - else if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) { - ret->type = SCHED_DELAY; - ret->evpcount = 0; - if (evp->sched < evp->expire) - ret->delay = evp->sched - currtime; - else - ret->delay = evp->expire - currtime; - goto done; - } - else { - ret->type = SCHED_NONE; - ret->evpcount = 0; - goto done; - } - - for (n = 0; (evp = TAILQ_FIRST(q)) && n < ret->evpcount; n++) { + i = 0; + n = 0; - TAILQ_REMOVE(q, evp, entry); + for (;;) { - /* consistency check */ - if (evp->state != RQ_EVPSTATE_SCHEDULED) - errx(1, "evp:%016" PRIx64 " not scheduled", evp->evpid); + if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) { + TAILQ_REMOVE(&ramqueue.q_removed, evp, entry); + types[i] = SCHED_REMOVE; + evpids[i] = evp->evpid; + rq_envelope_delete(&ramqueue, evp); - ret->evpids[n] = evp->evpid; + if (++i == *count) + break; + } - if (ret->type == SCHED_REMOVE || ret->type == SCHED_EXPIRE) + if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) { + TAILQ_REMOVE(&ramqueue.q_expired, evp, entry); + types[i] = SCHED_EXPIRE; + evpids[i] = evp->evpid; rq_envelope_delete(&ramqueue, evp); - else if (ret->type == SCHED_UPDATE) { - evp->flags 
&= ~RQ_ENVELOPE_UPDATE; + if (++i == *count) + break; + } + + if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) { + TAILQ_REMOVE(&ramqueue.q_update, evp, entry); + types[i] = SCHED_UPDATE; + evpids[i] = evp->evpid; - /* XXX we can't really use scheduler_compute_schedule */ - retry = 0; - while ((evp->sched = evp->ctime + 800 * retry * retry / 2) <= currtime) - retry += 1; + if (evp->flags & RQ_ENVELOPE_OVERFLOW) + t = BACKOFF_OVERFLOW; + else if (evp->type == D_MTA) + t = BACKOFF_TRANSFER; + else + t = BACKOFF_DELIVERY; + evp->sched = scheduler_next(evp->ctime, t, 0); + evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW); evp->state = RQ_EVPSTATE_PENDING; if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) sorted_insert(&ramqueue, evp); + + if (++i == *count) + break; } - else { + + if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) { + TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry); + types[i] = SCHED_BOUNCE; + evpids[i] = evp->evpid; + TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry); evp->state = RQ_EVPSTATE_INFLIGHT; evp->t_inflight = currtime; + + if (++i == *count) + break; } + + if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) { + TAILQ_REMOVE(&ramqueue.q_mda, evp, entry); + types[i] = SCHED_MDA; + evpids[i] = evp->evpid; + + TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry); + evp->state = RQ_EVPSTATE_INFLIGHT; + evp->t_inflight = currtime; + + if (++i == *count) + break; + } + + if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) { + TAILQ_REMOVE(&ramqueue.q_mta, evp, entry); + types[i] = SCHED_MTA; + evpids[i] = evp->evpid; + + TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry); + evp->state = RQ_EVPSTATE_INFLIGHT; + evp->t_inflight = currtime; + + if (++i == *count) + break; + } + + /* nothing seen this round */ + if (i == n) + break; + + n = i; + } + + if (i) { + *count = i; + return (1); + } + + if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) { + if (evp->sched < evp->expire) + t = evp->sched; + 
else + t = evp->expire; + *delay = (t < currtime) ? 0 : (t - currtime); } + else + *delay = -1; - ret->evpcount = n; - - done: - - ret->mask = 0; - if (TAILQ_FIRST(&ramqueue.q_removed)) - ret->mask |= SCHED_REMOVE; - if (TAILQ_FIRST(&ramqueue.q_expired)) - ret->mask |= SCHED_EXPIRE; - if (TAILQ_FIRST(&ramqueue.q_update)) - ret->mask |= SCHED_UPDATE; - if (TAILQ_FIRST(&ramqueue.q_bounce)) - ret->mask |= SCHED_BOUNCE; - if (TAILQ_FIRST(&ramqueue.q_mda)) - ret->mask |= SCHED_MDA; - if (TAILQ_FIRST(&ramqueue.q_mta)) - ret->mask |= SCHED_MTA; - if (TAILQ_FIRST(&ramqueue.q_pending)) - ret->mask |= SCHED_DELAY; - - return ((ret->type == SCHED_NONE) ? 0 : 1); + return (0); } static size_t @@ -805,15 +862,22 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update) rq->evpcount += update->evpcount; } +#define SCHEDULEMAX 1024 + static void rq_queue_schedule(struct rq_queue *rq) { struct rq_envelope *evp; + size_t n; + n = 0; while ((evp = TAILQ_FIRST(&rq->q_pending))) { if (evp->sched > currtime && evp->expire > currtime) break; + if (n == SCHEDULEMAX) + break; + if (evp->state != RQ_EVPSTATE_PENDING) errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid, evp->flags); @@ -828,6 +892,7 @@ rq_queue_schedule(struct rq_queue *rq) continue; } rq_envelope_schedule(rq, evp); + n += 1; } } @@ -888,6 +953,7 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp) if (evp->state == RQ_EVPSTATE_HELD) { hq = tree_xget(&holdqs[evp->type], evp->holdq); TAILQ_REMOVE(&hq->q, evp, entry); + hq->count -= 1; if (TAILQ_EMPTY(&hq->q)) { tree_xpop(&holdqs[evp->type], evp->holdq); free(hq); @@ -924,6 +990,7 @@ rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp) if (evp->state == RQ_EVPSTATE_HELD) { hq = tree_xget(&holdqs[evp->type], evp->holdq); TAILQ_REMOVE(&hq->q, evp, entry); + hq->count -= 1; if (TAILQ_EMPTY(&hq->q)) { tree_xpop(&holdqs[evp->type], evp->holdq); free(hq); @@ -958,6 +1025,7 @@ rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp) if 
(evp->state == RQ_EVPSTATE_HELD) { hq = tree_xget(&holdqs[evp->type], evp->holdq); TAILQ_REMOVE(&hq->q, evp, entry); + hq->count -= 1; if (TAILQ_EMPTY(&hq->q)) { tree_xpop(&holdqs[evp->type], evp->holdq); free(hq); diff --git a/usr.sbin/smtpd/smtpd-api.h b/usr.sbin/smtpd/smtpd-api.h index 5b3629caf8a..05309658d18 100644 --- a/usr.sbin/smtpd/smtpd-api.h +++ b/usr.sbin/smtpd/smtpd-api.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd-api.h,v 1.19 2014/07/08 15:45:32 eric Exp $ */ +/* $OpenBSD: smtpd-api.h,v 1.20 2014/07/10 14:45:02 eric Exp $ */ /* * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> @@ -124,7 +124,6 @@ enum { #define PROC_SCHEDULER_API_VERSION 1 struct scheduler_info; -struct scheduler_batch; enum { PROC_SCHEDULER_OK, @@ -183,22 +182,12 @@ struct scheduler_info { time_t nexttry; }; -#define SCHED_NONE 0x00 -#define SCHED_DELAY 0x01 -#define SCHED_REMOVE 0x02 -#define SCHED_EXPIRE 0x04 -#define SCHED_UPDATE 0x08 -#define SCHED_BOUNCE 0x10 -#define SCHED_MDA 0x20 -#define SCHED_MTA 0x40 - -struct scheduler_batch { - int mask; - int type; - time_t delay; - size_t evpcount; - uint64_t *evpids; -}; +#define SCHED_REMOVE 0x01 +#define SCHED_EXPIRE 0x02 +#define SCHED_UPDATE 0x04 +#define SCHED_BOUNCE 0x08 +#define SCHED_MDA 0x10 +#define SCHED_MTA 0x20 #define PROC_TABLE_API_VERSION 1 @@ -388,13 +377,16 @@ void scheduler_api_on_update(int(*)(struct scheduler_info *)); void scheduler_api_on_delete(int(*)(uint64_t)); void scheduler_api_on_hold(int(*)(uint64_t, uint64_t)); void scheduler_api_on_release(int(*)(int, uint64_t, int)); -void scheduler_api_on_batch(int(*)(int, struct scheduler_batch *)); +void scheduler_api_on_batch(int(*)(int, int *, size_t *, uint64_t *, int *)); void scheduler_api_on_messages(size_t(*)(uint32_t, uint32_t *, size_t)); void scheduler_api_on_envelopes(size_t(*)(uint64_t, struct evpstate *, size_t)); void scheduler_api_on_schedule(int(*)(uint64_t)); void scheduler_api_on_remove(int(*)(uint64_t)); void scheduler_api_on_suspend(int(*)(uint64_t)); 
void scheduler_api_on_resume(int(*)(uint64_t)); +void scheduler_api_no_chroot(void); +void scheduler_api_set_chroot(const char *); +void scheduler_api_set_user(const char *); int scheduler_api_dispatch(void); /* table */ diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c index 3a11a2a49eb..09ada703b7d 100644 --- a/usr.sbin/smtpd/smtpd.c +++ b/usr.sbin/smtpd/smtpd.c @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.c,v 1.232 2014/07/09 09:53:37 eric Exp $ */ +/* $OpenBSD: smtpd.c,v 1.233 2014/07/10 14:45:02 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -1435,6 +1435,7 @@ imsg_to_str(int type) CASE(IMSG_QUEUE_DELIVERY_TEMPFAIL); CASE(IMSG_QUEUE_DELIVERY_PERMFAIL); CASE(IMSG_QUEUE_DELIVERY_LOOP); + CASE(IMSG_QUEUE_ENVELOPE_ACK); CASE(IMSG_QUEUE_ENVELOPE_COMMIT); CASE(IMSG_QUEUE_ENVELOPE_REMOVE); CASE(IMSG_QUEUE_ENVELOPE_SCHEDULE); diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h index 43c3c8f325d..0a8e282a68a 100644 --- a/usr.sbin/smtpd/smtpd.h +++ b/usr.sbin/smtpd/smtpd.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.h,v 1.467 2014/07/09 12:44:54 eric Exp $ */ +/* $OpenBSD: smtpd.h,v 1.468 2014/07/10 14:45:02 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -150,7 +150,7 @@ union lookup { * Bump IMSG_VERSION whenever a change is made to enum imsg_type. * This will ensure that we can never use a wrong version of smtpctl with smtpd. 
*/ -#define IMSG_VERSION 10 +#define IMSG_VERSION 11 enum imsg_type { IMSG_NONE, @@ -209,6 +209,7 @@ enum imsg_type { IMSG_QUEUE_DELIVERY_TEMPFAIL, IMSG_QUEUE_DELIVERY_PERMFAIL, IMSG_QUEUE_DELIVERY_LOOP, + IMSG_QUEUE_ENVELOPE_ACK, IMSG_QUEUE_ENVELOPE_COMMIT, IMSG_QUEUE_ENVELOPE_REMOVE, IMSG_QUEUE_ENVELOPE_SCHEDULE, @@ -864,7 +865,7 @@ struct delivery_backend { }; struct scheduler_backend { - int (*init)(void); + int (*init)(const char *); int (*insert)(struct scheduler_info *); size_t (*commit)(uint32_t); @@ -875,7 +876,7 @@ struct scheduler_backend { int (*hold)(uint64_t, uint64_t); int (*release)(int, uint64_t, int); - int (*batch)(int, struct scheduler_batch *); + int (*batch)(int, int*, size_t*, uint64_t*, int*); size_t (*messages)(uint32_t, uint32_t *, size_t); size_t (*envelopes)(uint64_t, struct evpstate *, size_t); @@ -1285,7 +1286,6 @@ pid_t scheduler(void); /* scheduler_bakend.c */ struct scheduler_backend *scheduler_backend_lookup(const char *); void scheduler_info(struct scheduler_info *, struct envelope *); -time_t scheduler_compute_schedule(struct scheduler_info *); /* pony.c */ |
