diff options
author | 2012-09-11 08:37:52 +0000 | |
---|---|---|
committer | 2012-09-11 08:37:52 +0000 | |
commit | 010bda1b2945eae83dfcf0832c80e8f1456ac1b8 (patch) | |
tree | f73b3c8028185526dfc37af38ba54f66c3ae27cc | |
parent | Check that the host supports GET_SPEED as well as GET_VERSION before deciding (diff) | |
download | wireguard-openbsd-010bda1b2945eae83dfcf0832c80e8f1456ac1b8.tar.xz wireguard-openbsd-010bda1b2945eae83dfcf0832c80e8f1456ac1b8.zip |
Rework the scheduler internals. Fix some scheduling loop issues and
handle envelope scheduling/expiration better.
ok gilles@
-rw-r--r-- | usr.sbin/smtpd/scheduler.c | 7 | ||||
-rw-r--r-- | usr.sbin/smtpd/scheduler_ramqueue.c | 548 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpd.h | 4 |
3 files changed, 255 insertions, 304 deletions
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c index 541edbcf090..ffc556a2b22 100644 --- a/usr.sbin/smtpd/scheduler.c +++ b/usr.sbin/smtpd/scheduler.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler.c,v 1.19 2012/08/25 22:03:26 gilles Exp $ */ +/* $OpenBSD: scheduler.c,v 1.20 2012/09/11 08:37:52 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -302,7 +302,7 @@ scheduler_timeout(int fd, short event, void *p) if (!(env->sc_flags & SMTPD_MTA_PAUSED)) typemask |= SCHED_MTA; - backend->batch(typemask, time(NULL), &batch); + backend->batch(typemask, &batch); switch (batch.type) { case SCHED_NONE: log_trace(TRACE_SCHEDULER, "scheduler: sleeping"); @@ -311,7 +311,7 @@ scheduler_timeout(int fd, short event, void *p) case SCHED_DELAY: tv.tv_sec = batch.delay; log_trace(TRACE_SCHEDULER, - "scheduler: pausing for %li seconds", tv.tv_sec); + "scheduler: pausing for %s", duration_to_text(tv.tv_sec)); break; case SCHED_REMOVE: @@ -366,7 +366,6 @@ scheduler_process_expire(struct scheduler_batch *batch) batch->evpids = e->next; log_debug("scheduler: evp:%016" PRIx64 " expired", e->id); - backend->delete(e->id); imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_EXPIRE, 0, 0, -1, &e->id, sizeof e->id); stat_increment("scheduler.expired", 1); diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c index 69b87710a82..18179f24803 100644 --- a/usr.sbin/smtpd/scheduler_ramqueue.c +++ b/usr.sbin/smtpd/scheduler_ramqueue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_ramqueue.c,v 1.20 2012/08/25 15:47:47 eric Exp $ */ +/* $OpenBSD: scheduler_ramqueue.c,v 1.21 2012/09/11 08:37:52 eric Exp $ */ /* * Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org> @@ -42,6 +42,8 @@ TAILQ_HEAD(evplist, rq_envelope); struct rq_message { uint32_t msgid; struct tree envelopes; + struct rq_message *sched_next; + struct rq_envelope *sched_mta; }; struct rq_envelope { @@ -50,25 +52,34 @@ struct rq_envelope { uint64_t evpid; int type; -#define RQ_ENVELOPE_INFLIGHT 0x01 -#define RQ_ENVELOPE_EXPIRED 0x02 +#define RQ_ENVELOPE_PENDING 0x01 +#define RQ_ENVELOPE_SCHEDULED 0x02 +#define RQ_ENVELOPE_EXPIRED 0x04 +#define RQ_ENVELOPE_REMOVED 0x08 +#define RQ_ENVELOPE_INFLIGHT 0x10 uint8_t flags; time_t sched; time_t expire; struct rq_message *message; - struct evplist *queue; + + struct rq_envelope *sched_next; + time_t t_inflight; + time_t t_scheduled; }; struct rq_queue { struct tree messages; - struct evplist mda; - struct evplist mta; - struct evplist bounce; - struct evplist inflight; - struct tree expired; - struct tree removed; + + struct evplist pending; + + struct rq_message *sched_mta; + struct rq_envelope *sched_mda; + struct rq_envelope *sched_bounce; + struct rq_envelope *sched_expired; + struct rq_envelope *sched_removed; + }; static void scheduler_ramqueue_init(void); @@ -77,19 +88,21 @@ static void scheduler_ramqueue_commit(uint32_t); static void scheduler_ramqueue_rollback(uint32_t); static void scheduler_ramqueue_update(struct scheduler_info *); static void scheduler_ramqueue_delete(uint64_t); -static void scheduler_ramqueue_batch(int, time_t, struct scheduler_batch *); +static void scheduler_ramqueue_batch(int, struct scheduler_batch *); static void scheduler_ramqueue_schedule(uint64_t); static void scheduler_ramqueue_remove(uint64_t); -static int scheduler_ramqueue_next(int, uint64_t *, time_t *); static void sorted_insert(struct evplist *, struct rq_envelope *); static void sorted_merge(struct evplist *, struct evplist *); static void rq_queue_init(struct rq_queue *); static void rq_queue_merge(struct rq_queue *, struct rq_queue *); -static void rq_queue_dump(struct rq_queue *, const char *, time_t); +static void rq_queue_dump(struct rq_queue *, const char *); +static void rq_queue_schedule(struct rq_queue *rq); +static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *); +static void rq_envelope_remove(struct rq_queue *, struct rq_envelope *); static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *); -static const char *rq_envelope_to_text(struct rq_envelope *, time_t); +static const char *rq_envelope_to_text(struct rq_envelope *); struct scheduler_backend scheduler_backend_ramqueue = { scheduler_ramqueue_init, @@ -107,8 +120,10 @@ struct scheduler_backend scheduler_backend_ramqueue = { scheduler_ramqueue_remove, }; -static struct rq_queue ramqueue; -static struct tree updates; +static struct rq_queue ramqueue; +static struct tree updates; + +static time_t currtime; extern int verbose; @@ -127,6 +142,8 @@ scheduler_ramqueue_insert(struct scheduler_info *si) struct rq_message *message; struct rq_envelope *envelope; + currtime = time(NULL); + msgid = evpid_to_msgid(si->evpid); /* find/prepare a ramqueue update */ @@ -151,52 +168,33 @@ scheduler_ramqueue_insert(struct scheduler_info *si) envelope->evpid = si->evpid; envelope->type = si->type; envelope->message = message; - envelope->sched = scheduler_compute_schedule(si); envelope->expire = si->creation + si->expire; - - stat_increment("scheduler.ramqueue.envelope", 1); - - if (envelope->expire < envelope->sched) { - envelope->flags |= RQ_ENVELOPE_EXPIRED; - tree_xset(&update->expired, envelope->evpid, envelope); - } - + envelope->sched = scheduler_compute_schedule(si); tree_xset(&message->envelopes, envelope->evpid, envelope); - if (si->type == D_BOUNCE) - envelope->queue = &update->bounce; - else if (si->type == D_MDA) - envelope->queue = &update->mda; - else if (si->type == D_MTA) - envelope->queue = &update->mta; - else - errx(1, "bad type"); - - sorted_insert(envelope->queue, envelope); + stat_increment("scheduler.ramqueue.envelope", 1); - if (verbose & TRACE_SCHEDULER) - rq_queue_dump(update, "inserted", time(NULL)); + envelope->flags = RQ_ENVELOPE_PENDING; + sorted_insert(&update->pending, envelope); } static void scheduler_ramqueue_commit(uint32_t msgid) { struct rq_queue *update; - time_t now; - update = tree_xpop(&updates, msgid); + currtime = time(NULL); - if (verbose & TRACE_SCHEDULER) { - now = time(NULL); - rq_queue_dump(update, "commit update", now); - rq_queue_dump(&ramqueue, "before commit", now); - } - rq_queue_merge(&ramqueue, update); + update = tree_xpop(&updates, msgid); if (verbose & TRACE_SCHEDULER) - rq_queue_dump(&ramqueue, "after commit", time(NULL)); + rq_queue_dump(update, "update to commit"); + + rq_queue_merge(&ramqueue, update); + rq_queue_schedule(&ramqueue); free(update); + stat_decrement("scheduler.ramqueue.update", 1); } @@ -204,17 +202,17 @@ static void scheduler_ramqueue_rollback(uint32_t msgid) { struct rq_queue *update; - struct rq_envelope *envelope; + struct rq_envelope *evp; + + currtime = time(NULL); if ((update = tree_pop(&updates, msgid)) == NULL) return; - while ((envelope = TAILQ_FIRST(&update->bounce))) - rq_envelope_delete(update, envelope); - while ((envelope = TAILQ_FIRST(&update->mda))) - rq_envelope_delete(update, envelope); - while ((envelope = TAILQ_FIRST(&update->mta))) - rq_envelope_delete(update, envelope); + while ((evp = TAILQ_FIRST(&update->pending))) { + TAILQ_REMOVE(&update->pending, evp, entry); + rq_envelope_delete(update, evp); + } free(update); stat_decrement("scheduler.ramqueue.update", 1); @@ -223,282 +221,182 @@ scheduler_ramqueue_rollback(uint32_t msgid) static void scheduler_ramqueue_update(struct scheduler_info *si) { - struct rq_message *message; - struct rq_envelope *envelope; + struct rq_message *msg; + struct rq_envelope *evp; uint32_t msgid; - msgid = evpid_to_msgid(si->evpid); - message = tree_xget(&ramqueue.messages, msgid); - envelope = tree_xget(&message->envelopes, si->evpid); + currtime = time(NULL); - /* it *should* be in-flight */ - if (!(envelope->flags & RQ_ENVELOPE_INFLIGHT)) - log_warnx("evp:%016" PRIx64 " not in-flight", si->evpid); + msgid = evpid_to_msgid(si->evpid); + msg = tree_xget(&ramqueue.messages, msgid); + evp = tree_xget(&msg->envelopes, si->evpid); - envelope->flags &= ~RQ_ENVELOPE_INFLIGHT; - envelope->sched = scheduler_compute_schedule(si); - if (envelope->expire < envelope->sched) { - envelope->flags |= RQ_ENVELOPE_EXPIRED; - tree_xset(&ramqueue.expired, envelope->evpid, envelope); - } + /* it *must* be in-flight */ + if (!(evp->flags & RQ_ENVELOPE_INFLIGHT)) + errx(1, "evp:%016" PRIx64 " not in-flight", si->evpid); - TAILQ_REMOVE(envelope->queue, envelope, entry); - if (si->type == D_BOUNCE) - envelope->queue = &ramqueue.bounce; - else if (si->type == D_MDA) - envelope->queue = &ramqueue.mda; - else if (si->type == D_MTA) - envelope->queue = &ramqueue.mta; + while ((evp->sched = scheduler_compute_schedule(si)) <= currtime) + si->retry += 1; - sorted_insert(envelope->queue, envelope); + evp->flags &= ~RQ_ENVELOPE_INFLIGHT; + evp->flags |= RQ_ENVELOPE_PENDING; + sorted_insert(&ramqueue.pending, evp); } static void scheduler_ramqueue_delete(uint64_t evpid) { - struct rq_message *message; - struct rq_envelope *envelope; + struct rq_message *msg; + struct rq_envelope *evp; uint32_t msgid; + currtime = time(NULL); + msgid = evpid_to_msgid(evpid); - message = tree_xget(&ramqueue.messages, msgid); - envelope = tree_xget(&message->envelopes, evpid); + msg = tree_xget(&ramqueue.messages, msgid); + evp = tree_xget(&msg->envelopes, evpid); /* it *must* be in-flight */ - if (!(envelope->flags & RQ_ENVELOPE_INFLIGHT)) + if (!(evp->flags & RQ_ENVELOPE_INFLIGHT)) errx(1, "evp:%016" PRIx64 " not in-flight", evpid); - rq_envelope_delete(&ramqueue, envelope); -} - -static int -scheduler_ramqueue_next(int typemask, uint64_t *evpid, time_t *sched) -{ - struct rq_envelope *evp_mda = NULL; - struct rq_envelope *evp_mta = NULL; - struct rq_envelope *evp_bounce = NULL; - struct rq_envelope *envelope = NULL; - - if (verbose & TRACE_SCHEDULER) - rq_queue_dump(&ramqueue, "next", time(NULL)); - - *sched = 0; - - if (typemask & SCHED_REMOVE && tree_root(&ramqueue.removed, evpid, NULL)) - return (1); - if (typemask & SCHED_EXPIRE && tree_root(&ramqueue.expired, evpid, NULL)) - return (1); - - /* fetch first envelope from each queue */ - if (typemask & SCHED_BOUNCE) - evp_bounce = TAILQ_FIRST(&ramqueue.bounce); - if (typemask & SCHED_MDA) - evp_mda = TAILQ_FIRST(&ramqueue.mda); - if (typemask & SCHED_MTA) - evp_mta = TAILQ_FIRST(&ramqueue.mta); - - /* set current envelope to either one */ - if (evp_bounce) - envelope = evp_bounce; - else if (evp_mda) - envelope = evp_mda; - else if (evp_mta) - envelope = evp_mta; - else - return (0); - - /* figure out which one should be scheduled first */ - if (evp_bounce && evp_bounce->sched < envelope->sched) - envelope = evp_bounce; - if (evp_mda && evp_mda->sched < envelope->sched) - envelope = evp_mda; - if (evp_mta && evp_mta->sched < envelope->sched) - envelope = evp_mta; - - *evpid = envelope->evpid; - *sched = envelope->sched; - - return (1); + evp->flags &= ~RQ_ENVELOPE_INFLIGHT; + rq_envelope_delete(&ramqueue, evp); } static void -scheduler_ramqueue_batch(int typemask, time_t curr, struct scheduler_batch *ret) +scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret) { - struct rq_message *message; - struct rq_envelope *envelope; + struct rq_envelope *evp, *tmp, **batch; struct id_list *item; - uint64_t evpid; - void *i; - int type; - time_t sched; - ret->evpids = NULL; + currtime = time(NULL); - if (!scheduler_ramqueue_next(typemask, &evpid, &sched)) { - ret->type = SCHED_NONE; - return; - } + rq_queue_schedule(&ramqueue); + if (verbose & TRACE_SCHEDULER) + rq_queue_dump(&ramqueue, "scheduler_ramqueue_batch()"); - if (tree_get(&ramqueue.removed, evpid)) { + if (typemask & SCHED_REMOVE && ramqueue.sched_removed) { + batch = &ramqueue.sched_removed; ret->type = SCHED_REMOVE; - while (tree_poproot(&ramqueue.removed, &evpid, NULL)) { - item = xmalloc(sizeof *item, "schedule_batch"); - item->id = evpid; - item->next = ret->evpids; - ret->evpids = item; - } - return; } - - message = tree_xget(&ramqueue.messages, evpid_to_msgid(evpid)); - envelope = tree_xget(&message->envelopes, evpid); - - /* if the envelope has expired, return the expired list */ - if (envelope->flags & RQ_ENVELOPE_EXPIRED) { + else if (typemask & SCHED_EXPIRE && ramqueue.sched_expired) { + batch = &ramqueue.sched_expired; ret->type = SCHED_EXPIRE; - while (tree_poproot(&ramqueue.expired, &evpid, (void**)&envelope)) { - TAILQ_REMOVE(envelope->queue, envelope, entry); - TAILQ_INSERT_TAIL(&ramqueue.inflight, envelope, entry); - envelope->flags |= RQ_ENVELOPE_INFLIGHT; - item = xmalloc(sizeof *item, "schedule_batch"); - item->id = evpid; - item->next = ret->evpids; - ret->evpids = item; - } - return; } - - if (sched > curr) { + else if (typemask & SCHED_BOUNCE && ramqueue.sched_bounce) { + batch = &ramqueue.sched_bounce; + ret->type = SCHED_BOUNCE; + } + else if (typemask & SCHED_MDA && ramqueue.sched_mda) { + batch = &ramqueue.sched_mda; + ret->type = SCHED_MDA; + } + else if (typemask & SCHED_MTA && ramqueue.sched_mta) { + batch = &ramqueue.sched_mta->sched_mta; + ramqueue.sched_mta = ramqueue.sched_mta->sched_next; + ret->type = SCHED_MTA; + } + else if ((evp = TAILQ_FIRST(&ramqueue.pending))) { ret->type = SCHED_DELAY; - ret->delay = sched - curr; + if (evp->sched < evp->expire) + ret->delay = evp->sched - currtime; + else + ret->delay = evp->expire - currtime; + return; + } + else { + ret->type = SCHED_NONE; return; } - type = envelope->type; - if (type == D_BOUNCE) - ret->type = SCHED_BOUNCE; - else if (type == D_MDA) - ret->type = SCHED_MDA; - else if (type == D_MTA) - ret->type = SCHED_MTA; + ret->evpids = NULL; + for(evp = *batch; evp; evp = tmp) { + tmp = evp->sched_next; + + /* consistency check */ + if (!(evp->flags & RQ_ENVELOPE_SCHEDULED)) + errx(1, "evp:%016" PRIx64 " not scheduled", evp->evpid); - i = NULL; - while((tree_iter(&message->envelopes, &i, &evpid, (void*)&envelope))) { - if (envelope->type != type) - continue; - if (envelope->sched > curr) - continue; - if (envelope->flags & RQ_ENVELOPE_INFLIGHT) - continue; - if (envelope->flags & RQ_ENVELOPE_EXPIRED) - continue; - TAILQ_REMOVE(envelope->queue, envelope, entry); - TAILQ_INSERT_TAIL(&ramqueue.inflight, envelope, entry); - envelope->queue = &ramqueue.inflight; - envelope->flags |= RQ_ENVELOPE_INFLIGHT; item = xmalloc(sizeof *item, "schedule_batch"); - item->id = evpid; + item->id = evp->evpid; item->next = ret->evpids; ret->evpids = item; + evp->sched_next = NULL; + if (ret->type == SCHED_REMOVE || ret->type == SCHED_EXPIRE) + rq_envelope_delete(&ramqueue, evp); + else { + evp->flags &= ~RQ_ENVELOPE_SCHEDULED; + evp->flags |= RQ_ENVELOPE_INFLIGHT; + evp->t_inflight = currtime; + } } -} + *batch = NULL; +} static void scheduler_ramqueue_schedule(uint64_t evpid) { - struct rq_message *message; - struct rq_envelope *envelope; + struct rq_message *msg; + struct rq_envelope *evp; uint32_t msgid; void *i, *j; + currtime = time(NULL); + if (evpid == 0) { j = NULL; - while (tree_iter(&ramqueue.messages, &j, NULL, - (void*)(&message))) { - + while (tree_iter(&ramqueue.messages, &j, NULL, (void*)(&msg))) { i = NULL; - while (tree_iter(&message->envelopes, &i, &evpid, - (void*)(&envelope))) { - if (envelope->flags & RQ_ENVELOPE_INFLIGHT) - continue; - - envelope->sched = time(NULL); - TAILQ_REMOVE(envelope->queue, envelope, entry); - sorted_insert(envelope->queue, envelope); - } + while (tree_iter(&msg->envelopes, &i, NULL, + (void*)(&evp))) + rq_envelope_schedule(&ramqueue, evp); } } else if (evpid > 0xffffffff) { msgid = evpid_to_msgid(evpid); - if ((message = tree_get(&ramqueue.messages, msgid)) == NULL) - return; - if ((envelope = tree_get(&message->envelopes, evpid)) == NULL) + if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL) return; - if (envelope->flags & RQ_ENVELOPE_INFLIGHT) + if ((evp = tree_get(&msg->envelopes, evpid)) == NULL) return; - - envelope->sched = time(NULL); - TAILQ_REMOVE(envelope->queue, envelope, entry); - sorted_insert(envelope->queue, envelope); + rq_envelope_schedule(&ramqueue, evp); } else { msgid = evpid; - if ((message = tree_get(&ramqueue.messages, msgid)) == NULL) + if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL) return; - i = NULL; - while (tree_iter(&message->envelopes, &i, &evpid, - (void*)(&envelope))) { - if (envelope->flags & RQ_ENVELOPE_INFLIGHT) - continue; - - envelope->sched = time(NULL); - TAILQ_REMOVE(envelope->queue, envelope, entry); - sorted_insert(envelope->queue, envelope); - } + while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) + rq_envelope_schedule(&ramqueue, evp); } - } static void scheduler_ramqueue_remove(uint64_t evpid) { - struct rq_message *message; - struct rq_envelope *envelope; + struct rq_message *msg; + struct rq_envelope *evp; uint32_t msgid; - struct evplist rmlist; void *i; + currtime = time(NULL); + if (evpid > 0xffffffff) { msgid = evpid_to_msgid(evpid); - if ((message = tree_get(&ramqueue.messages, msgid)) == NULL) - return; - if ((envelope = tree_get(&message->envelopes, evpid)) == NULL) + if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL) return; - if (envelope->flags & RQ_ENVELOPE_INFLIGHT) + if ((evp = tree_get(&msg->envelopes, evpid)) == NULL) return; - rq_envelope_delete(&ramqueue, envelope); - tree_xset(&ramqueue.removed, evpid, &ramqueue); + rq_envelope_remove(&ramqueue, evp); } else { msgid = evpid; - if ((message = tree_get(&ramqueue.messages, msgid)) == NULL) + if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL) return; - - TAILQ_INIT(&rmlist); i = NULL; - while (tree_iter(&message->envelopes, &i, &evpid, - (void*)(&envelope))) { - if (envelope->flags & RQ_ENVELOPE_INFLIGHT) - continue; - tree_xset(&ramqueue.removed, evpid, &ramqueue); - TAILQ_REMOVE(envelope->queue, envelope, entry); - envelope->queue = &rmlist; - TAILQ_INSERT_HEAD(&rmlist, envelope, entry); - } - while((envelope = TAILQ_FIRST(&rmlist))) - rq_envelope_delete(&ramqueue, envelope); + while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) + rq_envelope_remove(&ramqueue, evp); } } @@ -506,9 +404,11 @@ static void sorted_insert(struct evplist *list, struct rq_envelope *evp) { struct rq_envelope *item; + time_t ref; TAILQ_FOREACH(item, list, entry) { - if (evp->sched < item->sched) { + ref = (evp->sched < evp->expire) ? evp->sched : evp->expire; + if (ref <= item->expire && ref <= item->sched) { TAILQ_INSERT_BEFORE(item, evp, entry); return; } @@ -525,7 +425,6 @@ sorted_merge(struct evplist *list, struct evplist *from) while ((e = TAILQ_LAST(from, evplist))) { TAILQ_REMOVE(from, e, entry); sorted_insert(list, e); - e->queue = list; } } @@ -533,14 +432,8 @@ static void rq_queue_init(struct rq_queue *rq) { bzero(rq, sizeof *rq); - tree_init(&rq->messages); - tree_init(&rq->expired); - tree_init(&rq->removed); - TAILQ_INIT(&rq->mda); - TAILQ_INIT(&rq->mta); - TAILQ_INIT(&rq->bounce); - TAILQ_INIT(&rq->inflight); + TAILQ_INIT(&rq->pending); } static void @@ -566,40 +459,98 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update) free(message); } - sorted_merge(&rq->bounce, &update->bounce); - sorted_merge(&rq->mda, &update->mda); - sorted_merge(&rq->mta, &update->mta); + sorted_merge(&rq->pending, &update->pending); +} - tree_merge(&rq->expired, &update->expired); - tree_merge(&rq->removed, &update->removed); +static void +rq_queue_schedule(struct rq_queue *rq) +{ + struct rq_envelope *evp; + + while ((evp = TAILQ_FIRST(&rq->pending))) { + if (evp->sched > currtime && evp->expire > currtime) + break; + + /* it *must* be pending */ + if (evp->flags != RQ_ENVELOPE_PENDING) + errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid, + evp->flags); + + if (evp->expire <= currtime) { + TAILQ_REMOVE(&rq->pending, evp, entry); + evp->flags &= ~RQ_ENVELOPE_PENDING; + evp->flags |= RQ_ENVELOPE_EXPIRED; + evp->flags |= RQ_ENVELOPE_SCHEDULED; + evp->t_scheduled = currtime; + evp->sched_next = rq->sched_expired; + rq->sched_expired = evp; + continue; + } + rq_envelope_schedule(rq, evp); + } } static void -rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *envelope) +rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp) { - struct rq_message *message; - uint32_t msgid; + if (evp->flags & (RQ_ENVELOPE_SCHEDULED | RQ_ENVELOPE_INFLIGHT)) + return; - if (envelope->flags & RQ_ENVELOPE_EXPIRED) - tree_pop(&rq->expired, envelope->evpid); + if (evp->flags & RQ_ENVELOPE_PENDING) + TAILQ_REMOVE(&rq->pending, evp, entry); - TAILQ_REMOVE(envelope->queue, envelope, entry); - message = envelope->message; - msgid = message->msgid; + if (evp->type == D_MTA) { + if (evp->message->sched_mta == NULL) { + evp->message->sched_next = rq->sched_mta; + rq->sched_mta = evp->message; + } + evp->sched_next = evp->message->sched_mta; + evp->message->sched_mta = evp; + } + else if (evp->type == D_MDA) { + evp->sched_next = rq->sched_mda; + rq->sched_mda = evp; + } + else if (evp->type == D_BOUNCE) { + evp->sched_next = rq->sched_bounce; + rq->sched_bounce = evp; + } + evp->flags &= ~RQ_ENVELOPE_PENDING; + evp->flags |= RQ_ENVELOPE_SCHEDULED; + evp->t_scheduled = currtime; +} - tree_xpop(&message->envelopes, envelope->evpid); - if (tree_empty(&message->envelopes)) { - tree_xpop(&rq->messages, msgid); - free(message); +static void +rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp) +{ + if (!(evp->flags & (RQ_ENVELOPE_PENDING))) + return; + + TAILQ_REMOVE(&rq->pending, evp, entry); + evp->sched_next = rq->sched_removed; + rq->sched_removed = evp; + evp->flags &= ~RQ_ENVELOPE_PENDING; + evp->flags |= RQ_ENVELOPE_REMOVED; + evp->flags |= RQ_ENVELOPE_SCHEDULED; + evp->t_scheduled = currtime; +} + +static void +rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp) +{ + tree_xpop(&evp->message->envelopes, evp->evpid); + if (tree_empty(&evp->message->envelopes)) { + tree_xpop(&rq->messages, evp->message->msgid); + free(evp->message); stat_decrement("scheduler.ramqueue.message", 1); } - free(envelope); + free(evp); stat_decrement("scheduler.ramqueue.envelope", 1); } static const char * -rq_envelope_to_text(struct rq_envelope *e, time_t tref) +rq_envelope_to_text(struct rq_envelope *e) { static char buf[256]; char t[64]; @@ -613,15 +564,28 @@ rq_envelope_to_text(struct rq_envelope *e, time_t tref) else if (e->type == D_MTA) strlcat(buf, "mta", sizeof buf); - snprintf(t, sizeof t, ",sched=%s", duration_to_text(e->sched - tref)); - strlcat(buf, t, sizeof buf); - snprintf(t, sizeof t, ",exp=%s", duration_to_text(e->expire - tref)); + snprintf(t, sizeof t, ",expire=%s", duration_to_text(e->expire - currtime)); strlcat(buf, t, sizeof buf); + if (e->flags & RQ_ENVELOPE_PENDING) { + snprintf(t, sizeof t, ",pending=%s", + duration_to_text(e->sched - currtime)); + strlcat(buf, t, sizeof buf); + } + if (e->flags & RQ_ENVELOPE_SCHEDULED) { + snprintf(t, sizeof t, ",scheduled=%s", + duration_to_text(currtime - e->t_scheduled)); + strlcat(buf, t, sizeof buf); + } + if (e->flags & RQ_ENVELOPE_INFLIGHT) { + snprintf(t, sizeof t, ",inflight=%s", + duration_to_text(currtime - e->t_inflight)); + strlcat(buf, t, sizeof buf); + } + if (e->flags & RQ_ENVELOPE_REMOVED) + strlcat(buf, ",removed", sizeof buf); if (e->flags & RQ_ENVELOPE_EXPIRED) strlcat(buf, ",expired", sizeof buf); - if (e->flags & RQ_ENVELOPE_INFLIGHT) - strlcat(buf, ",in-flight", sizeof buf); strlcat(buf, "]", sizeof buf); @@ -629,7 +593,7 @@ rq_envelope_to_text(struct rq_envelope *e, time_t tref) } static void -rq_queue_dump(struct rq_queue *rq, const char * name, time_t tref) +rq_queue_dump(struct rq_queue *rq, const char * name) { struct rq_message *message; struct rq_envelope *envelope; @@ -644,20 +608,8 @@ rq_queue_dump(struct rq_queue *rq, const char * name, time_t tref) j = NULL; while((tree_iter(&message->envelopes, &j, &id, (void*)&envelope))) - log_debug("| %s", rq_envelope_to_text(envelope, tref)); + log_debug("| %s", + rq_envelope_to_text(envelope)); } - - log_debug("| bounces:"); - TAILQ_FOREACH(envelope, &rq->bounce, entry) - log_debug("| %s", rq_envelope_to_text(envelope, tref)); - log_debug("| mda:"); - TAILQ_FOREACH(envelope, &rq->mda, entry) - log_debug("| %s", rq_envelope_to_text(envelope, tref)); - log_debug("| mta:"); - TAILQ_FOREACH(envelope, &rq->mta, entry) - log_debug("| %s", rq_envelope_to_text(envelope, tref)); - log_debug("| in-flight:"); - TAILQ_FOREACH(envelope, &rq->inflight, entry) - log_debug("| %s", rq_envelope_to_text(envelope, tref)); log_debug("\\---"); } diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h index 6737076a756..9847456ed3a 100644 --- a/usr.sbin/smtpd/smtpd.h +++ b/usr.sbin/smtpd/smtpd.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.h,v 1.346 2012/09/01 16:25:27 gilles Exp $ */ +/* $OpenBSD: smtpd.h,v 1.347 2012/09/11 08:37:52 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -894,7 +894,7 @@ struct scheduler_backend { void (*update)(struct scheduler_info *); void (*delete)(uint64_t); - void (*batch)(int, time_t, struct scheduler_batch *); + void (*batch)(int, struct scheduler_batch *); void (*schedule)(uint64_t); void (*remove)(uint64_t); |