summaryrefslogtreecommitdiffstats
path: root/usr.sbin/smtpd/scheduler.c
diff options
context:
space:
mode:
Diffstat (limited to 'usr.sbin/smtpd/scheduler.c')
-rw-r--r--usr.sbin/smtpd/scheduler.c64
1 files changed, 45 insertions, 19 deletions
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c
index 44f0f15eec1..31cddeecaef 100644
--- a/usr.sbin/smtpd/scheduler.c
+++ b/usr.sbin/smtpd/scheduler.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler.c,v 1.23 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: scheduler.c,v 1.24 2012/11/20 09:47:45 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -60,14 +60,18 @@ static struct scheduler_backend *backend = NULL;
extern const char *backend_scheduler;
+#define MSGBATCHSIZE 1024
+#define EVPBATCHSIZE 256
+
void
scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
{
+ struct evpstate state[EVPBATCHSIZE];
struct envelope *e;
struct scheduler_info si;
uint64_t id;
- uint32_t msgid;
- size_t n;
+ uint32_t msgid, msgids[MSGBATCHSIZE];
+ size_t n, i;
switch (imsg->hdr.type) {
@@ -169,14 +173,33 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
log_verbose(*(int *)imsg->data);
return;
+ case IMSG_SCHEDULER_MESSAGES:
+ msgid = *(uint32_t *)(imsg->data);
+ n = backend->messages(msgid, msgids, MSGBATCHSIZE);
+ imsg_compose_event(iev, IMSG_SCHEDULER_MESSAGES,
+ imsg->hdr.peerid, 0, -1, msgids, n * sizeof (*msgids));
+ return;
+
+ case IMSG_SCHEDULER_ENVELOPES:
+ id = *(uint64_t *)(imsg->data);
+ n = backend->envelopes(id, state, EVPBATCHSIZE);
+ for (i = 0; i < n; i++) {
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid, 0, -1,
+ &state[i], sizeof state[i]);
+ }
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid, 0, -1, NULL, 0);
+ return;
+
case IMSG_SCHEDULER_SCHEDULE:
id = *(uint64_t *)(imsg->data);
- if (id == 0)
- log_debug("debug: scheduler: scheduling all envelopes");
- else if (id <= 0xffffffffL)
- log_debug("debug: scheduler: scheduling msg:%08" PRIx64, id);
+ if (id <= 0xffffffffL)
+ log_debug("debug: scheduler: "
+ "scheduling msg:%08" PRIx64, id);
else
- log_debug("debug: scheduler: scheduling evp:%016" PRIx64, id);
+ log_debug("debug: scheduler: "
+ "scheduling evp:%016" PRIx64, id);
backend->schedule(id);
scheduler_reset_events();
return;
@@ -184,15 +207,18 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
case IMSG_SCHEDULER_REMOVE:
id = *(uint64_t *)(imsg->data);
if (id <= 0xffffffffL)
- log_debug("debug: scheduler: removing msg:%08" PRIx64, id);
+ log_debug("debug: scheduler: "
+ "removing msg:%08" PRIx64, id);
else
- log_debug("debug: scheduler: removing evp:%016" PRIx64, id);
+ log_debug("debug: scheduler: "
+ "removing evp:%016" PRIx64, id);
backend->remove(id);
scheduler_reset_events();
return;
}
- errx(1, "scheduler_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
+ errx(1, "scheduler_imsg: unexpected %s imsg",
+ imsg_to_str(imsg->hdr.type));
}
static void
@@ -349,7 +375,7 @@ scheduler_timeout(int fd, short event, void *p)
fatalx("scheduler_timeout: unknown batch type");
}
- evtimer_add(&env->sc_ev, &tv);
+ evtimer_add(&env->sc_ev, &tv);
}
static void
@@ -395,8 +421,8 @@ scheduler_process_bounce(struct scheduler_batch *batch)
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("debug: scheduler: evp:%016" PRIx64 " scheduled (bounce)",
- e->id);
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (bounce)", e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_SMTP_ENQUEUE,
0, 0, -1, &e->id, sizeof e->id);
free(e);
@@ -412,8 +438,8 @@ scheduler_process_mda(struct scheduler_batch *batch)
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("debug: scheduler: evp:%016" PRIx64 " scheduled (mda)",
- e->id);
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (mda)", e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_MDA_SESS_NEW,
0, 0, -1, &e->id, sizeof e->id);
free(e);
@@ -425,15 +451,15 @@ scheduler_process_mda(struct scheduler_batch *batch)
static void
scheduler_process_mta(struct scheduler_batch *batch)
{
- struct id_list *e;
+ struct id_list *e;
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CREATE,
0, 0, -1, NULL, 0);
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("debug: scheduler: evp:%016" PRIx64 " scheduled (mta)",
- e->id);
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (mta)", e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_APPEND,
0, 0, -1, &e->id, sizeof e->id);
free(e);