about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author    Eric Faurot <eric@faurot.net>  2013-11-29 12:50:25 +0100
committer Eric Faurot <eric@faurot.net>  2013-11-29 12:50:25 +0100
commit    bf5263d4a39d2611c81f878bf98de709f2e7f486 (patch)
tree      5e3b566d64f552305f7ad58e21152ad0c4cf8414
parentMerge branch 'master' into portable (diff)
parentOn relay failure, send a special release to inform the scheduler to (diff)
downloadOpenSMTPD-bf5263d4a39d2611c81f878bf98de709f2e7f486.tar.xz
OpenSMTPD-bf5263d4a39d2611c81f878bf98de709f2e7f486.zip
Merge branch 'master' into portable
-rw-r--r--  smtpd/mta.c                  2
-rw-r--r--  smtpd/queue.c               18
-rw-r--r--  smtpd/scheduler.c           25
-rw-r--r--  smtpd/scheduler_ramqueue.c  39
-rw-r--r--  smtpd/smtpd-api.h            7
5 files changed, 85 insertions, 6 deletions
diff --git a/smtpd/mta.c b/smtpd/mta.c
index 52ffc754..9f76c95e 100644
--- a/smtpd/mta.c
+++ b/smtpd/mta.c
@@ -1325,7 +1325,7 @@ mta_flush(struct mta_relay *relay, int fail, const char *error)
/* release all waiting envelopes for the relay */
m_create(p_queue, IMSG_DELIVERY_RELEASE, 0, 0, -1);
m_add_id(p_queue, relay->id);
- m_add_int(p_queue, 0);
+ m_add_int(p_queue, -1);
m_close(p_queue);
}
diff --git a/smtpd/queue.c b/smtpd/queue.c
index a4186e66..6ec4d167 100644
--- a/smtpd/queue.c
+++ b/smtpd/queue.c
@@ -244,6 +244,24 @@ queue_imsg(struct mproc *p, struct imsg *imsg)
log_warnx("warn: could not update envelope %016"PRIx64, evpid);
return;
+ case IMSG_DELIVERY_TEMPFAIL:
+ /* tempfail from the scheduler */
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0) {
+ log_warnx("queue: tempfail: failed to load envelope");
+ m_create(p_scheduler, IMSG_QUEUE_REMOVE, 0, 0, -1);
+ m_add_evpid(p_scheduler, evpid);
+ m_add_u32(p_scheduler, 0); /* not in-flight */
+ m_close(p_scheduler);
+ return;
+ }
+ evp.retry++;
+ if (!queue_envelope_update(&evp))
+ log_warnx("warn: could not update envelope %016"PRIx64, evpid);
+ return;
+
case IMSG_MDA_DELIVER:
m_msg(&m, imsg);
m_get_evpid(&m, &evpid);
diff --git a/smtpd/scheduler.c b/smtpd/scheduler.c
index 442b4314..6229f3bc 100644
--- a/smtpd/scheduler.c
+++ b/smtpd/scheduler.c
@@ -54,6 +54,7 @@ static void scheduler_reset_events(void);
static void scheduler_timeout(int, short, void *);
static void scheduler_process_remove(struct scheduler_batch *);
static void scheduler_process_expire(struct scheduler_batch *);
+static void scheduler_process_update(struct scheduler_batch *);
static void scheduler_process_bounce(struct scheduler_batch *);
static void scheduler_process_mda(struct scheduler_batch *);
static void scheduler_process_mta(struct scheduler_batch *);
@@ -471,7 +472,7 @@ scheduler_timeout(int fd, short event, void *p)
tv.tv_sec = 0;
tv.tv_usec = 0;
- typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_BOUNCE;
+ typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_UPDATE | SCHED_BOUNCE;
if (ninflight < env->sc_scheduler_max_inflight &&
!(env->sc_flags & SMTPD_MDA_PAUSED))
typemask |= SCHED_MDA;
@@ -508,6 +509,12 @@ scheduler_timeout(int fd, short event, void *p)
scheduler_process_expire(&batch);
break;
+ case SCHED_UPDATE:
+ log_trace(TRACE_SCHEDULER, "scheduler: SCHED_UPDATE %zu",
+ batch.evpcount);
+ scheduler_process_update(&batch);
+ break;
+
case SCHED_BOUNCE:
log_trace(TRACE_SCHEDULER, "scheduler: SCHED_BOUNCE %zu",
batch.evpcount);
@@ -568,6 +575,22 @@ scheduler_process_expire(struct scheduler_batch *batch)
}
static void
+scheduler_process_update(struct scheduler_batch *batch)
+{
+ size_t i;
+
+ for (i = 0; i < batch->evpcount; i++) {
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (mta)", batch->evpids[i]);
+ m_create(p_queue, IMSG_DELIVERY_TEMPFAIL, 0, 0, -1);
+ m_add_evpid(p_queue, batch->evpids[i]);
+ m_close(p_queue);
+ }
+
+ stat_increment("scheduler.envelope.update", batch->evpcount);
+}
+
+static void
scheduler_process_bounce(struct scheduler_batch *batch)
{
size_t i;
diff --git a/smtpd/scheduler_ramqueue.c b/smtpd/scheduler_ramqueue.c
index d051294f..b00ae7ff 100644
--- a/smtpd/scheduler_ramqueue.c
+++ b/smtpd/scheduler_ramqueue.c
@@ -61,8 +61,10 @@ struct rq_envelope {
#define RQ_ENVELOPE_EXPIRED 0x01
#define RQ_ENVELOPE_REMOVED 0x02
#define RQ_ENVELOPE_SUSPEND 0x04
+#define RQ_ENVELOPE_UPDATE 0x08
uint8_t flags;
+ time_t ctime;
time_t sched;
time_t expire;
@@ -86,6 +88,7 @@ struct rq_queue {
struct evplist q_mta;
struct evplist q_mda;
struct evplist q_bounce;
+ struct evplist q_update;
struct evplist q_expired;
struct evplist q_removed;
};
@@ -195,6 +198,7 @@ scheduler_ram_insert(struct scheduler_info *si)
envelope->evpid = si->evpid;
envelope->type = si->type;
envelope->message = message;
+ envelope->ctime = si->creation;
envelope->expire = si->creation + si->expire;
envelope->sched = scheduler_compute_schedule(si);
tree_xset(&message->envelopes, envelope->evpid, envelope);
@@ -379,7 +383,7 @@ scheduler_ram_release(int type, uint64_t holdq, int n)
{
struct rq_holdq *hq;
struct rq_envelope *evp;
- int i;
+ int i, update;
currtime = time(NULL);
@@ -387,6 +391,13 @@ scheduler_ram_release(int type, uint64_t holdq, int n)
if (hq == NULL)
return (0);
+ if (n == -1) {
+ n = 0;
+ update = 1;
+ }
+ else
+ update = 0;
+
for (i = 0; n == 0 || i < n; i++) {
evp = TAILQ_FIRST(&hq->q);
if (evp == NULL)
@@ -400,6 +411,8 @@ scheduler_ram_release(int type, uint64_t holdq, int n)
* we could just schedule them directly.
*/
evp->state = RQ_EVPSTATE_PENDING;
+ if (update)
+ evp->flags |= RQ_ENVELOPE_UPDATE;
sorted_insert(&ramqueue.q_pending, evp);
}
@@ -418,6 +431,7 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret)
struct evplist *q;
struct rq_envelope *evp;
size_t n;
+ int retry;
currtime = time(NULL);
@@ -433,6 +447,10 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret)
q = &ramqueue.q_expired;
ret->type = SCHED_EXPIRE;
}
+ else if (typemask & SCHED_UPDATE && TAILQ_FIRST(&ramqueue.q_update)) {
+ q = &ramqueue.q_update;
+ ret->type = SCHED_UPDATE;
+ }
else if (typemask & SCHED_BOUNCE && TAILQ_FIRST(&ramqueue.q_bounce)) {
q = &ramqueue.q_bounce;
ret->type = SCHED_BOUNCE;
@@ -472,6 +490,19 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret)
if (ret->type == SCHED_REMOVE || ret->type == SCHED_EXPIRE)
rq_envelope_delete(&ramqueue, evp);
+ else if (ret->type == SCHED_UPDATE) {
+
+ evp->flags &= ~RQ_ENVELOPE_UPDATE;
+
+ /* XXX we can't really use scheduler_compute_schedule */
+ retry = 0;
+ while ((evp->sched = evp->ctime + 800 * retry * retry / 2) <= currtime)
+ retry += 1;
+
+ evp->state = RQ_EVPSTATE_PENDING;
+ if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
+ sorted_insert(&ramqueue.q_pending, evp);
+ }
else {
TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
evp->state = RQ_EVPSTATE_INFLIGHT;
@@ -730,6 +761,7 @@ rq_queue_init(struct rq_queue *rq)
TAILQ_INIT(&rq->q_mta);
TAILQ_INIT(&rq->q_mda);
TAILQ_INIT(&rq->q_bounce);
+ TAILQ_INIT(&rq->q_update);
TAILQ_INIT(&rq->q_expired);
TAILQ_INIT(&rq->q_removed);
}
@@ -799,6 +831,8 @@ rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
return &rq->q_expired;
if (evp->flags & RQ_ENVELOPE_REMOVED)
return &rq->q_removed;
+ if (evp->flags & RQ_ENVELOPE_UPDATE)
+ return &rq->q_update;
if (evp->type == D_MTA)
return &rq->q_mta;
if (evp->type == D_MDA)
@@ -836,6 +870,9 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
break;
}
+ if (evp->flags & RQ_ENVELOPE_UPDATE)
+ q = &rq->q_update;
+
if (evp->state == RQ_EVPSTATE_HELD) {
hq = tree_xget(&holdqs[evp->type], evp->holdq);
TAILQ_REMOVE(&hq->q, evp, entry);
diff --git a/smtpd/smtpd-api.h b/smtpd/smtpd-api.h
index d52c6155..62f7e644 100644
--- a/smtpd/smtpd-api.h
+++ b/smtpd/smtpd-api.h
@@ -171,9 +171,10 @@ struct scheduler_info {
#define SCHED_DELAY 0x01
#define SCHED_REMOVE 0x02
#define SCHED_EXPIRE 0x04
-#define SCHED_BOUNCE 0x08
-#define SCHED_MDA 0x10
-#define SCHED_MTA 0x20
+#define SCHED_UPDATE 0x08
+#define SCHED_BOUNCE 0x10
+#define SCHED_MDA 0x20
+#define SCHED_MTA 0x40
struct scheduler_batch {
int type;