summaryrefslogtreecommitdiffstats
path: root/usr.sbin/smtpd/scheduler_ramqueue.c
diff options
context:
space:
mode:
authoreric <eric@openbsd.org>2012-11-20 09:47:45 +0000
committereric <eric@openbsd.org>2012-11-20 09:47:45 +0000
commit4fe02f32a80cb9b3c25e45b3b7ba3f76b64f896b (patch)
tree257fece714ecc5fa1072d30f7a1705da6342314f /usr.sbin/smtpd/scheduler_ramqueue.c
parentDo not crash on stray .Ta macros found outside column lists. (diff)
downloadwireguard-openbsd-4fe02f32a80cb9b3c25e45b3b7ba3f76b64f896b.tar.xz
wireguard-openbsd-4fe02f32a80cb9b3c25e45b3b7ba3f76b64f896b.zip
Allow "smtpctl show queue" to run in "online" mode if the smtpd server
is running. The scheduler sends the runtime state of each envelope to the queue process which loads the envelope, fills the runtime bits and sends the envelope back to the client. Iteration over the envelope set happens in small chunks to make the request interruptible and to allow the server to keep doing its job in the meantime. Adpat "smtpctl schedule-all" to schedule the messages one by one using the same iteration mechanism. Document "smtpctl monitor" and "smtpctl show queue". ok gilles@
Diffstat (limited to 'usr.sbin/smtpd/scheduler_ramqueue.c')
-rw-r--r--usr.sbin/smtpd/scheduler_ramqueue.c259
1 files changed, 179 insertions, 80 deletions
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
index 055d2c8b4d2..e073c4b35d1 100644
--- a/usr.sbin/smtpd/scheduler_ramqueue.c
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler_ramqueue.c,v 1.24 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: scheduler_ramqueue.c,v 1.25 2012/11/20 09:47:45 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
@@ -42,8 +42,8 @@ TAILQ_HEAD(evplist, rq_envelope);
struct rq_message {
uint32_t msgid;
struct tree envelopes;
- struct rq_message *sched_next;
- struct rq_envelope *sched_mta;
+ struct rq_message *q_next;
+ struct evplist q_mta;
};
struct rq_envelope {
@@ -64,7 +64,6 @@ struct rq_envelope {
struct rq_message *message;
- struct rq_envelope *sched_next;
time_t t_inflight;
time_t t_scheduled;
};
@@ -73,14 +72,14 @@ struct rq_queue {
size_t evpcount;
struct tree messages;
- struct evplist pending;
-
- struct rq_message *sched_mta;
- struct rq_envelope *sched_mda;
- struct rq_envelope *sched_bounce;
- struct rq_envelope *sched_expired;
- struct rq_envelope *sched_removed;
+ struct evplist q_pending;
+ struct evplist q_inflight;
+ struct rq_message *q_mtabatch;
+ struct evplist q_mda;
+ struct evplist q_bounce;
+ struct evplist q_expired;
+ struct evplist q_removed;
};
static void scheduler_ramqueue_init(void);
@@ -90,6 +89,8 @@ static size_t scheduler_ramqueue_rollback(uint32_t);
static void scheduler_ramqueue_update(struct scheduler_info *);
static void scheduler_ramqueue_delete(uint64_t);
static void scheduler_ramqueue_batch(int, struct scheduler_batch *);
+static size_t scheduler_ramqueue_messages(uint32_t, uint32_t *, size_t);
+static size_t scheduler_ramqueue_envelopes(uint64_t, struct evpstate *, size_t);
static void scheduler_ramqueue_schedule(uint64_t);
static void scheduler_ramqueue_remove(uint64_t);
@@ -117,6 +118,8 @@ struct scheduler_backend scheduler_backend_ramqueue = {
scheduler_ramqueue_batch,
+ scheduler_ramqueue_messages,
+ scheduler_ramqueue_envelopes,
scheduler_ramqueue_schedule,
scheduler_ramqueue_remove,
};
@@ -124,7 +127,7 @@ struct scheduler_backend scheduler_backend_ramqueue = {
static struct rq_queue ramqueue;
static struct tree updates;
-static time_t currtime;
+static time_t currtime;
extern int verbose;
@@ -159,6 +162,7 @@ scheduler_ramqueue_insert(struct scheduler_info *si)
if ((message = tree_get(&update->messages, msgid)) == NULL) {
message = xcalloc(1, sizeof *message, "scheduler_insert");
message->msgid = msgid;
+ TAILQ_INIT(&message->q_mta);
tree_init(&message->envelopes);
tree_xset(&update->messages, msgid, message);
stat_increment("scheduler.ramqueue.message", 1);
@@ -177,7 +181,7 @@ scheduler_ramqueue_insert(struct scheduler_info *si)
stat_increment("scheduler.ramqueue.envelope", 1);
envelope->flags = RQ_ENVELOPE_PENDING;
- sorted_insert(&update->pending, envelope);
+ sorted_insert(&update->q_pending, envelope);
}
static size_t
@@ -195,6 +199,10 @@ scheduler_ramqueue_commit(uint32_t msgid)
rq_queue_dump(update, "update to commit");
rq_queue_merge(&ramqueue, update);
+
+ if (verbose & TRACE_SCHEDULER)
+ rq_queue_dump(&ramqueue, "resulting queue");
+
rq_queue_schedule(&ramqueue);
free(update);
@@ -216,8 +224,8 @@ scheduler_ramqueue_rollback(uint32_t msgid)
return (0);
r = update->evpcount;
- while ((evp = TAILQ_FIRST(&update->pending))) {
- TAILQ_REMOVE(&update->pending, evp, entry);
+ while ((evp = TAILQ_FIRST(&update->q_pending))) {
+ TAILQ_REMOVE(&update->q_pending, evp, entry);
rq_envelope_delete(update, evp);
}
@@ -247,9 +255,10 @@ scheduler_ramqueue_update(struct scheduler_info *si)
while ((evp->sched = scheduler_compute_schedule(si)) <= currtime)
si->retry += 1;
+ TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
+ sorted_insert(&ramqueue.q_pending, evp);
evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
evp->flags |= RQ_ENVELOPE_PENDING;
- sorted_insert(&ramqueue.pending, evp);
}
static void
@@ -269,6 +278,7 @@ scheduler_ramqueue_delete(uint64_t evpid)
if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
errx(1, "evp:%016" PRIx64 " not in-flight", evpid);
+ TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
rq_envelope_delete(&ramqueue, evp);
}
@@ -276,7 +286,9 @@ scheduler_ramqueue_delete(uint64_t evpid)
static void
scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
{
- struct rq_envelope *evp, *tmp, **batch;
+ struct evplist *q;
+ struct rq_envelope *evp;
+ struct rq_message *msg;
struct id_list *item;
currtime = time(NULL);
@@ -285,28 +297,30 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
if (verbose & TRACE_SCHEDULER)
rq_queue_dump(&ramqueue, "scheduler_ramqueue_batch()");
- if (typemask & SCHED_REMOVE && ramqueue.sched_removed) {
- batch = &ramqueue.sched_removed;
+ if (typemask & SCHED_REMOVE && TAILQ_FIRST(&ramqueue.q_removed)) {
+ q = &ramqueue.q_removed;
ret->type = SCHED_REMOVE;
}
- else if (typemask & SCHED_EXPIRE && ramqueue.sched_expired) {
- batch = &ramqueue.sched_expired;
+ else if (typemask & SCHED_EXPIRE && TAILQ_FIRST(&ramqueue.q_expired)) {
+ q = &ramqueue.q_expired;
ret->type = SCHED_EXPIRE;
}
- else if (typemask & SCHED_BOUNCE && ramqueue.sched_bounce) {
- batch = &ramqueue.sched_bounce;
+ else if (typemask & SCHED_BOUNCE && TAILQ_FIRST(&ramqueue.q_bounce)) {
+ q = &ramqueue.q_bounce;
ret->type = SCHED_BOUNCE;
}
- else if (typemask & SCHED_MDA && ramqueue.sched_mda) {
- batch = &ramqueue.sched_mda;
+ else if (typemask & SCHED_MDA && TAILQ_FIRST(&ramqueue.q_mda)) {
+ q = &ramqueue.q_mda;
ret->type = SCHED_MDA;
}
- else if (typemask & SCHED_MTA && ramqueue.sched_mta) {
- batch = &ramqueue.sched_mta->sched_mta;
- ramqueue.sched_mta = ramqueue.sched_mta->sched_next;
+ else if (typemask & SCHED_MTA && ramqueue.q_mtabatch) {
+ msg = ramqueue.q_mtabatch;
+ ramqueue.q_mtabatch = msg->q_next;
+ msg->q_next = NULL;
+ q = &msg->q_mta;
ret->type = SCHED_MTA;
}
- else if ((evp = TAILQ_FIRST(&ramqueue.pending))) {
+ else if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
ret->type = SCHED_DELAY;
if (evp->sched < evp->expire)
ret->delay = evp->sched - currtime;
@@ -321,8 +335,10 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
ret->evpids = NULL;
ret->evpcount = 0;
- for(evp = *batch; evp; evp = tmp) {
- tmp = evp->sched_next;
+
+ while ((evp = TAILQ_FIRST(q))) {
+
+ TAILQ_REMOVE(q, evp, entry);
/* consistency check */
if (!(evp->flags & RQ_ENVELOPE_SCHEDULED))
@@ -332,18 +348,17 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
item->id = evp->evpid;
item->next = ret->evpids;
ret->evpids = item;
- evp->sched_next = NULL;
+
if (ret->type == SCHED_REMOVE || ret->type == SCHED_EXPIRE)
rq_envelope_delete(&ramqueue, evp);
else {
+ TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
evp->flags &= ~RQ_ENVELOPE_SCHEDULED;
evp->flags |= RQ_ENVELOPE_INFLIGHT;
evp->t_inflight = currtime;
}
ret->evpcount++;
}
-
- *batch = NULL;
}
static void
@@ -352,26 +367,18 @@ scheduler_ramqueue_schedule(uint64_t evpid)
struct rq_message *msg;
struct rq_envelope *evp;
uint32_t msgid;
- void *i, *j;
+ void *i;
currtime = time(NULL);
- if (evpid == 0) {
- j = NULL;
- while (tree_iter(&ramqueue.messages, &j, NULL, (void*)(&msg))) {
- i = NULL;
- while (tree_iter(&msg->envelopes, &i, NULL,
- (void*)(&evp)))
- rq_envelope_schedule(&ramqueue, evp);
- }
- }
- else if (evpid > 0xffffffff) {
+ if (evpid > 0xffffffff) {
msgid = evpid_to_msgid(evpid);
if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
return;
if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
return;
- rq_envelope_schedule(&ramqueue, evp);
+ if (evp->flags & RQ_ENVELOPE_PENDING)
+ rq_envelope_schedule(&ramqueue, evp);
}
else {
msgid = evpid;
@@ -379,7 +386,8 @@ scheduler_ramqueue_schedule(uint64_t evpid)
return;
i = NULL;
while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
- rq_envelope_schedule(&ramqueue, evp);
+ if (evp->flags & RQ_ENVELOPE_PENDING)
+ rq_envelope_schedule(&ramqueue, evp);
}
}
@@ -411,6 +419,62 @@ scheduler_ramqueue_remove(uint64_t evpid)
}
}
+static size_t
+scheduler_ramqueue_messages(uint32_t from, uint32_t *dst, size_t size)
+{
+ uint64_t id;
+ size_t n;
+ void *i;
+
+ for (n = 0, i = NULL; n < size; n++) {
+ if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
+ break;
+ dst[n] = id;
+ }
+
+ return (n);
+}
+
+static size_t
+scheduler_ramqueue_envelopes(uint64_t from, struct evpstate *dst, size_t size)
+{
+ struct rq_message *msg;
+ struct rq_envelope *evp;
+ void *i;
+ size_t n;
+
+ if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
+ return (0);
+
+ for (n = 0, i = NULL; n < size; ) {
+
+ if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
+ (void**)&evp) == 0)
+ break;
+
+ if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
+ continue;
+
+ dst[n].retry = 0;
+ dst[n].evpid = evp->evpid;
+ if (evp->flags & RQ_ENVELOPE_PENDING) {
+ dst[n].time = evp->sched;
+ dst[n].flags = DF_PENDING;
+ }
+ else if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
+ dst[n].time = evp->t_scheduled;
+ dst[n].flags = DF_PENDING;
+ }
+ else if (evp->flags & RQ_ENVELOPE_INFLIGHT) {
+ dst[n].time = evp->t_inflight;
+ dst[n].flags = DF_INFLIGHT;
+ }
+ n++;
+ }
+
+ return (n);
+}
+
static void
sorted_insert(struct evplist *list, struct rq_envelope *evp)
{
@@ -444,7 +508,12 @@ rq_queue_init(struct rq_queue *rq)
{
bzero(rq, sizeof *rq);
tree_init(&rq->messages);
- TAILQ_INIT(&rq->pending);
+ TAILQ_INIT(&rq->q_pending);
+ TAILQ_INIT(&rq->q_inflight);
+ TAILQ_INIT(&rq->q_mda);
+ TAILQ_INIT(&rq->q_bounce);
+ TAILQ_INIT(&rq->q_expired);
+ TAILQ_INIT(&rq->q_removed);
}
static void
@@ -463,7 +532,7 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
}
/* need to re-link all envelopes before merging them */
i = NULL;
- while((tree_iter(&message->envelopes, &i, &id,
+ while ((tree_iter(&message->envelopes, &i, &id,
(void*)&envelope)))
envelope->message = tomessage;
tree_merge(&tomessage->envelopes, &message->envelopes);
@@ -471,7 +540,8 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
stat_decrement("scheduler.ramqueue.message", 1);
}
- sorted_merge(&rq->pending, &update->pending);
+ sorted_merge(&rq->q_pending, &update->q_pending);
+ rq->evpcount += update->evpcount;
}
static void
@@ -479,23 +549,21 @@ rq_queue_schedule(struct rq_queue *rq)
{
struct rq_envelope *evp;
- while ((evp = TAILQ_FIRST(&rq->pending))) {
+ while ((evp = TAILQ_FIRST(&rq->q_pending))) {
if (evp->sched > currtime && evp->expire > currtime)
break;
- /* it *must* be pending */
if (evp->flags != RQ_ENVELOPE_PENDING)
errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid,
evp->flags);
if (evp->expire <= currtime) {
- TAILQ_REMOVE(&rq->pending, evp, entry);
+ TAILQ_REMOVE(&rq->q_pending, evp, entry);
+ TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
evp->flags &= ~RQ_ENVELOPE_PENDING;
evp->flags |= RQ_ENVELOPE_EXPIRED;
evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
- evp->sched_next = rq->sched_expired;
- rq->sched_expired = evp;
continue;
}
rq_envelope_schedule(rq, evp);
@@ -505,28 +573,22 @@ rq_queue_schedule(struct rq_queue *rq)
static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
- if (evp->flags & (RQ_ENVELOPE_SCHEDULED | RQ_ENVELOPE_INFLIGHT))
- return;
-
- if (evp->flags & RQ_ENVELOPE_PENDING)
- TAILQ_REMOVE(&rq->pending, evp, entry);
+ struct evplist *q = NULL;
if (evp->type == D_MTA) {
- if (evp->message->sched_mta == NULL) {
- evp->message->sched_next = rq->sched_mta;
- rq->sched_mta = evp->message;
+ if (TAILQ_EMPTY(&evp->message->q_mta)) {
+ evp->message->q_next = rq->q_mtabatch;
+ rq->q_mtabatch = evp->message;
}
- evp->sched_next = evp->message->sched_mta;
- evp->message->sched_mta = evp;
- }
- else if (evp->type == D_MDA) {
- evp->sched_next = rq->sched_mda;
- rq->sched_mda = evp;
- }
- else if (evp->type == D_BOUNCE) {
- evp->sched_next = rq->sched_bounce;
- rq->sched_bounce = evp;
+ q = &evp->message->q_mta;
}
+ else if (evp->type == D_MDA)
+ q = &rq->q_mda;
+ else if (evp->type == D_BOUNCE)
+ q = &rq->q_bounce;
+
+ TAILQ_REMOVE(&rq->q_pending, evp, entry);
+ TAILQ_INSERT_TAIL(q, evp, entry);
evp->flags &= ~RQ_ENVELOPE_PENDING;
evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
@@ -535,16 +597,51 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
static void
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
- if (!(evp->flags & (RQ_ENVELOPE_PENDING)))
+ struct rq_message *m;
+ struct evplist *q = NULL;
+
+ if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
+ return;
+ /*
+ * For now we just ignore it, but we could mark the envelope for
+ * removal and possibly send a cancellation to the agent.
+ */
+ if (evp->flags & (RQ_ENVELOPE_INFLIGHT))
return;
- TAILQ_REMOVE(&rq->pending, evp, entry);
- evp->sched_next = rq->sched_removed;
- rq->sched_removed = evp;
+ if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
+ if (evp->type == D_MTA)
+ q = &evp->message->q_mta;
+ else if (evp->type == D_MDA)
+ q = &rq->q_mda;
+ else if (evp->type == D_BOUNCE)
+ q = &rq->q_bounce;
+ } else
+ q = &rq->q_pending;
+
+ TAILQ_REMOVE(q, evp, entry);
+ TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
evp->flags &= ~RQ_ENVELOPE_PENDING;
evp->flags |= RQ_ENVELOPE_REMOVED;
evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
+
+ /*
+ * We might need to unschedule the message if it was the only
+ * scheduled envelope
+ */
+ if (q == &evp->message->q_mta && TAILQ_EMPTY(q)) {
+ if (rq->q_mtabatch == evp->message)
+ rq->q_mtabatch = evp->message->q_next;
+ else {
+ for (m = rq->q_mtabatch; m->q_next; m = m->q_next)
+ if (m->q_next == evp->message) {
+ m->q_next = evp->message->q_next;
+ break;
+ }
+ }
+ evp->message->q_next = NULL;
+ }
}
static void
@@ -558,6 +655,7 @@ rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
}
free(evp);
+ rq->evpcount--;
stat_decrement("scheduler.ramqueue.envelope", 1);
}
@@ -576,7 +674,8 @@ rq_envelope_to_text(struct rq_envelope *e)
else if (e->type == D_MTA)
strlcat(buf, "mta", sizeof buf);
- snprintf(t, sizeof t, ",expire=%s", duration_to_text(e->expire - currtime));
+ snprintf(t, sizeof t, ",expire=%s",
+ duration_to_text(e->expire - currtime));
strlcat(buf, t, sizeof buf);
if (e->flags & RQ_ENVELOPE_PENDING) {
@@ -615,10 +714,10 @@ rq_queue_dump(struct rq_queue *rq, const char * name)
log_debug("debug: /--- ramqueue: %s", name);
i = NULL;
- while((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
+ while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
log_debug("debug: | msg:%08" PRIx32, message->msgid);
j = NULL;
- while((tree_iter(&message->envelopes, &j, &id,
+ while ((tree_iter(&message->envelopes, &j, &id,
(void*)&envelope)))
log_debug("debug: | %s",
rq_envelope_to_text(envelope));