summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--usr.sbin/smtpd/control.c40
-rw-r--r--usr.sbin/smtpd/queue.c36
-rw-r--r--usr.sbin/smtpd/scheduler.c64
-rw-r--r--usr.sbin/smtpd/scheduler_ramqueue.c259
-rw-r--r--usr.sbin/smtpd/smtpctl.8101
-rw-r--r--usr.sbin/smtpd/smtpctl.c251
-rw-r--r--usr.sbin/smtpd/smtpd.c4
-rw-r--r--usr.sbin/smtpd/smtpd.h21
-rw-r--r--usr.sbin/smtpd/tree.c34
9 files changed, 632 insertions, 178 deletions
diff --git a/usr.sbin/smtpd/control.c b/usr.sbin/smtpd/control.c
index e1eb8a925a9..86de6d06357 100644
--- a/usr.sbin/smtpd/control.c
+++ b/usr.sbin/smtpd/control.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: control.c,v 1.78 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: control.c,v 1.79 2012/11/20 09:47:45 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
@@ -94,6 +94,28 @@ control_imsg(struct imsgev *iev, struct imsg *imsg)
return;
}
}
+ if (iev->proc == PROC_SCHEDULER) {
+ switch (imsg->hdr.type) {
+ case IMSG_SCHEDULER_MESSAGES:
+ c = control_connbyfd(imsg->hdr.peerid);
+ if (c == NULL)
+ return;
+ imsg_compose_event(&c->iev, IMSG_SCHEDULER_MESSAGES, 0,
+ 0, -1, imsg->data, imsg->hdr.len-sizeof imsg->hdr);
+ return;
+ }
+ }
+ if (iev->proc == PROC_QUEUE) {
+ switch (imsg->hdr.type) {
+ case IMSG_SCHEDULER_ENVELOPES:
+ c = control_connbyfd(imsg->hdr.peerid);
+ if (c == NULL)
+ return;
+ imsg_compose_event(&c->iev, IMSG_SCHEDULER_ENVELOPES, 0,
+ 0, -1, imsg->data, imsg->hdr.len-sizeof imsg->hdr);
+ return;
+ }
+ }
switch (imsg->hdr.type) {
case IMSG_STAT_INCREMENT:
@@ -591,6 +613,22 @@ control_dispatch_ext(int fd, short event, void *arg)
imsg_compose_event(&c->iev, IMSG_CTL_OK, 0, 0, -1, NULL, 0);
break;
+ case IMSG_SCHEDULER_MESSAGES:
+ if (euid)
+ goto badcred;
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_SCHEDULER_MESSAGES, fd, 0, -1, imsg.data,
+ imsg.hdr.len - sizeof(imsg.hdr));
+ break;
+
+ case IMSG_SCHEDULER_ENVELOPES:
+ if (euid)
+ goto badcred;
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_SCHEDULER_ENVELOPES, fd, 0, -1, imsg.data,
+ imsg.hdr.len - sizeof(imsg.hdr));
+ break;
+
case IMSG_SCHEDULER_SCHEDULE:
if (euid)
goto badcred;
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index a4f5a19ecec..8bde4360274 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: queue.c,v 1.142 2012/11/13 13:23:23 eric Exp $ */
+/* $OpenBSD: queue.c,v 1.143 2012/11/20 09:47:45 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -51,6 +51,7 @@ static void queue_sig_handler(int, short, void *);
static void
queue_imsg(struct imsgev *iev, struct imsg *imsg)
{
+ struct evpstate *state;
static uint64_t batch_id;
struct submit_status ss;
struct envelope *e, evp;
@@ -197,7 +198,38 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
IMSG_BATCH_CLOSE, 0, 0, -1,
&batch_id, sizeof batch_id);
return;
- }
+
+ case IMSG_SCHEDULER_ENVELOPES:
+ if (imsg->hdr.len == sizeof imsg->hdr) {
+ imsg_compose_event(env->sc_ievs[PROC_CONTROL],
+ IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid,
+ 0, -1, NULL, 0);
+ return;
+ }
+ state = imsg->data;
+ if (queue_envelope_load(state->evpid, &evp) == 0)
+ return; /* Envelope is gone, drop it */
+ /*
+ * XXX consistency: The envelope might already be on
+ * its way back to the scheduler. We need to detect
+ * this properly and report that state.
+ */
+ evp.flags |= state->flags;
+ /* In the past if running or runnable */
+ evp.nexttry = state->time;
+ if (state->flags == DF_INFLIGHT) {
+ /*
+ * Not exactly correct but pretty close: The
+ * value is not recorded on the envelope unless
+ * a tempfail occurs.
+ */
+ evp.lasttry = state->time;
+ }
+ imsg_compose_event(env->sc_ievs[PROC_CONTROL],
+ IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid, 0, -1,
+ &evp, sizeof evp);
+ return;
+ }
}
if (iev->proc == PROC_MTA || iev->proc == PROC_MDA) {
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c
index 44f0f15eec1..31cddeecaef 100644
--- a/usr.sbin/smtpd/scheduler.c
+++ b/usr.sbin/smtpd/scheduler.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler.c,v 1.23 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: scheduler.c,v 1.24 2012/11/20 09:47:45 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -60,14 +60,18 @@ static struct scheduler_backend *backend = NULL;
extern const char *backend_scheduler;
+#define MSGBATCHSIZE 1024
+#define EVPBATCHSIZE 256
+
void
scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
{
+ struct evpstate state[EVPBATCHSIZE];
struct envelope *e;
struct scheduler_info si;
uint64_t id;
- uint32_t msgid;
- size_t n;
+ uint32_t msgid, msgids[MSGBATCHSIZE];
+ size_t n, i;
switch (imsg->hdr.type) {
@@ -169,14 +173,33 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
log_verbose(*(int *)imsg->data);
return;
+ case IMSG_SCHEDULER_MESSAGES:
+ msgid = *(uint32_t *)(imsg->data);
+ n = backend->messages(msgid, msgids, MSGBATCHSIZE);
+ imsg_compose_event(iev, IMSG_SCHEDULER_MESSAGES,
+ imsg->hdr.peerid, 0, -1, msgids, n * sizeof (*msgids));
+ return;
+
+ case IMSG_SCHEDULER_ENVELOPES:
+ id = *(uint64_t *)(imsg->data);
+ n = backend->envelopes(id, state, EVPBATCHSIZE);
+ for (i = 0; i < n; i++) {
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid, 0, -1,
+ &state[i], sizeof state[i]);
+ }
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid, 0, -1, NULL, 0);
+ return;
+
case IMSG_SCHEDULER_SCHEDULE:
id = *(uint64_t *)(imsg->data);
- if (id == 0)
- log_debug("debug: scheduler: scheduling all envelopes");
- else if (id <= 0xffffffffL)
- log_debug("debug: scheduler: scheduling msg:%08" PRIx64, id);
+ if (id <= 0xffffffffL)
+ log_debug("debug: scheduler: "
+ "scheduling msg:%08" PRIx64, id);
else
- log_debug("debug: scheduler: scheduling evp:%016" PRIx64, id);
+ log_debug("debug: scheduler: "
+ "scheduling evp:%016" PRIx64, id);
backend->schedule(id);
scheduler_reset_events();
return;
@@ -184,15 +207,18 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
case IMSG_SCHEDULER_REMOVE:
id = *(uint64_t *)(imsg->data);
if (id <= 0xffffffffL)
- log_debug("debug: scheduler: removing msg:%08" PRIx64, id);
+ log_debug("debug: scheduler: "
+ "removing msg:%08" PRIx64, id);
else
- log_debug("debug: scheduler: removing evp:%016" PRIx64, id);
+ log_debug("debug: scheduler: "
+ "removing evp:%016" PRIx64, id);
backend->remove(id);
scheduler_reset_events();
return;
}
- errx(1, "scheduler_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
+ errx(1, "scheduler_imsg: unexpected %s imsg",
+ imsg_to_str(imsg->hdr.type));
}
static void
@@ -349,7 +375,7 @@ scheduler_timeout(int fd, short event, void *p)
fatalx("scheduler_timeout: unknown batch type");
}
- evtimer_add(&env->sc_ev, &tv);
+ evtimer_add(&env->sc_ev, &tv);
}
static void
@@ -395,8 +421,8 @@ scheduler_process_bounce(struct scheduler_batch *batch)
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("debug: scheduler: evp:%016" PRIx64 " scheduled (bounce)",
- e->id);
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (bounce)", e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_SMTP_ENQUEUE,
0, 0, -1, &e->id, sizeof e->id);
free(e);
@@ -412,8 +438,8 @@ scheduler_process_mda(struct scheduler_batch *batch)
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("debug: scheduler: evp:%016" PRIx64 " scheduled (mda)",
- e->id);
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (mda)", e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_MDA_SESS_NEW,
0, 0, -1, &e->id, sizeof e->id);
free(e);
@@ -425,15 +451,15 @@ scheduler_process_mda(struct scheduler_batch *batch)
static void
scheduler_process_mta(struct scheduler_batch *batch)
{
- struct id_list *e;
+ struct id_list *e;
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CREATE,
0, 0, -1, NULL, 0);
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("debug: scheduler: evp:%016" PRIx64 " scheduled (mta)",
- e->id);
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (mta)", e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_APPEND,
0, 0, -1, &e->id, sizeof e->id);
free(e);
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
index 055d2c8b4d2..e073c4b35d1 100644
--- a/usr.sbin/smtpd/scheduler_ramqueue.c
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler_ramqueue.c,v 1.24 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: scheduler_ramqueue.c,v 1.25 2012/11/20 09:47:45 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
@@ -42,8 +42,8 @@ TAILQ_HEAD(evplist, rq_envelope);
struct rq_message {
uint32_t msgid;
struct tree envelopes;
- struct rq_message *sched_next;
- struct rq_envelope *sched_mta;
+ struct rq_message *q_next;
+ struct evplist q_mta;
};
struct rq_envelope {
@@ -64,7 +64,6 @@ struct rq_envelope {
struct rq_message *message;
- struct rq_envelope *sched_next;
time_t t_inflight;
time_t t_scheduled;
};
@@ -73,14 +72,14 @@ struct rq_queue {
size_t evpcount;
struct tree messages;
- struct evplist pending;
-
- struct rq_message *sched_mta;
- struct rq_envelope *sched_mda;
- struct rq_envelope *sched_bounce;
- struct rq_envelope *sched_expired;
- struct rq_envelope *sched_removed;
+ struct evplist q_pending;
+ struct evplist q_inflight;
+ struct rq_message *q_mtabatch;
+ struct evplist q_mda;
+ struct evplist q_bounce;
+ struct evplist q_expired;
+ struct evplist q_removed;
};
static void scheduler_ramqueue_init(void);
@@ -90,6 +89,8 @@ static size_t scheduler_ramqueue_rollback(uint32_t);
static void scheduler_ramqueue_update(struct scheduler_info *);
static void scheduler_ramqueue_delete(uint64_t);
static void scheduler_ramqueue_batch(int, struct scheduler_batch *);
+static size_t scheduler_ramqueue_messages(uint32_t, uint32_t *, size_t);
+static size_t scheduler_ramqueue_envelopes(uint64_t, struct evpstate *, size_t);
static void scheduler_ramqueue_schedule(uint64_t);
static void scheduler_ramqueue_remove(uint64_t);
@@ -117,6 +118,8 @@ struct scheduler_backend scheduler_backend_ramqueue = {
scheduler_ramqueue_batch,
+ scheduler_ramqueue_messages,
+ scheduler_ramqueue_envelopes,
scheduler_ramqueue_schedule,
scheduler_ramqueue_remove,
};
@@ -124,7 +127,7 @@ struct scheduler_backend scheduler_backend_ramqueue = {
static struct rq_queue ramqueue;
static struct tree updates;
-static time_t currtime;
+static time_t currtime;
extern int verbose;
@@ -159,6 +162,7 @@ scheduler_ramqueue_insert(struct scheduler_info *si)
if ((message = tree_get(&update->messages, msgid)) == NULL) {
message = xcalloc(1, sizeof *message, "scheduler_insert");
message->msgid = msgid;
+ TAILQ_INIT(&message->q_mta);
tree_init(&message->envelopes);
tree_xset(&update->messages, msgid, message);
stat_increment("scheduler.ramqueue.message", 1);
@@ -177,7 +181,7 @@ scheduler_ramqueue_insert(struct scheduler_info *si)
stat_increment("scheduler.ramqueue.envelope", 1);
envelope->flags = RQ_ENVELOPE_PENDING;
- sorted_insert(&update->pending, envelope);
+ sorted_insert(&update->q_pending, envelope);
}
static size_t
@@ -195,6 +199,10 @@ scheduler_ramqueue_commit(uint32_t msgid)
rq_queue_dump(update, "update to commit");
rq_queue_merge(&ramqueue, update);
+
+ if (verbose & TRACE_SCHEDULER)
+ rq_queue_dump(&ramqueue, "resulting queue");
+
rq_queue_schedule(&ramqueue);
free(update);
@@ -216,8 +224,8 @@ scheduler_ramqueue_rollback(uint32_t msgid)
return (0);
r = update->evpcount;
- while ((evp = TAILQ_FIRST(&update->pending))) {
- TAILQ_REMOVE(&update->pending, evp, entry);
+ while ((evp = TAILQ_FIRST(&update->q_pending))) {
+ TAILQ_REMOVE(&update->q_pending, evp, entry);
rq_envelope_delete(update, evp);
}
@@ -247,9 +255,10 @@ scheduler_ramqueue_update(struct scheduler_info *si)
while ((evp->sched = scheduler_compute_schedule(si)) <= currtime)
si->retry += 1;
+ TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
+ sorted_insert(&ramqueue.q_pending, evp);
evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
evp->flags |= RQ_ENVELOPE_PENDING;
- sorted_insert(&ramqueue.pending, evp);
}
static void
@@ -269,6 +278,7 @@ scheduler_ramqueue_delete(uint64_t evpid)
if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
errx(1, "evp:%016" PRIx64 " not in-flight", evpid);
+ TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
rq_envelope_delete(&ramqueue, evp);
}
@@ -276,7 +286,9 @@ scheduler_ramqueue_delete(uint64_t evpid)
static void
scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
{
- struct rq_envelope *evp, *tmp, **batch;
+ struct evplist *q;
+ struct rq_envelope *evp;
+ struct rq_message *msg;
struct id_list *item;
currtime = time(NULL);
@@ -285,28 +297,30 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
if (verbose & TRACE_SCHEDULER)
rq_queue_dump(&ramqueue, "scheduler_ramqueue_batch()");
- if (typemask & SCHED_REMOVE && ramqueue.sched_removed) {
- batch = &ramqueue.sched_removed;
+ if (typemask & SCHED_REMOVE && TAILQ_FIRST(&ramqueue.q_removed)) {
+ q = &ramqueue.q_removed;
ret->type = SCHED_REMOVE;
}
- else if (typemask & SCHED_EXPIRE && ramqueue.sched_expired) {
- batch = &ramqueue.sched_expired;
+ else if (typemask & SCHED_EXPIRE && TAILQ_FIRST(&ramqueue.q_expired)) {
+ q = &ramqueue.q_expired;
ret->type = SCHED_EXPIRE;
}
- else if (typemask & SCHED_BOUNCE && ramqueue.sched_bounce) {
- batch = &ramqueue.sched_bounce;
+ else if (typemask & SCHED_BOUNCE && TAILQ_FIRST(&ramqueue.q_bounce)) {
+ q = &ramqueue.q_bounce;
ret->type = SCHED_BOUNCE;
}
- else if (typemask & SCHED_MDA && ramqueue.sched_mda) {
- batch = &ramqueue.sched_mda;
+ else if (typemask & SCHED_MDA && TAILQ_FIRST(&ramqueue.q_mda)) {
+ q = &ramqueue.q_mda;
ret->type = SCHED_MDA;
}
- else if (typemask & SCHED_MTA && ramqueue.sched_mta) {
- batch = &ramqueue.sched_mta->sched_mta;
- ramqueue.sched_mta = ramqueue.sched_mta->sched_next;
+ else if (typemask & SCHED_MTA && ramqueue.q_mtabatch) {
+ msg = ramqueue.q_mtabatch;
+ ramqueue.q_mtabatch = msg->q_next;
+ msg->q_next = NULL;
+ q = &msg->q_mta;
ret->type = SCHED_MTA;
}
- else if ((evp = TAILQ_FIRST(&ramqueue.pending))) {
+ else if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
ret->type = SCHED_DELAY;
if (evp->sched < evp->expire)
ret->delay = evp->sched - currtime;
@@ -321,8 +335,10 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
ret->evpids = NULL;
ret->evpcount = 0;
- for(evp = *batch; evp; evp = tmp) {
- tmp = evp->sched_next;
+
+ while ((evp = TAILQ_FIRST(q))) {
+
+ TAILQ_REMOVE(q, evp, entry);
/* consistency check */
if (!(evp->flags & RQ_ENVELOPE_SCHEDULED))
@@ -332,18 +348,17 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
item->id = evp->evpid;
item->next = ret->evpids;
ret->evpids = item;
- evp->sched_next = NULL;
+
if (ret->type == SCHED_REMOVE || ret->type == SCHED_EXPIRE)
rq_envelope_delete(&ramqueue, evp);
else {
+ TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
evp->flags &= ~RQ_ENVELOPE_SCHEDULED;
evp->flags |= RQ_ENVELOPE_INFLIGHT;
evp->t_inflight = currtime;
}
ret->evpcount++;
}
-
- *batch = NULL;
}
static void
@@ -352,26 +367,18 @@ scheduler_ramqueue_schedule(uint64_t evpid)
struct rq_message *msg;
struct rq_envelope *evp;
uint32_t msgid;
- void *i, *j;
+ void *i;
currtime = time(NULL);
- if (evpid == 0) {
- j = NULL;
- while (tree_iter(&ramqueue.messages, &j, NULL, (void*)(&msg))) {
- i = NULL;
- while (tree_iter(&msg->envelopes, &i, NULL,
- (void*)(&evp)))
- rq_envelope_schedule(&ramqueue, evp);
- }
- }
- else if (evpid > 0xffffffff) {
+ if (evpid > 0xffffffff) {
msgid = evpid_to_msgid(evpid);
if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
return;
if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
return;
- rq_envelope_schedule(&ramqueue, evp);
+ if (evp->flags & RQ_ENVELOPE_PENDING)
+ rq_envelope_schedule(&ramqueue, evp);
}
else {
msgid = evpid;
@@ -379,7 +386,8 @@ scheduler_ramqueue_schedule(uint64_t evpid)
return;
i = NULL;
while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
- rq_envelope_schedule(&ramqueue, evp);
+ if (evp->flags & RQ_ENVELOPE_PENDING)
+ rq_envelope_schedule(&ramqueue, evp);
}
}
@@ -411,6 +419,62 @@ scheduler_ramqueue_remove(uint64_t evpid)
}
}
+static size_t
+scheduler_ramqueue_messages(uint32_t from, uint32_t *dst, size_t size)
+{
+ uint64_t id;
+ size_t n;
+ void *i;
+
+ for (n = 0, i = NULL; n < size; n++) {
+ if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
+ break;
+ dst[n] = id;
+ }
+
+ return (n);
+}
+
+static size_t
+scheduler_ramqueue_envelopes(uint64_t from, struct evpstate *dst, size_t size)
+{
+ struct rq_message *msg;
+ struct rq_envelope *evp;
+ void *i;
+ size_t n;
+
+ if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
+ return (0);
+
+ for (n = 0, i = NULL; n < size; ) {
+
+ if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
+ (void**)&evp) == 0)
+ break;
+
+ if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
+ continue;
+
+ dst[n].retry = 0;
+ dst[n].evpid = evp->evpid;
+ if (evp->flags & RQ_ENVELOPE_PENDING) {
+ dst[n].time = evp->sched;
+ dst[n].flags = DF_PENDING;
+ }
+ else if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
+ dst[n].time = evp->t_scheduled;
+ dst[n].flags = DF_PENDING;
+ }
+ else if (evp->flags & RQ_ENVELOPE_INFLIGHT) {
+ dst[n].time = evp->t_inflight;
+ dst[n].flags = DF_INFLIGHT;
+ }
+ n++;
+ }
+
+ return (n);
+}
+
static void
sorted_insert(struct evplist *list, struct rq_envelope *evp)
{
@@ -444,7 +508,12 @@ rq_queue_init(struct rq_queue *rq)
{
bzero(rq, sizeof *rq);
tree_init(&rq->messages);
- TAILQ_INIT(&rq->pending);
+ TAILQ_INIT(&rq->q_pending);
+ TAILQ_INIT(&rq->q_inflight);
+ TAILQ_INIT(&rq->q_mda);
+ TAILQ_INIT(&rq->q_bounce);
+ TAILQ_INIT(&rq->q_expired);
+ TAILQ_INIT(&rq->q_removed);
}
static void
@@ -463,7 +532,7 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
}
/* need to re-link all envelopes before merging them */
i = NULL;
- while((tree_iter(&message->envelopes, &i, &id,
+ while ((tree_iter(&message->envelopes, &i, &id,
(void*)&envelope)))
envelope->message = tomessage;
tree_merge(&tomessage->envelopes, &message->envelopes);
@@ -471,7 +540,8 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
stat_decrement("scheduler.ramqueue.message", 1);
}
- sorted_merge(&rq->pending, &update->pending);
+ sorted_merge(&rq->q_pending, &update->q_pending);
+ rq->evpcount += update->evpcount;
}
static void
@@ -479,23 +549,21 @@ rq_queue_schedule(struct rq_queue *rq)
{
struct rq_envelope *evp;
- while ((evp = TAILQ_FIRST(&rq->pending))) {
+ while ((evp = TAILQ_FIRST(&rq->q_pending))) {
if (evp->sched > currtime && evp->expire > currtime)
break;
- /* it *must* be pending */
if (evp->flags != RQ_ENVELOPE_PENDING)
errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid,
evp->flags);
if (evp->expire <= currtime) {
- TAILQ_REMOVE(&rq->pending, evp, entry);
+ TAILQ_REMOVE(&rq->q_pending, evp, entry);
+ TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
evp->flags &= ~RQ_ENVELOPE_PENDING;
evp->flags |= RQ_ENVELOPE_EXPIRED;
evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
- evp->sched_next = rq->sched_expired;
- rq->sched_expired = evp;
continue;
}
rq_envelope_schedule(rq, evp);
@@ -505,28 +573,22 @@ rq_queue_schedule(struct rq_queue *rq)
static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
- if (evp->flags & (RQ_ENVELOPE_SCHEDULED | RQ_ENVELOPE_INFLIGHT))
- return;
-
- if (evp->flags & RQ_ENVELOPE_PENDING)
- TAILQ_REMOVE(&rq->pending, evp, entry);
+ struct evplist *q = NULL;
if (evp->type == D_MTA) {
- if (evp->message->sched_mta == NULL) {
- evp->message->sched_next = rq->sched_mta;
- rq->sched_mta = evp->message;
+ if (TAILQ_EMPTY(&evp->message->q_mta)) {
+ evp->message->q_next = rq->q_mtabatch;
+ rq->q_mtabatch = evp->message;
}
- evp->sched_next = evp->message->sched_mta;
- evp->message->sched_mta = evp;
- }
- else if (evp->type == D_MDA) {
- evp->sched_next = rq->sched_mda;
- rq->sched_mda = evp;
- }
- else if (evp->type == D_BOUNCE) {
- evp->sched_next = rq->sched_bounce;
- rq->sched_bounce = evp;
+ q = &evp->message->q_mta;
}
+ else if (evp->type == D_MDA)
+ q = &rq->q_mda;
+ else if (evp->type == D_BOUNCE)
+ q = &rq->q_bounce;
+
+ TAILQ_REMOVE(&rq->q_pending, evp, entry);
+ TAILQ_INSERT_TAIL(q, evp, entry);
evp->flags &= ~RQ_ENVELOPE_PENDING;
evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
@@ -535,16 +597,51 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
static void
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
- if (!(evp->flags & (RQ_ENVELOPE_PENDING)))
+ struct rq_message *m;
+ struct evplist *q = NULL;
+
+ if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
+ return;
+ /*
+ * For now we just ignore it, but we could mark the envelope for
+ * removal and possibly send a cancellation to the agent.
+ */
+ if (evp->flags & (RQ_ENVELOPE_INFLIGHT))
return;
- TAILQ_REMOVE(&rq->pending, evp, entry);
- evp->sched_next = rq->sched_removed;
- rq->sched_removed = evp;
+ if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
+ if (evp->type == D_MTA)
+ q = &evp->message->q_mta;
+ else if (evp->type == D_MDA)
+ q = &rq->q_mda;
+ else if (evp->type == D_BOUNCE)
+ q = &rq->q_bounce;
+ } else
+ q = &rq->q_pending;
+
+ TAILQ_REMOVE(q, evp, entry);
+ TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
evp->flags &= ~RQ_ENVELOPE_PENDING;
evp->flags |= RQ_ENVELOPE_REMOVED;
evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
+
+ /*
+ * We might need to unschedule the message if it was the only
+ * scheduled envelope
+ */
+ if (q == &evp->message->q_mta && TAILQ_EMPTY(q)) {
+ if (rq->q_mtabatch == evp->message)
+ rq->q_mtabatch = evp->message->q_next;
+ else {
+ for (m = rq->q_mtabatch; m->q_next; m = m->q_next)
+ if (m->q_next == evp->message) {
+ m->q_next = evp->message->q_next;
+ break;
+ }
+ }
+ evp->message->q_next = NULL;
+ }
}
static void
@@ -558,6 +655,7 @@ rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
}
free(evp);
+ rq->evpcount--;
stat_decrement("scheduler.ramqueue.envelope", 1);
}
@@ -576,7 +674,8 @@ rq_envelope_to_text(struct rq_envelope *e)
else if (e->type == D_MTA)
strlcat(buf, "mta", sizeof buf);
- snprintf(t, sizeof t, ",expire=%s", duration_to_text(e->expire - currtime));
+ snprintf(t, sizeof t, ",expire=%s",
+ duration_to_text(e->expire - currtime));
strlcat(buf, t, sizeof buf);
if (e->flags & RQ_ENVELOPE_PENDING) {
@@ -615,10 +714,10 @@ rq_queue_dump(struct rq_queue *rq, const char * name)
log_debug("debug: /--- ramqueue: %s", name);
i = NULL;
- while((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
+ while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
log_debug("debug: | msg:%08" PRIx32, message->msgid);
j = NULL;
- while((tree_iter(&message->envelopes, &j, &id,
+ while ((tree_iter(&message->envelopes, &j, &id,
(void*)&envelope)))
log_debug("debug: | %s",
rq_envelope_to_text(envelope));
diff --git a/usr.sbin/smtpd/smtpctl.8 b/usr.sbin/smtpd/smtpctl.8
index 96b9637fd05..b8db5ed7586 100644
--- a/usr.sbin/smtpd/smtpctl.8
+++ b/usr.sbin/smtpd/smtpctl.8
@@ -1,4 +1,4 @@
-.\" $OpenBSD: smtpctl.8,v 1.34 2012/10/17 08:38:48 eric Exp $
+.\" $OpenBSD: smtpctl.8,v 1.35 2012/11/20 09:47:46 eric Exp $
.\"
.\" Copyright (c) 2006 Pierre-Yves Ritschard <pyr@openbsd.org>
.\"
@@ -14,7 +14,7 @@
.\" ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
.\" OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
.\"
-.Dd $Mdocdate: October 17 2012 $
+.Dd $Mdocdate: November 20 2012 $
.Dt SMTPCTL 8
.Os
.Sh NAME
@@ -40,6 +40,43 @@ The following commands are available:
Disable verbose debug logging.
.It Cm log verbose
Enable verbose debug logging.
+.It Cm monitor
+Display updates of some
+.Xr smtpd 8
+internal counters in one second intervals.
+Each line reports the increment of all counters since the last update,
+except for some counters which are always absolute values.
+The first line reports the current value of each counter.
+The fields are:
+.Pp
+.Bl -compact -bullet
+.It
+Current number of active SMTP clients (absolute value).
+.It
+New SMTP clients.
+.It
+Disconnected clients.
+.It
+Current number of envelopes in the queue (absolute value).
+.It
+Newly enqueued envelopes.
+.It
+Dequeued envelopes.
+.It
+Successful deliveries.
+.It
+Temporary failures.
+.It
+Permanent failures.
+.It
+Message loops.
+.It
+Expired envelopes.
+.It
+Envelopes removed by the administrator.
+.It
+Generated bounces.
+.El
.It Cm pause mda
Temporarily stop deliveries to local users.
.It Cm pause mta
@@ -48,7 +85,7 @@ remote users.
.It Cm pause smtp
Temporarily stop accepting incoming sessions.
.It Cm remove Ar envelope-id | message-id
-Removes a single envelope, or all envelopes with the same message ID.
+Remove a single envelope, or all envelopes with the same message ID.
.It Cm resume mda
Resume deliveries to local users.
.It Cm resume mta
@@ -56,26 +93,64 @@ Resume relaying and deliveries to remote users.
.It Cm resume smtp
Resume accepting incoming sessions.
.It Cm schedule-all
-Marks all envelopes as ready for immediate delivery.
+Mark all envelopes as ready for immediate delivery.
.It Cm schedule-id Ar envelope-id | message-id
-Marks a single envelope, or all envelopes with the same message ID,
+Mark a single envelope, or all envelopes with the same message ID,
as ready for immediate delivery.
.It Cm show envelope Ar envelope-id
-Displays envelope's content for the given ID.
+Display envelope content for the given ID.
.It Cm show message Ar envelope-id
-Displays message content for the given ID.
+Display message content for the given ID.
.It Cm show queue
-Displays information concerning envelopes
-that are currently in a queue.
+Display information concerning envelopes that are currently in the queue.
+Each line of output describes a single envelope.
+It consists of the following fields, separated by a "|":
+.Pp
+.Bl -compact -bullet
+.It
+Envelope id.
+.It
+Address family of the client which enqueued the mail.
+.It
+Type of delivery: one of "mta", "mda" or "bounce".
+.It
+Various flags on the envelope.
+.It
+Sender address (return path).
+.It
+The original recipient address.
+.It
+The destination address.
+.It
+Time of creation.
+.It
+Time of expiration.
+.It
+Time of last delivery or relaying attempt.
+.It
+Number of delivery or relaying attempts.
+.It
+Current runstate: either "pending" or "inflight" if
+.Xr smtpd 8
+is running, or "offline" otherwise.
+.It
+Delay in seconds before the next attempt if pending, or time ellapsed
+if currently running.
+This field is blank if
+.Xr smtpd 8
+is not running.
+.It
+Error string for the last failed delivery or relay attempt.
+.El
.It Cm show stats
Displays runtime statistics concerning
.Xr smtpd 8 .
-.It Cm update map Ar name
-For map backends that provide caching, causes
-.Xr smtpd 8
-to update the cache.
.It Cm stop
Stop the server.
+.It Cm update table Ar name
+For table backends that provide caching, causes
+.Xr smtpd 8
+to update the cache.
.El
.Pp
When
diff --git a/usr.sbin/smtpd/smtpctl.c b/usr.sbin/smtpd/smtpctl.c
index dca5f2941dc..3ecf6c54ec1 100644
--- a/usr.sbin/smtpd/smtpctl.c
+++ b/usr.sbin/smtpd/smtpctl.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpctl.c,v 1.96 2012/11/13 09:35:18 eric Exp $ */
+/* $OpenBSD: smtpctl.c,v 1.97 2012/11/20 09:47:46 eric Exp $ */
/*
* Copyright (c) 2006 Pierre-Yves Ritschard <pyr@openbsd.org>
@@ -61,6 +61,10 @@ static void show_monitor(struct stat_digest *);
static int try_connect(void);
static void flush(void);
static void next_message(struct imsg *);
+static int action_schedule_all(void);
+
+static int action_show_queue(void);
+static int action_show_queue_message(uint32_t);
int proctype;
struct imsgbuf *ibuf;
@@ -70,16 +74,19 @@ extern char *__progname;
struct smtpd *env = NULL;
+time_t now;
+
__dead void
usage(void)
{
extern char *__progname;
if (sendmail)
- fprintf(stderr, "usage: %s [-tv] [-f from] [-F name] to ..\n",
+ fprintf(stderr, "usage: %s [-tv] [-f from] [-F name] to ...\n",
__progname);
else
- fprintf(stderr, "usage: %s command [argument ...]\n", __progname);
+ fprintf(stderr, "usage: %s command [argument ...]\n",
+ __progname);
exit(1);
}
@@ -139,7 +146,7 @@ next_message(struct imsg *imsg)
{
ssize_t n;
- while(1) {
+ while (1) {
if ((n = imsg_get(ibuf, imsg)) == -1)
errx(1, "imsg_get error");
if (n)
@@ -175,7 +182,6 @@ main(int argc, char *argv[])
if (geteuid())
errx(1, "need root privileges");
- setup_env(&smtpd);
if (strcmp(__progname, "mailq") == 0)
action = SHOW_QUEUE;
@@ -186,24 +192,26 @@ main(int argc, char *argv[])
} else
errx(1, "unsupported mode");
- /* test for not connected actions */
- switch (action) {
- case SHOW_QUEUE:
- show_queue(0);
- return (0);
- case SHOW_ENVELOPE:
- show_envelope(res->data);
- return (0);
- case SHOW_MESSAGE:
- show_message(res->data);
+ if (action == SHOW_ENVELOPE ||
+ action == SHOW_MESSAGE ||
+ !try_connect()) {
+ setup_env(&smtpd);
+ switch (action) {
+ case SHOW_QUEUE:
+ show_queue(0);
+ break;
+ case SHOW_ENVELOPE:
+ show_envelope(res->data);
+ break;
+ case SHOW_MESSAGE:
+ show_message(res->data);
+ break;
+ default:
+ errx(1, "smtpd doesn't seem to be running");
+ }
return (0);
- default:
- break;
}
- if (!try_connect())
- errx(1, "smtpd doesn't seem to be running");
-
/* process user request */
switch (action) {
case NONE:
@@ -220,11 +228,10 @@ main(int argc, char *argv[])
imsg_compose(ibuf, IMSG_SCHEDULER_REMOVE, 0, 0, -1, &ulval,
sizeof(ulval));
break;
+ case SHOW_QUEUE:
+ return action_show_queue();
case SCHEDULE_ALL:
- ulval = 0;
- imsg_compose(ibuf, IMSG_SCHEDULER_SCHEDULE, 0, 0, -1, &ulval,
- sizeof(ulval));
- break;
+ return action_schedule_all();
case SHUTDOWN:
imsg_compose(ibuf, IMSG_CTL_SHUTDOWN, 0, 0, -1, NULL, 0);
break;
@@ -280,15 +287,13 @@ main(int argc, char *argv[])
errx(1, "unknown request (%d)", action);
}
- while (!done) {
-
+ do {
flush();
next_message(&imsg);
- switch(action) {
+ switch (action) {
case REMOVE:
case SCHEDULE:
- case SCHEDULE_ALL:
case SHUTDOWN:
case PAUSE_MDA:
case PAUSE_MTA:
@@ -315,12 +320,129 @@ main(int argc, char *argv[])
}
imsg_free(&imsg);
- }
+ } while (!done);
free(ibuf);
return (0);
}
+
+static int
+action_show_queue_message(uint32_t msgid)
+{
+ struct imsg imsg;
+ struct envelope *evp;
+ uint64_t evpid;
+ size_t found;
+
+ evpid = msgid_to_evpid(msgid);
+
+ nextbatch:
+
+ found = 0;
+ imsg_compose(ibuf, IMSG_SCHEDULER_ENVELOPES, 0, 0, -1,
+ &evpid, sizeof evpid);
+ flush();
+
+ while (1) {
+ next_message(&imsg);
+ if (imsg.hdr.type != IMSG_SCHEDULER_ENVELOPES)
+ errx(1, "unexpected message %i", imsg.hdr.type);
+
+ if (imsg.hdr.len == sizeof imsg.hdr) {
+ imsg_free(&imsg);
+ if (!found || evpid_to_msgid(++evpid) != msgid)
+ return (0);
+ goto nextbatch;
+ }
+ found++;
+ evp = imsg.data;
+ evpid = evp->id;
+ show_queue_envelope(evp, 1);
+ imsg_free(&imsg);
+ }
+
+}
+
+static int
+action_show_queue(void)
+{
+ struct imsg imsg;
+ uint32_t *msgids, msgid;
+ size_t i, n;
+
+ msgid = 0;
+ now = time(NULL);
+
+ do {
+ imsg_compose(ibuf, IMSG_SCHEDULER_MESSAGES, 0, 0, -1,
+ &msgid, sizeof msgid);
+ flush();
+ next_message(&imsg);
+ if (imsg.hdr.type != IMSG_SCHEDULER_MESSAGES)
+ errx(1, "unexpected message type %i", imsg.hdr.type);
+ msgids = imsg.data;
+ n = (imsg.hdr.len - sizeof imsg.hdr) / sizeof (*msgids);
+ if (n == 0) {
+ imsg_free(&imsg);
+ break;
+ }
+ for (i = 0; i < n; i++) {
+ msgid = msgids[i];
+ action_show_queue_message(msgid);
+ }
+ imsg_free(&imsg);
+
+ } while (++msgid);
+
+ return (0);
+}
+
+static int
+action_schedule_all(void)
+{
+ struct imsg imsg;
+ uint32_t *msgids, from;
+ uint64_t evpid;
+ size_t i, n;
+
+ from = 0;
+ while (1) {
+ imsg_compose(ibuf, IMSG_SCHEDULER_MESSAGES, 0, 0, -1,
+ &from, sizeof from);
+ flush();
+ next_message(&imsg);
+ if (imsg.hdr.type != IMSG_SCHEDULER_MESSAGES)
+ errx(1, "unexpected message type %i", imsg.hdr.type);
+ msgids = imsg.data;
+ n = (imsg.hdr.len - sizeof imsg.hdr) / sizeof (*msgids);
+ if (n == 0)
+ break;
+
+ for (i = 0; i < n; i++) {
+ evpid = msgids[i];
+ imsg_compose(ibuf, IMSG_SCHEDULER_SCHEDULE, 0,
+ 0, -1, &evpid, sizeof(evpid));
+ }
+ from = msgids[n - 1] + 1;
+
+ imsg_free(&imsg);
+ flush();
+
+ for (i = 0; i < n; i++) {
+ next_message(&imsg);
+ if (imsg.hdr.type != IMSG_CTL_OK)
+ errx(1, "unexpected message type %i",
+ imsg.hdr.type);
+ }
+
+ if (from == 0)
+ break;
+ }
+
+ return (0);
+}
+
static int
show_command_output(struct imsg *imsg)
{
@@ -346,7 +468,7 @@ show_stats_output(void)
bzero(&kv, sizeof kv);
- while(1) {
+ while (1) {
imsg_compose(ibuf, IMSG_STATS_GET, 0, 0, -1, &kv, sizeof kv);
flush();
next_message(&imsg);
@@ -361,7 +483,7 @@ show_stats_output(void)
if (strcmp(kvp->key, "uptime") == 0) {
duration = time(NULL) - kvp->val.u.counter;
- printf("uptime=%zd\n", (size_t)duration);
+ printf("uptime=%zd\n", (size_t)duration);
printf("uptime.human=%s\n",
duration_to_text(duration));
}
@@ -418,11 +540,12 @@ show_queue(int flags)
qwalk_close(q);
}
+
static void
-show_queue_envelope(struct envelope *e, int flags)
+show_queue_envelope(struct envelope *e, int online)
{
- const char *src = "?";
- char status[128];
+ const char *src = "?", *agent = "?";
+ char status[128], runstate[128];
status[0] = '\0';
@@ -433,28 +556,33 @@ show_queue_envelope(struct envelope *e, int flags)
getflag(&e->flags, DF_INTERNAL, "internal",
status, sizeof(status));
+ if (online) {
+ if (e->flags & DF_PENDING)
+ snprintf(runstate, sizeof runstate, "pending|%zi",
+ (ssize_t)(e->nexttry - now));
+ else if (e->flags & DF_INFLIGHT)
+ snprintf(runstate, sizeof runstate, "inflight|%zi",
+ (ssize_t)(now - e->lasttry));
+ else
+ snprintf(runstate, sizeof runstate, "invalid|");
+ e->flags &= ~(DF_PENDING|DF_INFLIGHT);
+ }
+ else
+ strlcpy(runstate, "offline|", sizeof runstate);
+
if (e->flags)
errx(1, "%016" PRIx64 ": unexpected flags 0x%04x", e->id,
e->flags);
-
+
if (status[0])
status[strlen(status) - 1] = '\0';
- else
- strlcpy(status, "-", sizeof(status));
- switch (e->type) {
- case D_MDA:
- printf("mda");
- break;
- case D_MTA:
- printf("mta");
- break;
- case D_BOUNCE:
- printf("bounce");
- break;
- default:
- printf("unknown");
- }
+ if (e->type == D_MDA)
+ agent = "mda";
+ else if (e->type == D_MTA)
+ agent = "mta";
+ else if (e->type == D_BOUNCE)
+ agent = "bounce";
if (e->ss.ss_family == AF_LOCAL)
src = "local";
@@ -463,20 +591,25 @@ show_queue_envelope(struct envelope *e, int flags)
else if (e->ss.ss_family == AF_INET6)
src = "inet6";
- printf("|%016" PRIx64 "|%s|%s|%s@%s|%s@%s|%" PRId64 "|%" PRId64 "|%u",
+ printf("%016"PRIx64
+ "|%s|%s|%s|%s@%s|%s@%s|%s@%s"
+ "|%zu|%zu|%zu|%zu|%s|%s\n",
+
e->id,
+
src,
+ agent,
status,
e->sender.user, e->sender.domain,
+ e->rcpt.user, e->rcpt.domain,
e->dest.user, e->dest.domain,
- (int64_t) e->lasttry,
- (int64_t) e->expire,
- e->retry);
-
- if (e->errorline[0] != '\0')
- printf("|%s", e->errorline);
-
- printf("\n");
+
+ (size_t) e->creation,
+ (size_t) (e->creation + e->expire),
+ (size_t) e->lasttry,
+ (size_t) e->retry,
+ runstate,
+ e->errorline);
}
static void
diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c
index ffb57be4e46..4b73879089d 100644
--- a/usr.sbin/smtpd/smtpd.c
+++ b/usr.sbin/smtpd/smtpd.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.c,v 1.181 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: smtpd.c,v 1.182 2012/11/20 09:47:46 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -1339,6 +1339,8 @@ imsg_to_str(int type)
CASE(IMSG_QUEUE_REMOVE);
CASE(IMSG_QUEUE_EXPIRE);
+ CASE(IMSG_SCHEDULER_MESSAGES);
+ CASE(IMSG_SCHEDULER_ENVELOPES);
CASE(IMSG_SCHEDULER_REMOVE);
CASE(IMSG_SCHEDULER_SCHEDULE);
diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h
index 5c3b342bd9b..fbc1d230baf 100644
--- a/usr.sbin/smtpd/smtpd.h
+++ b/usr.sbin/smtpd/smtpd.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.h,v 1.395 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: smtpd.h,v 1.396 2012/11/20 09:47:46 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -164,6 +164,8 @@ enum imsg_type {
IMSG_QUEUE_REMOVE,
IMSG_QUEUE_EXPIRE,
+ IMSG_SCHEDULER_MESSAGES,
+ IMSG_SCHEDULER_ENVELOPES,
IMSG_SCHEDULER_REMOVE,
IMSG_SCHEDULER_SCHEDULE,
@@ -337,7 +339,11 @@ enum delivery_status {
enum delivery_flags {
DF_AUTHENTICATED = 0x1,
DF_BOUNCE = 0x4,
- DF_INTERNAL = 0x8 /* internal expansion forward */
+ DF_INTERNAL = 0x8, /* internal expansion forward */
+
+ /* runstate, not saved on disk */
+ DF_PENDING = 0x10,
+ DF_INFLIGHT = 0x20,
};
struct delivery_mda {
@@ -419,6 +425,7 @@ struct envelope {
time_t expire;
uint16_t retry;
enum delivery_flags flags;
+ time_t nexttry;
};
enum envelope_field {
@@ -813,6 +820,13 @@ struct delivery_backend {
void (*open)(struct deliver *);
};
+struct evpstate {
+ uint64_t evpid;
+ uint16_t flags;
+ uint16_t retry;
+ time_t time;
+};
+
struct scheduler_info {
uint64_t evpid;
enum delivery_type type;
@@ -854,6 +868,8 @@ struct scheduler_backend {
void (*batch)(int, struct scheduler_batch *);
+ size_t (*messages)(uint32_t, uint32_t *, size_t);
+ size_t (*envelopes)(uint64_t, struct evpstate *, size_t);
void (*schedule)(uint64_t);
void (*remove)(uint64_t);
};
@@ -1152,6 +1168,7 @@ void *tree_xpop(struct tree *, uint64_t);
int tree_poproot(struct tree *, uint64_t *, void **);
int tree_root(struct tree *, uint64_t *, void **);
int tree_iter(struct tree *, void **, uint64_t *, void **);
+int tree_iterfrom(struct tree *, void **, uint64_t, uint64_t *, void **);
void tree_merge(struct tree *, struct tree *);
diff --git a/usr.sbin/smtpd/tree.c b/usr.sbin/smtpd/tree.c
index ce147e33c02..bc5a7b22149 100644
--- a/usr.sbin/smtpd/tree.c
+++ b/usr.sbin/smtpd/tree.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: tree.c,v 1.1 2012/08/07 21:47:58 eric Exp $ */
+/* $OpenBSD: tree.c,v 1.2 2012/11/20 09:47:46 eric Exp $ */
/*
* Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
@@ -193,6 +193,38 @@ tree_iter(struct tree *t, void **hdl, uint64_t *id, void **data)
return (0);
}
+int
+tree_iterfrom(struct tree *t, void **hdl, uint64_t k, uint64_t *id, void **data)
+{
+ struct treeentry *curr = *hdl, key;
+
+ if (curr == NULL) {
+ if (k == 0)
+ curr = SPLAY_MIN(tree, t);
+ else {
+ key.id = k;
+ curr = SPLAY_FIND(tree, t, &key);
+ if (curr == NULL) {
+ SPLAY_INSERT(tree, t, &key);
+ curr = SPLAY_NEXT(tree, t, &key);
+ SPLAY_REMOVE(tree, t, &key);
+ }
+ }
+ } else
+ curr = SPLAY_NEXT(tree, t, curr);
+
+ if (curr) {
+ *hdl = curr;
+ if (id)
+ *id = curr->id;
+ if (data)
+ *data = curr->data;
+ return (1);
+ }
+
+ return (0);
+}
+
void
tree_merge(struct tree *dst, struct tree *src)
{