diff options
author | 2013-11-29 12:50:25 +0100 | |
---|---|---|
committer | 2013-11-29 12:50:25 +0100 | |
commit | bf5263d4a39d2611c81f878bf98de709f2e7f486 (patch) | |
tree | 5e3b566d64f552305f7ad58e21152ad0c4cf8414 | |
parent | Merge branch 'master' into portable (diff) | |
parent | On relay failure, send a special release to inform the scheduler to (diff) | |
download | OpenSMTPD-bf5263d4a39d2611c81f878bf98de709f2e7f486.tar.xz OpenSMTPD-bf5263d4a39d2611c81f878bf98de709f2e7f486.zip |
Merge branch 'master' into portable
-rw-r--r-- | smtpd/mta.c | 2 | ||||
-rw-r--r-- | smtpd/queue.c | 18 | ||||
-rw-r--r-- | smtpd/scheduler.c | 25 | ||||
-rw-r--r-- | smtpd/scheduler_ramqueue.c | 39 | ||||
-rw-r--r-- | smtpd/smtpd-api.h | 7 |
5 files changed, 85 insertions, 6 deletions
diff --git a/smtpd/mta.c b/smtpd/mta.c index 52ffc754..9f76c95e 100644 --- a/smtpd/mta.c +++ b/smtpd/mta.c @@ -1325,7 +1325,7 @@ mta_flush(struct mta_relay *relay, int fail, const char *error) /* release all waiting envelopes for the relay */ m_create(p_queue, IMSG_DELIVERY_RELEASE, 0, 0, -1); m_add_id(p_queue, relay->id); - m_add_int(p_queue, 0); + m_add_int(p_queue, -1); m_close(p_queue); } diff --git a/smtpd/queue.c b/smtpd/queue.c index a4186e66..6ec4d167 100644 --- a/smtpd/queue.c +++ b/smtpd/queue.c @@ -244,6 +244,24 @@ queue_imsg(struct mproc *p, struct imsg *imsg) log_warnx("warn: could not update envelope %016"PRIx64, evpid); return; + case IMSG_DELIVERY_TEMPFAIL: + /* tempfail from the scheduler */ + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_end(&m); + if (queue_envelope_load(evpid, &evp) == 0) { + log_warnx("queue: tempfail: failed to load envelope"); + m_create(p_scheduler, IMSG_QUEUE_REMOVE, 0, 0, -1); + m_add_evpid(p_scheduler, evpid); + m_add_u32(p_scheduler, 0); /* not in-flight */ + m_close(p_scheduler); + return; + } + evp.retry++; + if (!queue_envelope_update(&evp)) + log_warnx("warn: could not update envelope %016"PRIx64, evpid); + return; + case IMSG_MDA_DELIVER: m_msg(&m, imsg); m_get_evpid(&m, &evpid); diff --git a/smtpd/scheduler.c b/smtpd/scheduler.c index 442b4314..6229f3bc 100644 --- a/smtpd/scheduler.c +++ b/smtpd/scheduler.c @@ -54,6 +54,7 @@ static void scheduler_reset_events(void); static void scheduler_timeout(int, short, void *); static void scheduler_process_remove(struct scheduler_batch *); static void scheduler_process_expire(struct scheduler_batch *); +static void scheduler_process_update(struct scheduler_batch *); static void scheduler_process_bounce(struct scheduler_batch *); static void scheduler_process_mda(struct scheduler_batch *); static void scheduler_process_mta(struct scheduler_batch *); @@ -471,7 +472,7 @@ scheduler_timeout(int fd, short event, void *p) tv.tv_sec = 0; tv.tv_usec = 0; - typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_BOUNCE; + typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_UPDATE | SCHED_BOUNCE; if (ninflight < env->sc_scheduler_max_inflight && !(env->sc_flags & SMTPD_MDA_PAUSED)) typemask |= SCHED_MDA; @@ -508,6 +509,12 @@ scheduler_timeout(int fd, short event, void *p) scheduler_process_expire(&batch); break; + case SCHED_UPDATE: + log_trace(TRACE_SCHEDULER, "scheduler: SCHED_UPDATE %zu", + batch.evpcount); + scheduler_process_update(&batch); + break; + case SCHED_BOUNCE: log_trace(TRACE_SCHEDULER, "scheduler: SCHED_BOUNCE %zu", batch.evpcount); @@ -568,6 +575,22 @@ scheduler_process_expire(struct scheduler_batch *batch) } static void +scheduler_process_update(struct scheduler_batch *batch) +{ + size_t i; + + for (i = 0; i < batch->evpcount; i++) { + log_debug("debug: scheduler: evp:%016" PRIx64 + " scheduled (mta)", batch->evpids[i]); + m_create(p_queue, IMSG_DELIVERY_TEMPFAIL, 0, 0, -1); + m_add_evpid(p_queue, batch->evpids[i]); + m_close(p_queue); + } + + stat_increment("scheduler.envelope.update", batch->evpcount); +} + +static void scheduler_process_bounce(struct scheduler_batch *batch) { size_t i; diff --git a/smtpd/scheduler_ramqueue.c b/smtpd/scheduler_ramqueue.c index d051294f..b00ae7ff 100644 --- a/smtpd/scheduler_ramqueue.c +++ b/smtpd/scheduler_ramqueue.c @@ -61,8 +61,10 @@ struct rq_envelope { #define RQ_ENVELOPE_EXPIRED 0x01 #define RQ_ENVELOPE_REMOVED 0x02 #define RQ_ENVELOPE_SUSPEND 0x04 +#define RQ_ENVELOPE_UPDATE 0x08 uint8_t flags; + time_t ctime; time_t sched; time_t expire; @@ -86,6 +88,7 @@ struct rq_queue { struct evplist q_mta; struct evplist q_mda; struct evplist q_bounce; + struct evplist q_update; struct evplist q_expired; struct evplist q_removed; }; @@ -195,6 +198,7 @@ scheduler_ram_insert(struct scheduler_info *si) envelope->evpid = si->evpid; envelope->type = si->type; envelope->message = message; + envelope->ctime = si->creation; envelope->expire = si->creation + si->expire; envelope->sched = scheduler_compute_schedule(si); tree_xset(&message->envelopes, envelope->evpid, envelope); @@ -379,7 +383,7 @@ scheduler_ram_release(int type, uint64_t holdq, int n) { struct rq_holdq *hq; struct rq_envelope *evp; - int i; + int i, update; currtime = time(NULL); @@ -387,6 +391,13 @@ scheduler_ram_release(int type, uint64_t holdq, int n) if (hq == NULL) return (0); + if (n == -1) { + n = 0; + update = 1; + } + else + update = 0; + for (i = 0; n == 0 || i < n; i++) { evp = TAILQ_FIRST(&hq->q); if (evp == NULL) @@ -400,6 +411,8 @@ scheduler_ram_release(int type, uint64_t holdq, int n) * we could just schedule them directly. */ evp->state = RQ_EVPSTATE_PENDING; + if (update) + evp->flags |= RQ_ENVELOPE_UPDATE; sorted_insert(&ramqueue.q_pending, evp); } @@ -418,6 +431,7 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret) struct evplist *q; struct rq_envelope *evp; size_t n; + int retry; currtime = time(NULL); @@ -433,6 +447,10 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret) q = &ramqueue.q_expired; ret->type = SCHED_EXPIRE; } + else if (typemask & SCHED_UPDATE && TAILQ_FIRST(&ramqueue.q_update)) { + q = &ramqueue.q_update; + ret->type = SCHED_UPDATE; + } else if (typemask & SCHED_BOUNCE && TAILQ_FIRST(&ramqueue.q_bounce)) { q = &ramqueue.q_bounce; ret->type = SCHED_BOUNCE; @@ -472,6 +490,19 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret) if (ret->type == SCHED_REMOVE || ret->type == SCHED_EXPIRE) rq_envelope_delete(&ramqueue, evp); + else if (ret->type == SCHED_UPDATE) { + + evp->flags &= ~RQ_ENVELOPE_UPDATE; + + /* XXX we can't really use scheduler_compute_schedule */ + retry = 0; + while ((evp->sched = evp->ctime + 800 * retry * retry / 2) <= currtime) + retry += 1; + + evp->state = RQ_EVPSTATE_PENDING; + if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) + sorted_insert(&ramqueue.q_pending, evp); + } else { TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry); evp->state = RQ_EVPSTATE_INFLIGHT; @@ -730,6 +761,7 @@ rq_queue_init(struct rq_queue *rq) TAILQ_INIT(&rq->q_mta); TAILQ_INIT(&rq->q_mda); TAILQ_INIT(&rq->q_bounce); + TAILQ_INIT(&rq->q_update); TAILQ_INIT(&rq->q_expired); TAILQ_INIT(&rq->q_removed); } @@ -799,6 +831,8 @@ rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp) return &rq->q_expired; if (evp->flags & RQ_ENVELOPE_REMOVED) return &rq->q_removed; + if (evp->flags & RQ_ENVELOPE_UPDATE) + return &rq->q_update; if (evp->type == D_MTA) return &rq->q_mta; if (evp->type == D_MDA) @@ -836,6 +870,9 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp) break; } + if (evp->flags & RQ_ENVELOPE_UPDATE) + q = &rq->q_update; + if (evp->state == RQ_EVPSTATE_HELD) { hq = tree_xget(&holdqs[evp->type], evp->holdq); TAILQ_REMOVE(&hq->q, evp, entry); diff --git a/smtpd/smtpd-api.h b/smtpd/smtpd-api.h index d52c6155..62f7e644 100644 --- a/smtpd/smtpd-api.h +++ b/smtpd/smtpd-api.h @@ -171,9 +171,10 @@ struct scheduler_info { #define SCHED_DELAY 0x01 #define SCHED_REMOVE 0x02 #define SCHED_EXPIRE 0x04 -#define SCHED_BOUNCE 0x08 -#define SCHED_MDA 0x10 -#define SCHED_MTA 0x20 +#define SCHED_UPDATE 0x08 +#define SCHED_BOUNCE 0x10 +#define SCHED_MDA 0x20 +#define SCHED_MTA 0x40 struct scheduler_batch { int type; |