diff options
Diffstat (limited to 'scheduler_ramqueue.c')
-rw-r--r-- | scheduler_ramqueue.c | 1202 |
1 files changed, 0 insertions, 1202 deletions
diff --git a/scheduler_ramqueue.c b/scheduler_ramqueue.c deleted file mode 100644 index 8d5efc10..00000000 --- a/scheduler_ramqueue.c +++ /dev/null @@ -1,1202 +0,0 @@ -/* $OpenBSD: scheduler_ramqueue.c,v 1.45 2018/05/31 21:06:12 gilles Exp $ */ - -/* - * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org> - * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - */ - -#include <sys/types.h> -#include <sys/queue.h> -#include <sys/tree.h> -#include <sys/socket.h> - -#include <ctype.h> -#include <err.h> -#include <event.h> -#include <fcntl.h> -#include <imsg.h> -#include <inttypes.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <limits.h> -#include <time.h> - -#include "smtpd.h" -#include "log.h" - -TAILQ_HEAD(evplist, rq_envelope); - -struct rq_message { - uint32_t msgid; - struct tree envelopes; -}; - -struct rq_envelope { - TAILQ_ENTRY(rq_envelope) entry; - SPLAY_ENTRY(rq_envelope) t_entry; - - uint64_t evpid; - uint64_t holdq; - enum delivery_type type; - -#define RQ_EVPSTATE_PENDING 0 -#define RQ_EVPSTATE_SCHEDULED 1 -#define RQ_EVPSTATE_INFLIGHT 2 -#define RQ_EVPSTATE_HELD 3 - uint8_t state; - -#define RQ_ENVELOPE_EXPIRED 0x01 -#define RQ_ENVELOPE_REMOVED 0x02 -#define RQ_ENVELOPE_SUSPEND 0x04 -#define RQ_ENVELOPE_UPDATE 0x08 -#define RQ_ENVELOPE_OVERFLOW 0x10 - uint8_t flags; - - time_t ctime; - time_t sched; - time_t expire; - - struct rq_message *message; - - time_t t_inflight; - time_t t_scheduled; -}; - -struct rq_holdq { - struct evplist q; - size_t count; -}; - -struct rq_queue { - size_t evpcount; - struct tree messages; - SPLAY_HEAD(prioqtree, rq_envelope) q_priotree; - - struct evplist q_pending; - struct evplist q_inflight; - - struct evplist q_mta; - struct evplist q_mda; - struct evplist q_bounce; - struct evplist q_update; - struct evplist q_expired; - struct evplist q_removed; -}; - -static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *); - -SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp); -static int scheduler_ram_init(const char *); -static int scheduler_ram_insert(struct scheduler_info *); -static size_t scheduler_ram_commit(uint32_t); -static size_t scheduler_ram_rollback(uint32_t); -static int scheduler_ram_update(struct scheduler_info *); -static int scheduler_ram_delete(uint64_t); -static int scheduler_ram_hold(uint64_t, uint64_t); -static int scheduler_ram_release(int, uint64_t, int); -static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *); -static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t); -static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t); -static int scheduler_ram_schedule(uint64_t); -static int scheduler_ram_remove(uint64_t); -static int scheduler_ram_suspend(uint64_t); -static int scheduler_ram_resume(uint64_t); -static int scheduler_ram_query(uint64_t); - -static void sorted_insert(struct rq_queue *, struct rq_envelope *); - -static void rq_queue_init(struct rq_queue *); -static void rq_queue_merge(struct rq_queue *, struct rq_queue *); -static void rq_queue_dump(struct rq_queue *, const char *); -static void rq_queue_schedule(struct rq_queue *rq); -static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *); -static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *); -static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *); -static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *); -static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *); -static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *); -static const char *rq_envelope_to_text(struct rq_envelope *); - -struct scheduler_backend scheduler_backend_ramqueue = { - scheduler_ram_init, - - scheduler_ram_insert, - scheduler_ram_commit, - scheduler_ram_rollback, - - scheduler_ram_update, - scheduler_ram_delete, - scheduler_ram_hold, - scheduler_ram_release, - - scheduler_ram_batch, - - scheduler_ram_messages, - scheduler_ram_envelopes, - scheduler_ram_schedule, - scheduler_ram_remove, - scheduler_ram_suspend, - scheduler_ram_resume, - scheduler_ram_query, -}; - -static struct rq_queue ramqueue; -static struct tree updates; -static struct tree holdqs[3]; /* delivery type */ - -static time_t currtime; - -#define BACKOFF_TRANSFER 400 -#define BACKOFF_DELIVERY 10 -#define BACKOFF_OVERFLOW 3 - -static time_t -scheduler_backoff(time_t t0, time_t base, uint32_t step) -{ - return (t0 + base * step * step); -} - -static time_t -scheduler_next(time_t t0, time_t base, uint32_t step) -{ - time_t t; - - /* XXX be more efficient */ - while ((t = scheduler_backoff(t0, base, step)) <= currtime) - step++; - - return (t); -} - -static int -scheduler_ram_init(const char *arg) -{ - rq_queue_init(&ramqueue); - tree_init(&updates); - tree_init(&holdqs[D_MDA]); - tree_init(&holdqs[D_MTA]); - tree_init(&holdqs[D_BOUNCE]); - - return (1); -} - -static int -scheduler_ram_insert(struct scheduler_info *si) -{ - struct rq_queue *update; - struct rq_message *message; - struct rq_envelope *envelope; - uint32_t msgid; - - currtime = time(NULL); - - msgid = evpid_to_msgid(si->evpid); - - /* find/prepare a ramqueue update */ - if ((update = tree_get(&updates, msgid)) == NULL) { - update = xcalloc(1, sizeof *update); - stat_increment("scheduler.ramqueue.update", 1); - rq_queue_init(update); - tree_xset(&updates, msgid, update); - } - - /* find/prepare the msgtree message in ramqueue update */ - if ((message = tree_get(&update->messages, msgid)) == NULL) { - message = xcalloc(1, sizeof *message); - message->msgid = msgid; - tree_init(&message->envelopes); - tree_xset(&update->messages, msgid, message); - stat_increment("scheduler.ramqueue.message", 1); - } - - /* create envelope in ramqueue message */ - envelope = xcalloc(1, sizeof *envelope); - envelope->evpid = si->evpid; - envelope->type = si->type; - envelope->message = message; - envelope->ctime = si->creation; - envelope->expire = si->creation + si->ttl; - envelope->sched = scheduler_backoff(si->creation, - (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry); - tree_xset(&message->envelopes, envelope->evpid, envelope); - - update->evpcount++; - stat_increment("scheduler.ramqueue.envelope", 1); - - envelope->state = RQ_EVPSTATE_PENDING; - TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry); - - si->nexttry = envelope->sched; - - return (1); -} - -static size_t -scheduler_ram_commit(uint32_t msgid) -{ - struct rq_queue *update; - size_t r; - - currtime = time(NULL); - - update = tree_xpop(&updates, msgid); - r = update->evpcount; - - if (tracing & TRACE_SCHEDULER) - rq_queue_dump(update, "update to commit"); - - rq_queue_merge(&ramqueue, update); - - if (tracing & TRACE_SCHEDULER) - rq_queue_dump(&ramqueue, "resulting queue"); - - rq_queue_schedule(&ramqueue); - - free(update); - stat_decrement("scheduler.ramqueue.update", 1); - - return (r); -} - -static size_t -scheduler_ram_rollback(uint32_t msgid) -{ - struct rq_queue *update; - struct rq_envelope *evp; - size_t r; - - currtime = time(NULL); - - if ((update = tree_pop(&updates, msgid)) == NULL) - return (0); - r = update->evpcount; - - while ((evp = TAILQ_FIRST(&update->q_pending))) { - TAILQ_REMOVE(&update->q_pending, evp, entry); - rq_envelope_delete(update, evp); - } - - free(update); - stat_decrement("scheduler.ramqueue.update", 1); - - return (r); -} - -static int -scheduler_ram_update(struct scheduler_info *si) -{ - struct rq_message *msg; - struct rq_envelope *evp; - uint32_t msgid; - - currtime = time(NULL); - - msgid = evpid_to_msgid(si->evpid); - msg = tree_xget(&ramqueue.messages, msgid); - evp = tree_xget(&msg->envelopes, si->evpid); - - /* it *must* be in-flight */ - if (evp->state != RQ_EVPSTATE_INFLIGHT) - errx(1, "evp:%016" PRIx64 " not in-flight", si->evpid); - - TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry); - - /* - * If the envelope was removed while inflight, schedule it for - * removal immediately. - */ - if (evp->flags & RQ_ENVELOPE_REMOVED) { - TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry); - evp->state = RQ_EVPSTATE_SCHEDULED; - evp->t_scheduled = currtime; - return (1); - } - - evp->sched = scheduler_next(evp->ctime, - (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry); - - evp->state = RQ_EVPSTATE_PENDING; - if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) - sorted_insert(&ramqueue, evp); - - si->nexttry = evp->sched; - - return (1); -} - -static int -scheduler_ram_delete(uint64_t evpid) -{ - struct rq_message *msg; - struct rq_envelope *evp; - uint32_t msgid; - - currtime = time(NULL); - - msgid = evpid_to_msgid(evpid); - msg = tree_xget(&ramqueue.messages, msgid); - evp = tree_xget(&msg->envelopes, evpid); - - /* it *must* be in-flight */ - if (evp->state != RQ_EVPSTATE_INFLIGHT) - errx(1, "evp:%016" PRIx64 " not in-flight", evpid); - - TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry); - - rq_envelope_delete(&ramqueue, evp); - - return (1); -} - -#define HOLDQ_MAXSIZE 1000 - -static int -scheduler_ram_hold(uint64_t evpid, uint64_t holdq) -{ - struct rq_holdq *hq; - struct rq_message *msg; - struct rq_envelope *evp; - uint32_t msgid; - - currtime = time(NULL); - - msgid = evpid_to_msgid(evpid); - msg = tree_xget(&ramqueue.messages, msgid); - evp = tree_xget(&msg->envelopes, evpid); - - /* it *must* be in-flight */ - if (evp->state != RQ_EVPSTATE_INFLIGHT) - errx(1, "evp:%016" PRIx64 " not in-flight", evpid); - - TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry); - - /* If the envelope is suspended, just mark it as pending */ - if (evp->flags & RQ_ENVELOPE_SUSPEND) { - evp->state = RQ_EVPSTATE_PENDING; - return (0); - } - - hq = tree_get(&holdqs[evp->type], holdq); - if (hq == NULL) { - hq = xcalloc(1, sizeof(*hq)); - TAILQ_INIT(&hq->q); - tree_xset(&holdqs[evp->type], holdq, hq); - stat_increment("scheduler.ramqueue.holdq", 1); - } - - /* If the holdq is full, just "tempfail" the envelope */ - if (hq->count >= HOLDQ_MAXSIZE) { - evp->state = RQ_EVPSTATE_PENDING; - evp->flags |= RQ_ENVELOPE_UPDATE; - evp->flags |= RQ_ENVELOPE_OVERFLOW; - sorted_insert(&ramqueue, evp); - stat_increment("scheduler.ramqueue.hold-overflow", 1); - return (0); - } - - evp->state = RQ_EVPSTATE_HELD; - evp->holdq = holdq; - /* This is an optimization: upon release, the envelopes will be - * inserted in the pending queue from the first element to the last. - * Since elements already in the queue were received first, they - * were scheduled first, so they will be reinserted before the - * current element. - */ - TAILQ_INSERT_HEAD(&hq->q, evp, entry); - hq->count += 1; - stat_increment("scheduler.ramqueue.hold", 1); - - return (1); -} - -static int -scheduler_ram_release(int type, uint64_t holdq, int n) -{ - struct rq_holdq *hq; - struct rq_envelope *evp; - int i, update; - - currtime = time(NULL); - - hq = tree_get(&holdqs[type], holdq); - if (hq == NULL) - return (0); - - if (n == -1) { - n = 0; - update = 1; - } - else - update = 0; - - for (i = 0; n == 0 || i < n; i++) { - evp = TAILQ_FIRST(&hq->q); - if (evp == NULL) - break; - - TAILQ_REMOVE(&hq->q, evp, entry); - hq->count -= 1; - evp->holdq = 0; - - /* When released, all envelopes are put in the pending queue - * and will be rescheduled immediately. As an optimization, - * we could just schedule them directly. - */ - evp->state = RQ_EVPSTATE_PENDING; - if (update) - evp->flags |= RQ_ENVELOPE_UPDATE; - sorted_insert(&ramqueue, evp); - } - - if (TAILQ_EMPTY(&hq->q)) { - tree_xpop(&holdqs[type], holdq); - free(hq); - stat_decrement("scheduler.ramqueue.holdq", 1); - } - stat_decrement("scheduler.ramqueue.hold", i); - - return (i); -} - -static int -scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids, int *types) -{ - struct rq_envelope *evp; - size_t i, n; - time_t t; - - currtime = time(NULL); - - rq_queue_schedule(&ramqueue); - if (tracing & TRACE_SCHEDULER) - rq_queue_dump(&ramqueue, "scheduler_ram_batch()"); - - i = 0; - n = 0; - - for (;;) { - - if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) { - TAILQ_REMOVE(&ramqueue.q_removed, evp, entry); - types[i] = SCHED_REMOVE; - evpids[i] = evp->evpid; - rq_envelope_delete(&ramqueue, evp); - - if (++i == *count) - break; - } - - if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) { - TAILQ_REMOVE(&ramqueue.q_expired, evp, entry); - types[i] = SCHED_EXPIRE; - evpids[i] = evp->evpid; - rq_envelope_delete(&ramqueue, evp); - - if (++i == *count) - break; - } - - if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) { - TAILQ_REMOVE(&ramqueue.q_update, evp, entry); - types[i] = SCHED_UPDATE; - evpids[i] = evp->evpid; - - if (evp->flags & RQ_ENVELOPE_OVERFLOW) - t = BACKOFF_OVERFLOW; - else if (evp->type == D_MTA) - t = BACKOFF_TRANSFER; - else - t = BACKOFF_DELIVERY; - - evp->sched = scheduler_next(evp->ctime, t, 0); - evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW); - evp->state = RQ_EVPSTATE_PENDING; - if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) - sorted_insert(&ramqueue, evp); - - if (++i == *count) - break; - } - - if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) { - TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry); - types[i] = SCHED_BOUNCE; - evpids[i] = evp->evpid; - - TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry); - evp->state = RQ_EVPSTATE_INFLIGHT; - evp->t_inflight = currtime; - - if (++i == *count) - break; - } - - if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) { - TAILQ_REMOVE(&ramqueue.q_mda, evp, entry); - types[i] = SCHED_MDA; - evpids[i] = evp->evpid; - - TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry); - evp->state = RQ_EVPSTATE_INFLIGHT; - evp->t_inflight = currtime; - - if (++i == *count) - break; - } - - if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) { - TAILQ_REMOVE(&ramqueue.q_mta, evp, entry); - types[i] = SCHED_MTA; - evpids[i] = evp->evpid; - - TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry); - evp->state = RQ_EVPSTATE_INFLIGHT; - evp->t_inflight = currtime; - - if (++i == *count) - break; - } - - /* nothing seen this round */ - if (i == n) - break; - - n = i; - } - - if (i) { - *count = i; - return (1); - } - - if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) { - if (evp->sched < evp->expire) - t = evp->sched; - else - t = evp->expire; - *delay = (t < currtime) ? 0 : (t - currtime); - } - else - *delay = -1; - - return (0); -} - -static size_t -scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size) -{ - uint64_t id; - size_t n; - void *i; - - for (n = 0, i = NULL; n < size; n++) { - if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0) - break; - dst[n] = id; - } - - return (n); -} - -static size_t -scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size) -{ - struct rq_message *msg; - struct rq_envelope *evp; - void *i; - size_t n; - - if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL) - return (0); - - for (n = 0, i = NULL; n < size; ) { - - if (tree_iterfrom(&msg->envelopes, &i, from, NULL, - (void**)&evp) == 0) - break; - - if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED)) - continue; - - dst[n].evpid = evp->evpid; - dst[n].flags = 0; - dst[n].retry = 0; - dst[n].time = 0; - - if (evp->state == RQ_EVPSTATE_PENDING) { - dst[n].time = evp->sched; - dst[n].flags = EF_PENDING; - } - else if (evp->state == RQ_EVPSTATE_SCHEDULED) { - dst[n].time = evp->t_scheduled; - dst[n].flags = EF_PENDING; - } - else if (evp->state == RQ_EVPSTATE_INFLIGHT) { - dst[n].time = evp->t_inflight; - dst[n].flags = EF_INFLIGHT; - } - else if (evp->state == RQ_EVPSTATE_HELD) { - /* same as scheduled */ - dst[n].time = evp->t_scheduled; - dst[n].flags = EF_PENDING; - dst[n].flags |= EF_HOLD; - } - if (evp->flags & RQ_ENVELOPE_SUSPEND) - dst[n].flags |= EF_SUSPEND; - - n++; - } - - return (n); -} - -static int -scheduler_ram_schedule(uint64_t evpid) -{ - struct rq_message *msg; - struct rq_envelope *evp; - uint32_t msgid; - void *i; - int r; - - currtime = time(NULL); - - if (evpid > 0xffffffff) { - msgid = evpid_to_msgid(evpid); - if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL) - return (0); - if ((evp = tree_get(&msg->envelopes, evpid)) == NULL) - return (0); - if (evp->state == RQ_EVPSTATE_INFLIGHT) - return (0); - rq_envelope_schedule(&ramqueue, evp); - return (1); - } - else { - msgid = evpid; - if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL) - return (0); - i = NULL; - r = 0; - while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) { - if (evp->state == RQ_EVPSTATE_INFLIGHT) - continue; - rq_envelope_schedule(&ramqueue, evp); - r++; - } - return (r); - } -} - -static int -scheduler_ram_remove(uint64_t evpid) -{ - struct rq_message *msg; - struct rq_envelope *evp; - uint32_t msgid; - void *i; - int r; - - currtime = time(NULL); - - if (evpid > 0xffffffff) { - msgid = evpid_to_msgid(evpid); - if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL) - return (0); - if ((evp = tree_get(&msg->envelopes, evpid)) == NULL) - return (0); - if (rq_envelope_remove(&ramqueue, evp)) - return (1); - return (0); - } - else { - msgid = evpid; - if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL) - return (0); - i = NULL; - r = 0; - while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) - if (rq_envelope_remove(&ramqueue, evp)) - r++; - return (r); - } -} - -static int -scheduler_ram_suspend(uint64_t evpid) -{ - struct rq_message *msg; - struct rq_envelope *evp; - uint32_t msgid; - void *i; - int r; - - currtime = time(NULL); - - if (evpid > 0xffffffff) { - msgid = evpid_to_msgid(evpid); - if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL) - return (0); - if ((evp = tree_get(&msg->envelopes, evpid)) == NULL) - return (0); - if (rq_envelope_suspend(&ramqueue, evp)) - return (1); - return (0); - } - else { - msgid = evpid; - if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL) - return (0); - i = NULL; - r = 0; - while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) - if (rq_envelope_suspend(&ramqueue, evp)) - r++; - return (r); - } -} - -static int -scheduler_ram_resume(uint64_t evpid) -{ - struct rq_message *msg; - struct rq_envelope *evp; - uint32_t msgid; - void *i; - int r; - - currtime = time(NULL); - - if (evpid > 0xffffffff) { - msgid = evpid_to_msgid(evpid); - if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL) - return (0); - if ((evp = tree_get(&msg->envelopes, evpid)) == NULL) - return (0); - if (rq_envelope_resume(&ramqueue, evp)) - return (1); - return (0); - } - else { - msgid = evpid; - if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL) - return (0); - i = NULL; - r = 0; - while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) - if (rq_envelope_resume(&ramqueue, evp)) - r++; - return (r); - } -} - -static int -scheduler_ram_query(uint64_t evpid) -{ - uint32_t msgid; - - if (evpid > 0xffffffff) - msgid = evpid_to_msgid(evpid); - else - msgid = evpid; - - if (tree_get(&ramqueue.messages, msgid) == NULL) - return (0); - - return (1); -} - -static void -sorted_insert(struct rq_queue *rq, struct rq_envelope *evp) -{ - struct rq_envelope *evp2; - - SPLAY_INSERT(prioqtree, &rq->q_priotree, evp); - evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp); - if (evp2) - TAILQ_INSERT_BEFORE(evp2, evp, entry); - else - TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry); -} - -static void -rq_queue_init(struct rq_queue *rq) -{ - memset(rq, 0, sizeof *rq); - tree_init(&rq->messages); - TAILQ_INIT(&rq->q_pending); - TAILQ_INIT(&rq->q_inflight); - TAILQ_INIT(&rq->q_mta); - TAILQ_INIT(&rq->q_mda); - TAILQ_INIT(&rq->q_bounce); - TAILQ_INIT(&rq->q_update); - TAILQ_INIT(&rq->q_expired); - TAILQ_INIT(&rq->q_removed); - SPLAY_INIT(&rq->q_priotree); -} - -static void -rq_queue_merge(struct rq_queue *rq, struct rq_queue *update) -{ - struct rq_message *message, *tomessage; - struct rq_envelope *envelope; - uint64_t id; - void *i; - - while (tree_poproot(&update->messages, &id, (void*)&message)) { - if ((tomessage = tree_get(&rq->messages, id)) == NULL) { - /* message does not exist. re-use structure */ - tree_xset(&rq->messages, id, message); - continue; - } - /* need to re-link all envelopes before merging them */ - i = NULL; - while ((tree_iter(&message->envelopes, &i, &id, - (void*)&envelope))) - envelope->message = tomessage; - tree_merge(&tomessage->envelopes, &message->envelopes); - free(message); - stat_decrement("scheduler.ramqueue.message", 1); - } - - /* Sorted insert in the pending queue */ - while ((envelope = TAILQ_FIRST(&update->q_pending))) { - TAILQ_REMOVE(&update->q_pending, envelope, entry); - sorted_insert(rq, envelope); - } - - rq->evpcount += update->evpcount; -} - -#define SCHEDULEMAX 1024 - -static void -rq_queue_schedule(struct rq_queue *rq) -{ - struct rq_envelope *evp; - size_t n; - - n = 0; - while ((evp = TAILQ_FIRST(&rq->q_pending))) { - if (evp->sched > currtime && evp->expire > currtime) - break; - - if (n == SCHEDULEMAX) - break; - - if (evp->state != RQ_EVPSTATE_PENDING) - errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid, - evp->flags); - - if (evp->expire <= currtime) { - TAILQ_REMOVE(&rq->q_pending, evp, entry); - SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp); - TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry); - evp->state = RQ_EVPSTATE_SCHEDULED; - evp->flags |= RQ_ENVELOPE_EXPIRED; - evp->t_scheduled = currtime; - continue; - } - rq_envelope_schedule(rq, evp); - n += 1; - } -} - -static struct evplist * -rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp) -{ - switch (evp->state) { - case RQ_EVPSTATE_PENDING: - return &rq->q_pending; - - case RQ_EVPSTATE_SCHEDULED: - if (evp->flags & RQ_ENVELOPE_EXPIRED) - return &rq->q_expired; - if (evp->flags & RQ_ENVELOPE_REMOVED) - return &rq->q_removed; - if (evp->flags & RQ_ENVELOPE_UPDATE) - return &rq->q_update; - if (evp->type == D_MTA) - return &rq->q_mta; - if (evp->type == D_MDA) - return &rq->q_mda; - if (evp->type == D_BOUNCE) - return &rq->q_bounce; - errx(1, "%016" PRIx64 " bad evp type %d", evp->evpid, evp->type); - - case RQ_EVPSTATE_INFLIGHT: - return &rq->q_inflight; - - case RQ_EVPSTATE_HELD: - return (NULL); - } - - errx(1, "%016" PRIx64 " bad state %d", evp->evpid, evp->state); - return (NULL); -} - -static void -rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp) -{ - struct rq_holdq *hq; - struct evplist *q = NULL; - - switch (evp->type) { - case D_MTA: - q = &rq->q_mta; - break; - case D_MDA: - q = &rq->q_mda; - break; - case D_BOUNCE: - q = &rq->q_bounce; - break; - } - - if (evp->flags & RQ_ENVELOPE_UPDATE) - q = &rq->q_update; - - if (evp->state == RQ_EVPSTATE_HELD) { - hq = tree_xget(&holdqs[evp->type], evp->holdq); - TAILQ_REMOVE(&hq->q, evp, entry); - hq->count -= 1; - if (TAILQ_EMPTY(&hq->q)) { - tree_xpop(&holdqs[evp->type], evp->holdq); - free(hq); - } - evp->holdq = 0; - stat_decrement("scheduler.ramqueue.hold", 1); - } - else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) { - TAILQ_REMOVE(&rq->q_pending, evp, entry); - SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp); - } - - TAILQ_INSERT_TAIL(q, evp, entry); - evp->state = RQ_EVPSTATE_SCHEDULED; - evp->t_scheduled = currtime; -} - -static int -rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp) -{ - struct rq_holdq *hq; - struct evplist *evl; - - if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED)) - return (0); - /* - * If envelope is inflight, mark it envelope for removal. - */ - if (evp->state == RQ_EVPSTATE_INFLIGHT) { - evp->flags |= RQ_ENVELOPE_REMOVED; - return (1); - } - - if (evp->state == RQ_EVPSTATE_HELD) { - hq = tree_xget(&holdqs[evp->type], evp->holdq); - TAILQ_REMOVE(&hq->q, evp, entry); - hq->count -= 1; - if (TAILQ_EMPTY(&hq->q)) { - tree_xpop(&holdqs[evp->type], evp->holdq); - free(hq); - } - evp->holdq = 0; - stat_decrement("scheduler.ramqueue.hold", 1); - } - else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) { - evl = rq_envelope_list(rq, evp); - TAILQ_REMOVE(evl, evp, entry); - if (evl == &rq->q_pending) - SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp); - } - - TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry); - evp->state = RQ_EVPSTATE_SCHEDULED; - evp->flags |= RQ_ENVELOPE_REMOVED; - evp->t_scheduled = currtime; - - return (1); -} - -static int -rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp) -{ - struct rq_holdq *hq; - struct evplist *evl; - - if (evp->flags & RQ_ENVELOPE_SUSPEND) - return (0); - - if (evp->state == RQ_EVPSTATE_HELD) { - hq = tree_xget(&holdqs[evp->type], evp->holdq); - TAILQ_REMOVE(&hq->q, evp, entry); - hq->count -= 1; - if (TAILQ_EMPTY(&hq->q)) { - tree_xpop(&holdqs[evp->type], evp->holdq); - free(hq); - } - evp->holdq = 0; - evp->state = RQ_EVPSTATE_PENDING; - stat_decrement("scheduler.ramqueue.hold", 1); - } - else if (evp->state != RQ_EVPSTATE_INFLIGHT) { - evl = rq_envelope_list(rq, evp); - TAILQ_REMOVE(evl, evp, entry); - if (evl == &rq->q_pending) - SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp); - } - - evp->flags |= RQ_ENVELOPE_SUSPEND; - - return (1); -} - -static int -rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp) -{ - struct evplist *evl; - - if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) - return (0); - - if (evp->state != RQ_EVPSTATE_INFLIGHT) { - evl = rq_envelope_list(rq, evp); - if (evl == &rq->q_pending) - sorted_insert(rq, evp); - else - TAILQ_INSERT_TAIL(evl, evp, entry); - } - - evp->flags &= ~RQ_ENVELOPE_SUSPEND; - - return (1); -} - -static void -rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp) -{ - tree_xpop(&evp->message->envelopes, evp->evpid); - if (tree_empty(&evp->message->envelopes)) { - tree_xpop(&rq->messages, evp->message->msgid); - free(evp->message); - stat_decrement("scheduler.ramqueue.message", 1); - } - - free(evp); - rq->evpcount--; - stat_decrement("scheduler.ramqueue.envelope", 1); -} - -static const char * -rq_envelope_to_text(struct rq_envelope *e) -{ - static char buf[256]; - char t[64]; - - (void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid); - - if (e->type == D_BOUNCE) - (void)strlcat(buf, "bounce", sizeof buf); - else if (e->type == D_MDA) - (void)strlcat(buf, "mda", sizeof buf); - else if (e->type == D_MTA) - (void)strlcat(buf, "mta", sizeof buf); - - (void)snprintf(t, sizeof t, ",expire=%s", - duration_to_text(e->expire - currtime)); - (void)strlcat(buf, t, sizeof buf); - - - switch (e->state) { - case RQ_EVPSTATE_PENDING: - (void)snprintf(t, sizeof t, ",pending=%s", - duration_to_text(e->sched - currtime)); - (void)strlcat(buf, t, sizeof buf); - break; - - case RQ_EVPSTATE_SCHEDULED: - (void)snprintf(t, sizeof t, ",scheduled=%s", - duration_to_text(currtime - e->t_scheduled)); - (void)strlcat(buf, t, sizeof buf); - break; - - case RQ_EVPSTATE_INFLIGHT: - (void)snprintf(t, sizeof t, ",inflight=%s", - duration_to_text(currtime - e->t_inflight)); - (void)strlcat(buf, t, sizeof buf); - break; - - case RQ_EVPSTATE_HELD: - (void)snprintf(t, sizeof t, ",held=%s", - duration_to_text(currtime - e->t_inflight)); - (void)strlcat(buf, t, sizeof buf); - break; - default: - errx(1, "%016" PRIx64 " bad state %d", e->evpid, e->state); - } - - if (e->flags & RQ_ENVELOPE_REMOVED) - (void)strlcat(buf, ",removed", sizeof buf); - if (e->flags & RQ_ENVELOPE_EXPIRED) - (void)strlcat(buf, ",expired", sizeof buf); - if (e->flags & RQ_ENVELOPE_SUSPEND) - (void)strlcat(buf, ",suspended", sizeof buf); - - (void)strlcat(buf, "]", sizeof buf); - - return (buf); -} - -static void -rq_queue_dump(struct rq_queue *rq, const char * name) -{ - struct rq_message *message; - struct rq_envelope *envelope; - void *i, *j; - uint64_t id; - - log_debug("debug: /--- ramqueue: %s", name); - - i = NULL; - while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) { - log_debug("debug: | msg:%08" PRIx32, message->msgid); - j = NULL; - while ((tree_iter(&message->envelopes, &j, &id, - (void*)&envelope))) - log_debug("debug: | %s", - rq_envelope_to_text(envelope)); - } - log_debug("debug: \\---"); -} - -static int -rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2) -{ - time_t ref1, ref2; - - ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire; - ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire; - if (ref1 != ref2) - return (ref1 < ref2) ? -1 : 1; - - if (e1->evpid != e2->evpid) - return (e1->evpid < e2->evpid) ? -1 : 1; - - return 0; -} - -SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp); |