summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreric <eric@openbsd.org>2012-09-11 08:37:52 +0000
committereric <eric@openbsd.org>2012-09-11 08:37:52 +0000
commit010bda1b2945eae83dfcf0832c80e8f1456ac1b8 (patch)
treef73b3c8028185526dfc37af38ba54f66c3ae27cc
parentCheck that the host supports GET_SPEED as well as GET_VERSION before deciding (diff)
downloadwireguard-openbsd-010bda1b2945eae83dfcf0832c80e8f1456ac1b8.tar.xz
wireguard-openbsd-010bda1b2945eae83dfcf0832c80e8f1456ac1b8.zip
Rework the scheduler internals. Fix some scheduling loop issues and
handle envelope scheduling/expiration better. ok gilles@
-rw-r--r--usr.sbin/smtpd/scheduler.c7
-rw-r--r--usr.sbin/smtpd/scheduler_ramqueue.c548
-rw-r--r--usr.sbin/smtpd/smtpd.h4
3 files changed, 255 insertions, 304 deletions
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c
index 541edbcf090..ffc556a2b22 100644
--- a/usr.sbin/smtpd/scheduler.c
+++ b/usr.sbin/smtpd/scheduler.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler.c,v 1.19 2012/08/25 22:03:26 gilles Exp $ */
+/* $OpenBSD: scheduler.c,v 1.20 2012/09/11 08:37:52 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -302,7 +302,7 @@ scheduler_timeout(int fd, short event, void *p)
if (!(env->sc_flags & SMTPD_MTA_PAUSED))
typemask |= SCHED_MTA;
- backend->batch(typemask, time(NULL), &batch);
+ backend->batch(typemask, &batch);
switch (batch.type) {
case SCHED_NONE:
log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
@@ -311,7 +311,7 @@ scheduler_timeout(int fd, short event, void *p)
case SCHED_DELAY:
tv.tv_sec = batch.delay;
log_trace(TRACE_SCHEDULER,
- "scheduler: pausing for %li seconds", tv.tv_sec);
+ "scheduler: pausing for %s", duration_to_text(tv.tv_sec));
break;
case SCHED_REMOVE:
@@ -366,7 +366,6 @@ scheduler_process_expire(struct scheduler_batch *batch)
batch->evpids = e->next;
log_debug("scheduler: evp:%016" PRIx64 " expired",
e->id);
- backend->delete(e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_EXPIRE,
0, 0, -1, &e->id, sizeof e->id);
stat_increment("scheduler.expired", 1);
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
index 69b87710a82..18179f24803 100644
--- a/usr.sbin/smtpd/scheduler_ramqueue.c
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler_ramqueue.c,v 1.20 2012/08/25 15:47:47 eric Exp $ */
+/* $OpenBSD: scheduler_ramqueue.c,v 1.21 2012/09/11 08:37:52 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
@@ -42,6 +42,8 @@ TAILQ_HEAD(evplist, rq_envelope);
struct rq_message {
uint32_t msgid;
struct tree envelopes;
+ struct rq_message *sched_next;
+ struct rq_envelope *sched_mta;
};
struct rq_envelope {
@@ -50,25 +52,34 @@ struct rq_envelope {
uint64_t evpid;
int type;
-#define RQ_ENVELOPE_INFLIGHT 0x01
-#define RQ_ENVELOPE_EXPIRED 0x02
+#define RQ_ENVELOPE_PENDING 0x01
+#define RQ_ENVELOPE_SCHEDULED 0x02
+#define RQ_ENVELOPE_EXPIRED 0x04
+#define RQ_ENVELOPE_REMOVED 0x08
+#define RQ_ENVELOPE_INFLIGHT 0x10
uint8_t flags;
time_t sched;
time_t expire;
struct rq_message *message;
- struct evplist *queue;
+
+ struct rq_envelope *sched_next;
+ time_t t_inflight;
+ time_t t_scheduled;
};
struct rq_queue {
struct tree messages;
- struct evplist mda;
- struct evplist mta;
- struct evplist bounce;
- struct evplist inflight;
- struct tree expired;
- struct tree removed;
+
+ struct evplist pending;
+
+ struct rq_message *sched_mta;
+ struct rq_envelope *sched_mda;
+ struct rq_envelope *sched_bounce;
+ struct rq_envelope *sched_expired;
+ struct rq_envelope *sched_removed;
+
};
static void scheduler_ramqueue_init(void);
@@ -77,19 +88,21 @@ static void scheduler_ramqueue_commit(uint32_t);
static void scheduler_ramqueue_rollback(uint32_t);
static void scheduler_ramqueue_update(struct scheduler_info *);
static void scheduler_ramqueue_delete(uint64_t);
-static void scheduler_ramqueue_batch(int, time_t, struct scheduler_batch *);
+static void scheduler_ramqueue_batch(int, struct scheduler_batch *);
static void scheduler_ramqueue_schedule(uint64_t);
static void scheduler_ramqueue_remove(uint64_t);
-static int scheduler_ramqueue_next(int, uint64_t *, time_t *);
static void sorted_insert(struct evplist *, struct rq_envelope *);
static void sorted_merge(struct evplist *, struct evplist *);
static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
-static void rq_queue_dump(struct rq_queue *, const char *, time_t);
+static void rq_queue_dump(struct rq_queue *, const char *);
+static void rq_queue_schedule(struct rq_queue *rq);
+static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
+static void rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
-static const char *rq_envelope_to_text(struct rq_envelope *, time_t);
+static const char *rq_envelope_to_text(struct rq_envelope *);
struct scheduler_backend scheduler_backend_ramqueue = {
scheduler_ramqueue_init,
@@ -107,8 +120,10 @@ struct scheduler_backend scheduler_backend_ramqueue = {
scheduler_ramqueue_remove,
};
-static struct rq_queue ramqueue;
-static struct tree updates;
+static struct rq_queue ramqueue;
+static struct tree updates;
+
+static time_t currtime;
extern int verbose;
@@ -127,6 +142,8 @@ scheduler_ramqueue_insert(struct scheduler_info *si)
struct rq_message *message;
struct rq_envelope *envelope;
+ currtime = time(NULL);
+
msgid = evpid_to_msgid(si->evpid);
/* find/prepare a ramqueue update */
@@ -151,52 +168,33 @@ scheduler_ramqueue_insert(struct scheduler_info *si)
envelope->evpid = si->evpid;
envelope->type = si->type;
envelope->message = message;
- envelope->sched = scheduler_compute_schedule(si);
envelope->expire = si->creation + si->expire;
-
- stat_increment("scheduler.ramqueue.envelope", 1);
-
- if (envelope->expire < envelope->sched) {
- envelope->flags |= RQ_ENVELOPE_EXPIRED;
- tree_xset(&update->expired, envelope->evpid, envelope);
- }
-
+ envelope->sched = scheduler_compute_schedule(si);
tree_xset(&message->envelopes, envelope->evpid, envelope);
- if (si->type == D_BOUNCE)
- envelope->queue = &update->bounce;
- else if (si->type == D_MDA)
- envelope->queue = &update->mda;
- else if (si->type == D_MTA)
- envelope->queue = &update->mta;
- else
- errx(1, "bad type");
-
- sorted_insert(envelope->queue, envelope);
+ stat_increment("scheduler.ramqueue.envelope", 1);
- if (verbose & TRACE_SCHEDULER)
- rq_queue_dump(update, "inserted", time(NULL));
+ envelope->flags = RQ_ENVELOPE_PENDING;
+ sorted_insert(&update->pending, envelope);
}
static void
scheduler_ramqueue_commit(uint32_t msgid)
{
struct rq_queue *update;
- time_t now;
- update = tree_xpop(&updates, msgid);
+ currtime = time(NULL);
- if (verbose & TRACE_SCHEDULER) {
- now = time(NULL);
- rq_queue_dump(update, "commit update", now);
- rq_queue_dump(&ramqueue, "before commit", now);
- }
- rq_queue_merge(&ramqueue, update);
+ update = tree_xpop(&updates, msgid);
if (verbose & TRACE_SCHEDULER)
- rq_queue_dump(&ramqueue, "after commit", time(NULL));
+ rq_queue_dump(update, "update to commit");
+
+ rq_queue_merge(&ramqueue, update);
+ rq_queue_schedule(&ramqueue);
free(update);
+
stat_decrement("scheduler.ramqueue.update", 1);
}
@@ -204,17 +202,17 @@ static void
scheduler_ramqueue_rollback(uint32_t msgid)
{
struct rq_queue *update;
- struct rq_envelope *envelope;
+ struct rq_envelope *evp;
+
+ currtime = time(NULL);
if ((update = tree_pop(&updates, msgid)) == NULL)
return;
- while ((envelope = TAILQ_FIRST(&update->bounce)))
- rq_envelope_delete(update, envelope);
- while ((envelope = TAILQ_FIRST(&update->mda)))
- rq_envelope_delete(update, envelope);
- while ((envelope = TAILQ_FIRST(&update->mta)))
- rq_envelope_delete(update, envelope);
+ while ((evp = TAILQ_FIRST(&update->pending))) {
+ TAILQ_REMOVE(&update->pending, evp, entry);
+ rq_envelope_delete(update, evp);
+ }
free(update);
stat_decrement("scheduler.ramqueue.update", 1);
@@ -223,282 +221,182 @@ scheduler_ramqueue_rollback(uint32_t msgid)
static void
scheduler_ramqueue_update(struct scheduler_info *si)
{
- struct rq_message *message;
- struct rq_envelope *envelope;
+ struct rq_message *msg;
+ struct rq_envelope *evp;
uint32_t msgid;
- msgid = evpid_to_msgid(si->evpid);
- message = tree_xget(&ramqueue.messages, msgid);
- envelope = tree_xget(&message->envelopes, si->evpid);
+ currtime = time(NULL);
- /* it *should* be in-flight */
- if (!(envelope->flags & RQ_ENVELOPE_INFLIGHT))
- log_warnx("evp:%016" PRIx64 " not in-flight", si->evpid);
+ msgid = evpid_to_msgid(si->evpid);
+ msg = tree_xget(&ramqueue.messages, msgid);
+ evp = tree_xget(&msg->envelopes, si->evpid);
- envelope->flags &= ~RQ_ENVELOPE_INFLIGHT;
- envelope->sched = scheduler_compute_schedule(si);
- if (envelope->expire < envelope->sched) {
- envelope->flags |= RQ_ENVELOPE_EXPIRED;
- tree_xset(&ramqueue.expired, envelope->evpid, envelope);
- }
+ /* it *must* be in-flight */
+ if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
+ errx(1, "evp:%016" PRIx64 " not in-flight", si->evpid);
- TAILQ_REMOVE(envelope->queue, envelope, entry);
- if (si->type == D_BOUNCE)
- envelope->queue = &ramqueue.bounce;
- else if (si->type == D_MDA)
- envelope->queue = &ramqueue.mda;
- else if (si->type == D_MTA)
- envelope->queue = &ramqueue.mta;
+ while ((evp->sched = scheduler_compute_schedule(si)) <= currtime)
+ si->retry += 1;
- sorted_insert(envelope->queue, envelope);
+ evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
+ evp->flags |= RQ_ENVELOPE_PENDING;
+ sorted_insert(&ramqueue.pending, evp);
}
static void
scheduler_ramqueue_delete(uint64_t evpid)
{
- struct rq_message *message;
- struct rq_envelope *envelope;
+ struct rq_message *msg;
+ struct rq_envelope *evp;
uint32_t msgid;
+ currtime = time(NULL);
+
msgid = evpid_to_msgid(evpid);
- message = tree_xget(&ramqueue.messages, msgid);
- envelope = tree_xget(&message->envelopes, evpid);
+ msg = tree_xget(&ramqueue.messages, msgid);
+ evp = tree_xget(&msg->envelopes, evpid);
/* it *must* be in-flight */
- if (!(envelope->flags & RQ_ENVELOPE_INFLIGHT))
+ if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
errx(1, "evp:%016" PRIx64 " not in-flight", evpid);
- rq_envelope_delete(&ramqueue, envelope);
-}
-
-static int
-scheduler_ramqueue_next(int typemask, uint64_t *evpid, time_t *sched)
-{
- struct rq_envelope *evp_mda = NULL;
- struct rq_envelope *evp_mta = NULL;
- struct rq_envelope *evp_bounce = NULL;
- struct rq_envelope *envelope = NULL;
-
- if (verbose & TRACE_SCHEDULER)
- rq_queue_dump(&ramqueue, "next", time(NULL));
-
- *sched = 0;
-
- if (typemask & SCHED_REMOVE && tree_root(&ramqueue.removed, evpid, NULL))
- return (1);
- if (typemask & SCHED_EXPIRE && tree_root(&ramqueue.expired, evpid, NULL))
- return (1);
-
- /* fetch first envelope from each queue */
- if (typemask & SCHED_BOUNCE)
- evp_bounce = TAILQ_FIRST(&ramqueue.bounce);
- if (typemask & SCHED_MDA)
- evp_mda = TAILQ_FIRST(&ramqueue.mda);
- if (typemask & SCHED_MTA)
- evp_mta = TAILQ_FIRST(&ramqueue.mta);
-
- /* set current envelope to either one */
- if (evp_bounce)
- envelope = evp_bounce;
- else if (evp_mda)
- envelope = evp_mda;
- else if (evp_mta)
- envelope = evp_mta;
- else
- return (0);
-
- /* figure out which one should be scheduled first */
- if (evp_bounce && evp_bounce->sched < envelope->sched)
- envelope = evp_bounce;
- if (evp_mda && evp_mda->sched < envelope->sched)
- envelope = evp_mda;
- if (evp_mta && evp_mta->sched < envelope->sched)
- envelope = evp_mta;
-
- *evpid = envelope->evpid;
- *sched = envelope->sched;
-
- return (1);
+ evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
+ rq_envelope_delete(&ramqueue, evp);
}
static void
-scheduler_ramqueue_batch(int typemask, time_t curr, struct scheduler_batch *ret)
+scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
{
- struct rq_message *message;
- struct rq_envelope *envelope;
+ struct rq_envelope *evp, *tmp, **batch;
struct id_list *item;
- uint64_t evpid;
- void *i;
- int type;
- time_t sched;
- ret->evpids = NULL;
+ currtime = time(NULL);
- if (!scheduler_ramqueue_next(typemask, &evpid, &sched)) {
- ret->type = SCHED_NONE;
- return;
- }
+ rq_queue_schedule(&ramqueue);
+ if (verbose & TRACE_SCHEDULER)
+ rq_queue_dump(&ramqueue, "scheduler_ramqueue_batch()");
- if (tree_get(&ramqueue.removed, evpid)) {
+ if (typemask & SCHED_REMOVE && ramqueue.sched_removed) {
+ batch = &ramqueue.sched_removed;
ret->type = SCHED_REMOVE;
- while (tree_poproot(&ramqueue.removed, &evpid, NULL)) {
- item = xmalloc(sizeof *item, "schedule_batch");
- item->id = evpid;
- item->next = ret->evpids;
- ret->evpids = item;
- }
- return;
}
-
- message = tree_xget(&ramqueue.messages, evpid_to_msgid(evpid));
- envelope = tree_xget(&message->envelopes, evpid);
-
- /* if the envelope has expired, return the expired list */
- if (envelope->flags & RQ_ENVELOPE_EXPIRED) {
+ else if (typemask & SCHED_EXPIRE && ramqueue.sched_expired) {
+ batch = &ramqueue.sched_expired;
ret->type = SCHED_EXPIRE;
- while (tree_poproot(&ramqueue.expired, &evpid, (void**)&envelope)) {
- TAILQ_REMOVE(envelope->queue, envelope, entry);
- TAILQ_INSERT_TAIL(&ramqueue.inflight, envelope, entry);
- envelope->flags |= RQ_ENVELOPE_INFLIGHT;
- item = xmalloc(sizeof *item, "schedule_batch");
- item->id = evpid;
- item->next = ret->evpids;
- ret->evpids = item;
- }
- return;
}
-
- if (sched > curr) {
+ else if (typemask & SCHED_BOUNCE && ramqueue.sched_bounce) {
+ batch = &ramqueue.sched_bounce;
+ ret->type = SCHED_BOUNCE;
+ }
+ else if (typemask & SCHED_MDA && ramqueue.sched_mda) {
+ batch = &ramqueue.sched_mda;
+ ret->type = SCHED_MDA;
+ }
+ else if (typemask & SCHED_MTA && ramqueue.sched_mta) {
+ batch = &ramqueue.sched_mta->sched_mta;
+ ramqueue.sched_mta = ramqueue.sched_mta->sched_next;
+ ret->type = SCHED_MTA;
+ }
+ else if ((evp = TAILQ_FIRST(&ramqueue.pending))) {
ret->type = SCHED_DELAY;
- ret->delay = sched - curr;
+ if (evp->sched < evp->expire)
+ ret->delay = evp->sched - currtime;
+ else
+ ret->delay = evp->expire - currtime;
+ return;
+ }
+ else {
+ ret->type = SCHED_NONE;
return;
}
- type = envelope->type;
- if (type == D_BOUNCE)
- ret->type = SCHED_BOUNCE;
- else if (type == D_MDA)
- ret->type = SCHED_MDA;
- else if (type == D_MTA)
- ret->type = SCHED_MTA;
+ ret->evpids = NULL;
+ for(evp = *batch; evp; evp = tmp) {
+ tmp = evp->sched_next;
+
+ /* consistency check */
+ if (!(evp->flags & RQ_ENVELOPE_SCHEDULED))
+ errx(1, "evp:%016" PRIx64 " not scheduled", evp->evpid);
- i = NULL;
- while((tree_iter(&message->envelopes, &i, &evpid, (void*)&envelope))) {
- if (envelope->type != type)
- continue;
- if (envelope->sched > curr)
- continue;
- if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
- continue;
- if (envelope->flags & RQ_ENVELOPE_EXPIRED)
- continue;
- TAILQ_REMOVE(envelope->queue, envelope, entry);
- TAILQ_INSERT_TAIL(&ramqueue.inflight, envelope, entry);
- envelope->queue = &ramqueue.inflight;
- envelope->flags |= RQ_ENVELOPE_INFLIGHT;
item = xmalloc(sizeof *item, "schedule_batch");
- item->id = evpid;
+ item->id = evp->evpid;
item->next = ret->evpids;
ret->evpids = item;
+ evp->sched_next = NULL;
+ if (ret->type == SCHED_REMOVE || ret->type == SCHED_EXPIRE)
+ rq_envelope_delete(&ramqueue, evp);
+ else {
+ evp->flags &= ~RQ_ENVELOPE_SCHEDULED;
+ evp->flags |= RQ_ENVELOPE_INFLIGHT;
+ evp->t_inflight = currtime;
+ }
}
-}
+ *batch = NULL;
+}
static void
scheduler_ramqueue_schedule(uint64_t evpid)
{
- struct rq_message *message;
- struct rq_envelope *envelope;
+ struct rq_message *msg;
+ struct rq_envelope *evp;
uint32_t msgid;
void *i, *j;
+ currtime = time(NULL);
+
if (evpid == 0) {
j = NULL;
- while (tree_iter(&ramqueue.messages, &j, NULL,
- (void*)(&message))) {
-
+ while (tree_iter(&ramqueue.messages, &j, NULL, (void*)(&msg))) {
i = NULL;
- while (tree_iter(&message->envelopes, &i, &evpid,
- (void*)(&envelope))) {
- if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
- continue;
-
- envelope->sched = time(NULL);
- TAILQ_REMOVE(envelope->queue, envelope, entry);
- sorted_insert(envelope->queue, envelope);
- }
+ while (tree_iter(&msg->envelopes, &i, NULL,
+ (void*)(&evp)))
+ rq_envelope_schedule(&ramqueue, evp);
}
}
else if (evpid > 0xffffffff) {
msgid = evpid_to_msgid(evpid);
- if ((message = tree_get(&ramqueue.messages, msgid)) == NULL)
- return;
- if ((envelope = tree_get(&message->envelopes, evpid)) == NULL)
+ if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
return;
- if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
+ if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
return;
-
- envelope->sched = time(NULL);
- TAILQ_REMOVE(envelope->queue, envelope, entry);
- sorted_insert(envelope->queue, envelope);
+ rq_envelope_schedule(&ramqueue, evp);
}
else {
msgid = evpid;
- if ((message = tree_get(&ramqueue.messages, msgid)) == NULL)
+ if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
return;
-
i = NULL;
- while (tree_iter(&message->envelopes, &i, &evpid,
- (void*)(&envelope))) {
- if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
- continue;
-
- envelope->sched = time(NULL);
- TAILQ_REMOVE(envelope->queue, envelope, entry);
- sorted_insert(envelope->queue, envelope);
- }
+ while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
+ rq_envelope_schedule(&ramqueue, evp);
}
-
}
static void
scheduler_ramqueue_remove(uint64_t evpid)
{
- struct rq_message *message;
- struct rq_envelope *envelope;
+ struct rq_message *msg;
+ struct rq_envelope *evp;
uint32_t msgid;
- struct evplist rmlist;
void *i;
+ currtime = time(NULL);
+
if (evpid > 0xffffffff) {
msgid = evpid_to_msgid(evpid);
- if ((message = tree_get(&ramqueue.messages, msgid)) == NULL)
- return;
- if ((envelope = tree_get(&message->envelopes, evpid)) == NULL)
+ if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
return;
- if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
+ if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
return;
- rq_envelope_delete(&ramqueue, envelope);
- tree_xset(&ramqueue.removed, evpid, &ramqueue);
+ rq_envelope_remove(&ramqueue, evp);
}
else {
msgid = evpid;
- if ((message = tree_get(&ramqueue.messages, msgid)) == NULL)
+ if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
return;
-
- TAILQ_INIT(&rmlist);
i = NULL;
- while (tree_iter(&message->envelopes, &i, &evpid,
- (void*)(&envelope))) {
- if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
- continue;
- tree_xset(&ramqueue.removed, evpid, &ramqueue);
- TAILQ_REMOVE(envelope->queue, envelope, entry);
- envelope->queue = &rmlist;
- TAILQ_INSERT_HEAD(&rmlist, envelope, entry);
- }
- while((envelope = TAILQ_FIRST(&rmlist)))
- rq_envelope_delete(&ramqueue, envelope);
+ while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
+ rq_envelope_remove(&ramqueue, evp);
}
}
@@ -506,9 +404,11 @@ static void
sorted_insert(struct evplist *list, struct rq_envelope *evp)
{
struct rq_envelope *item;
+ time_t ref;
TAILQ_FOREACH(item, list, entry) {
- if (evp->sched < item->sched) {
+ ref = (evp->sched < evp->expire) ? evp->sched : evp->expire;
+ if (ref <= item->expire && ref <= item->sched) {
TAILQ_INSERT_BEFORE(item, evp, entry);
return;
}
@@ -525,7 +425,6 @@ sorted_merge(struct evplist *list, struct evplist *from)
while ((e = TAILQ_LAST(from, evplist))) {
TAILQ_REMOVE(from, e, entry);
sorted_insert(list, e);
- e->queue = list;
}
}
@@ -533,14 +432,8 @@ static void
rq_queue_init(struct rq_queue *rq)
{
bzero(rq, sizeof *rq);
-
tree_init(&rq->messages);
- tree_init(&rq->expired);
- tree_init(&rq->removed);
- TAILQ_INIT(&rq->mda);
- TAILQ_INIT(&rq->mta);
- TAILQ_INIT(&rq->bounce);
- TAILQ_INIT(&rq->inflight);
+ TAILQ_INIT(&rq->pending);
}
static void
@@ -566,40 +459,98 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
free(message);
}
- sorted_merge(&rq->bounce, &update->bounce);
- sorted_merge(&rq->mda, &update->mda);
- sorted_merge(&rq->mta, &update->mta);
+ sorted_merge(&rq->pending, &update->pending);
+}
- tree_merge(&rq->expired, &update->expired);
- tree_merge(&rq->removed, &update->removed);
+static void
+rq_queue_schedule(struct rq_queue *rq)
+{
+ struct rq_envelope *evp;
+
+ while ((evp = TAILQ_FIRST(&rq->pending))) {
+ if (evp->sched > currtime && evp->expire > currtime)
+ break;
+
+ /* it *must* be pending */
+ if (evp->flags != RQ_ENVELOPE_PENDING)
+ errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid,
+ evp->flags);
+
+ if (evp->expire <= currtime) {
+ TAILQ_REMOVE(&rq->pending, evp, entry);
+ evp->flags &= ~RQ_ENVELOPE_PENDING;
+ evp->flags |= RQ_ENVELOPE_EXPIRED;
+ evp->flags |= RQ_ENVELOPE_SCHEDULED;
+ evp->t_scheduled = currtime;
+ evp->sched_next = rq->sched_expired;
+ rq->sched_expired = evp;
+ continue;
+ }
+ rq_envelope_schedule(rq, evp);
+ }
}
static void
-rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *envelope)
+rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
- struct rq_message *message;
- uint32_t msgid;
+ if (evp->flags & (RQ_ENVELOPE_SCHEDULED | RQ_ENVELOPE_INFLIGHT))
+ return;
- if (envelope->flags & RQ_ENVELOPE_EXPIRED)
- tree_pop(&rq->expired, envelope->evpid);
+ if (evp->flags & RQ_ENVELOPE_PENDING)
+ TAILQ_REMOVE(&rq->pending, evp, entry);
- TAILQ_REMOVE(envelope->queue, envelope, entry);
- message = envelope->message;
- msgid = message->msgid;
+ if (evp->type == D_MTA) {
+ if (evp->message->sched_mta == NULL) {
+ evp->message->sched_next = rq->sched_mta;
+ rq->sched_mta = evp->message;
+ }
+ evp->sched_next = evp->message->sched_mta;
+ evp->message->sched_mta = evp;
+ }
+ else if (evp->type == D_MDA) {
+ evp->sched_next = rq->sched_mda;
+ rq->sched_mda = evp;
+ }
+ else if (evp->type == D_BOUNCE) {
+ evp->sched_next = rq->sched_bounce;
+ rq->sched_bounce = evp;
+ }
+ evp->flags &= ~RQ_ENVELOPE_PENDING;
+ evp->flags |= RQ_ENVELOPE_SCHEDULED;
+ evp->t_scheduled = currtime;
+}
- tree_xpop(&message->envelopes, envelope->evpid);
- if (tree_empty(&message->envelopes)) {
- tree_xpop(&rq->messages, msgid);
- free(message);
+static void
+rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
+{
+ if (!(evp->flags & (RQ_ENVELOPE_PENDING)))
+ return;
+
+ TAILQ_REMOVE(&rq->pending, evp, entry);
+ evp->sched_next = rq->sched_removed;
+ rq->sched_removed = evp;
+ evp->flags &= ~RQ_ENVELOPE_PENDING;
+ evp->flags |= RQ_ENVELOPE_REMOVED;
+ evp->flags |= RQ_ENVELOPE_SCHEDULED;
+ evp->t_scheduled = currtime;
+}
+
+static void
+rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
+{
+ tree_xpop(&evp->message->envelopes, evp->evpid);
+ if (tree_empty(&evp->message->envelopes)) {
+ tree_xpop(&rq->messages, evp->message->msgid);
+ free(evp->message);
stat_decrement("scheduler.ramqueue.message", 1);
}
- free(envelope);
+ free(evp);
stat_decrement("scheduler.ramqueue.envelope", 1);
}
static const char *
-rq_envelope_to_text(struct rq_envelope *e, time_t tref)
+rq_envelope_to_text(struct rq_envelope *e)
{
static char buf[256];
char t[64];
@@ -613,15 +564,28 @@ rq_envelope_to_text(struct rq_envelope *e, time_t tref)
else if (e->type == D_MTA)
strlcat(buf, "mta", sizeof buf);
- snprintf(t, sizeof t, ",sched=%s", duration_to_text(e->sched - tref));
- strlcat(buf, t, sizeof buf);
- snprintf(t, sizeof t, ",exp=%s", duration_to_text(e->expire - tref));
+ snprintf(t, sizeof t, ",expire=%s", duration_to_text(e->expire - currtime));
strlcat(buf, t, sizeof buf);
+ if (e->flags & RQ_ENVELOPE_PENDING) {
+ snprintf(t, sizeof t, ",pending=%s",
+ duration_to_text(e->sched - currtime));
+ strlcat(buf, t, sizeof buf);
+ }
+ if (e->flags & RQ_ENVELOPE_SCHEDULED) {
+ snprintf(t, sizeof t, ",scheduled=%s",
+ duration_to_text(currtime - e->t_scheduled));
+ strlcat(buf, t, sizeof buf);
+ }
+ if (e->flags & RQ_ENVELOPE_INFLIGHT) {
+ snprintf(t, sizeof t, ",inflight=%s",
+ duration_to_text(currtime - e->t_inflight));
+ strlcat(buf, t, sizeof buf);
+ }
+ if (e->flags & RQ_ENVELOPE_REMOVED)
+ strlcat(buf, ",removed", sizeof buf);
if (e->flags & RQ_ENVELOPE_EXPIRED)
strlcat(buf, ",expired", sizeof buf);
- if (e->flags & RQ_ENVELOPE_INFLIGHT)
- strlcat(buf, ",in-flight", sizeof buf);
strlcat(buf, "]", sizeof buf);
@@ -629,7 +593,7 @@ rq_envelope_to_text(struct rq_envelope *e, time_t tref)
}
static void
-rq_queue_dump(struct rq_queue *rq, const char * name, time_t tref)
+rq_queue_dump(struct rq_queue *rq, const char * name)
{
struct rq_message *message;
struct rq_envelope *envelope;
@@ -644,20 +608,8 @@ rq_queue_dump(struct rq_queue *rq, const char * name, time_t tref)
j = NULL;
while((tree_iter(&message->envelopes, &j, &id,
(void*)&envelope)))
- log_debug("| %s", rq_envelope_to_text(envelope, tref));
+ log_debug("| %s",
+ rq_envelope_to_text(envelope));
}
-
- log_debug("| bounces:");
- TAILQ_FOREACH(envelope, &rq->bounce, entry)
- log_debug("| %s", rq_envelope_to_text(envelope, tref));
- log_debug("| mda:");
- TAILQ_FOREACH(envelope, &rq->mda, entry)
- log_debug("| %s", rq_envelope_to_text(envelope, tref));
- log_debug("| mta:");
- TAILQ_FOREACH(envelope, &rq->mta, entry)
- log_debug("| %s", rq_envelope_to_text(envelope, tref));
- log_debug("| in-flight:");
- TAILQ_FOREACH(envelope, &rq->inflight, entry)
- log_debug("| %s", rq_envelope_to_text(envelope, tref));
log_debug("\\---");
}
diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h
index 6737076a756..9847456ed3a 100644
--- a/usr.sbin/smtpd/smtpd.h
+++ b/usr.sbin/smtpd/smtpd.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.h,v 1.346 2012/09/01 16:25:27 gilles Exp $ */
+/* $OpenBSD: smtpd.h,v 1.347 2012/09/11 08:37:52 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -894,7 +894,7 @@ struct scheduler_backend {
void (*update)(struct scheduler_info *);
void (*delete)(uint64_t);
- void (*batch)(int, time_t, struct scheduler_batch *);
+ void (*batch)(int, struct scheduler_batch *);
void (*schedule)(uint64_t);
void (*remove)(uint64_t);