aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGilles Chehade <gilles@poolp.org>2013-11-18 16:30:16 +0100
committerGilles Chehade <gilles@poolp.org>2013-11-18 16:30:16 +0100
commitc007d8d3a72a27515fed0d24720fd3ad7cb393c4 (patch)
tree5c6abe3409a28ab99401fd94301df0d20f857fef
parentMerge branch 'master' into portable (diff)
parentMerge branch 'master' of ssh://ssh.poolp.org/git/opensmtpd (diff)
downloadOpenSMTPD-c007d8d3a72a27515fed0d24720fd3ad7cb393c4.tar.xz
OpenSMTPD-c007d8d3a72a27515fed0d24720fd3ad7cb393c4.zip
Merge branch 'master' into portableopensmtpd-201311181634p1
-rw-r--r--smtpd/enqueue.c2
-rw-r--r--smtpd/parse.y38
-rw-r--r--smtpd/queue.c18
-rw-r--r--smtpd/scheduler.c8
-rw-r--r--smtpd/scheduler_api.c7
-rw-r--r--smtpd/scheduler_null.c4
-rw-r--r--smtpd/scheduler_proc.c4
-rw-r--r--smtpd/scheduler_ramqueue.c30
-rw-r--r--smtpd/smtpd-api.h6
-rw-r--r--smtpd/smtpd.conf.58
-rw-r--r--smtpd/smtpd.h4
11 files changed, 87 insertions, 42 deletions
diff --git a/smtpd/enqueue.c b/smtpd/enqueue.c
index e112abb0..ba4543d0 100644
--- a/smtpd/enqueue.c
+++ b/smtpd/enqueue.c
@@ -318,7 +318,7 @@ enqueue(int argc, char *argv[])
}
if (!msg.saw_user_agent)
send_line(fout, 0, "User-Agent: %s enqueuer (%s)\n",
- SMTPD_NAME, "Demoosh");
+ SMTPD_NAME, "Demoostik");
/* add separating newline */
if (noheader)
diff --git a/smtpd/parse.y b/smtpd/parse.y
index dbe42817..1a4e8498 100644
--- a/smtpd/parse.y
+++ b/smtpd/parse.y
@@ -147,10 +147,10 @@ typedef struct {
%}
-%token AS QUEUE COMPRESSION ENCRYPTION MAXMESSAGESIZE MAXMTADEFERRED MAXSCHEDULERINFLIGHT LISTEN ON ANY PORT EXPIRE
+%token AS QUEUE COMPRESSION ENCRYPTION MAXMESSAGESIZE MAXMTADEFERRED LISTEN ON ANY PORT EXPIRE
%token TABLE SECURE SMTPS CERTIFICATE DOMAIN BOUNCEWARN LIMIT INET4 INET6
%token RELAY BACKUP VIA DELIVER TO LMTP MAILDIR MBOX HOSTNAME HOSTNAMES
-%token ACCEPT REJECT INCLUDE ERROR MDA FROM FOR SOURCE MTA PKI
+%token ACCEPT REJECT INCLUDE ERROR MDA FROM FOR SOURCE MTA PKI SCHEDULER
%token ARROW AUTH TLS LOCAL VIRTUAL TAG TAGGED ALIAS FILTER FILTERCHAIN KEY CA DHPARAMS
%token AUTH_OPTIONAL TLS_REQUIRE USERBASE SENDER MASK_SOURCE VERIFY FORWARDONLY RECIPIENT
%token <v.string> STRING
@@ -262,7 +262,7 @@ bouncedelays : bouncedelays ',' bouncedelay
| /* EMPTY */
;
-opt_limit : INET4 {
+opt_limit_mta : INET4 {
limits->family = AF_INET;
}
| INET6 {
@@ -270,7 +270,7 @@ opt_limit : INET4 {
}
| STRING NUMBER {
if (!limit_mta_set(limits, $1, $2)) {
- yyerror("invalid limit keyword");
+ yyerror("invalid mta limit keyword: %s", $1);
free($1);
YYERROR;
}
@@ -278,7 +278,24 @@ opt_limit : INET4 {
}
;
-limits : opt_limit limits
+limits_mta : opt_limit_mta limits_mta
+ | /* empty */
+ ;
+
+opt_limit_scheduler : STRING NUMBER {
+ if (!strcmp($1, "max-inflight")) {
+ conf->sc_scheduler_max_inflight = $2;
+ }
+ else {
+ yyerror("invalid scheduler limit keyword: %s", $1);
+ free($1);
+ YYERROR;
+ }
+ free($1);
+ }
+ ;
+
+limits_scheduler: opt_limit_scheduler limits_scheduler
| /* empty */
;
@@ -540,9 +557,6 @@ main : BOUNCEWARN {
| MAXMTADEFERRED NUMBER {
conf->sc_mta_max_deferred = $2;
}
- | MAXSCHEDULERINFLIGHT NUMBER {
- conf->sc_scheduler_max_inflight = $2;
- }
| LIMIT MTA FOR DOMAIN STRING {
struct mta_limits *d;
@@ -554,10 +568,12 @@ main : BOUNCEWARN {
memmove(limits, d, sizeof(*limits));
}
free($5);
- } limits
+ } limits_mta
| LIMIT MTA {
limits = dict_get(conf->sc_limits_dict, "default");
- } limits
+ } limits_mta
+ | LIMIT SCHEDULER {
+ } limits_scheduler
| LISTEN {
bzero(&l, sizeof l);
bzero(&listen_opts, sizeof listen_opts);
@@ -1109,7 +1125,6 @@ lookup(char *s)
{ "mask-source", MASK_SOURCE },
{ "max-message-size", MAXMESSAGESIZE },
{ "max-mta-deferred", MAXMTADEFERRED },
- { "max-scheduler-inflight", MAXSCHEDULERINFLIGHT },
{ "mbox", MBOX },
{ "mda", MDA },
{ "mta", MTA },
@@ -1120,6 +1135,7 @@ lookup(char *s)
{ "recipient", RECIPIENT },
{ "reject", REJECT },
{ "relay", RELAY },
+ { "scheduler", SCHEDULER },
{ "secure", SECURE },
{ "sender", SENDER },
{ "smtps", SMTPS },
diff --git a/smtpd/queue.c b/smtpd/queue.c
index eb10052c..a4186e66 100644
--- a/smtpd/queue.c
+++ b/smtpd/queue.c
@@ -69,7 +69,7 @@ queue_imsg(struct mproc *p, struct imsg *imsg)
struct envelope evp;
struct msg m;
const char *reason;
- uint64_t reqid, evpid;
+ uint64_t reqid, evpid, holdq;
uint32_t msgid;
uint32_t penalty;
time_t nexttry;
@@ -419,10 +419,24 @@ queue_imsg(struct mproc *p, struct imsg *imsg)
return;
case IMSG_DELIVERY_HOLD:
- case IMSG_DELIVERY_RELEASE:
case IMSG_MTA_SCHEDULE:
m_forward(p_scheduler, imsg);
return;
+ case IMSG_DELIVERY_RELEASE:
+ m_msg(&m, imsg);
+ m_get_id(&m, &holdq);
+ m_get_int(&m, &v);
+ m_end(&m);
+
+ m_create(p_scheduler, IMSG_DELIVERY_RELEASE, 0, 0, -1);
+ if (p->proc == PROC_MTA)
+ m_add_int(p_scheduler, D_MTA);
+ else
+ m_add_int(p_scheduler, D_MDA);
+ m_add_id(p_scheduler, holdq);
+ m_add_int(p_scheduler, v);
+ m_close(p_scheduler);
+ return;
}
}
diff --git a/smtpd/scheduler.c b/smtpd/scheduler.c
index 6acf1bc4..442b4314 100644
--- a/smtpd/scheduler.c
+++ b/smtpd/scheduler.c
@@ -83,7 +83,7 @@ scheduler_imsg(struct mproc *p, struct imsg *imsg)
uint32_t penalty;
size_t n, i;
time_t timestamp;
- int v, r;
+ int v, r, type;
switch (imsg->hdr.type) {
@@ -231,12 +231,14 @@ scheduler_imsg(struct mproc *p, struct imsg *imsg)
case IMSG_DELIVERY_RELEASE:
m_msg(&m, imsg);
+ m_get_int(&m, &type);
m_get_id(&m, &holdq);
m_get_int(&m, &r);
m_end(&m);
log_trace(TRACE_SCHEDULER,
- "scheduler: releasing %d on holdq %016" PRIx64, r, holdq);
- backend->release(holdq, r);
+ "scheduler: releasing %d on holdq (%i, %016" PRIx64 ")",
+ r, type, holdq);
+ backend->release(type, holdq, r);
scheduler_reset_events();
return;
diff --git a/smtpd/scheduler_api.c b/smtpd/scheduler_api.c
index e77cffbd..76d92308 100644
--- a/smtpd/scheduler_api.c
+++ b/smtpd/scheduler_api.c
@@ -39,7 +39,7 @@ static size_t (*handler_rollback)(uint32_t);
static int (*handler_update)(struct scheduler_info *);
static int (*handler_delete)(uint64_t);
static int (*handler_hold)(uint64_t, uint64_t);
-static int (*handler_release)(uint64_t, int);
+static int (*handler_release)(int, uint64_t, int);
static int (*handler_batch)(int, struct scheduler_batch *);
static size_t (*handler_messages)(uint32_t, uint32_t *, size_t);
static size_t (*handler_envelopes)(uint64_t, struct evpstate *, size_t);
@@ -117,7 +117,7 @@ scheduler_msg_dispatch(void)
uint32_t msgids[MAX_BATCH_SIZE], version, msgid;
struct scheduler_info info;
struct scheduler_batch batch;
- int typemask, r;
+ int typemask, r, type;
switch (imsg.hdr.type) {
case PROC_SCHEDULER_INIT:
@@ -201,11 +201,12 @@ scheduler_msg_dispatch(void)
case PROC_SCHEDULER_RELEASE:
log_debug("scheduler-api: PROC_SCHEDULER_RELEASE");
+ scheduler_msg_get(&type, sizeof(type));
scheduler_msg_get(&u64, sizeof(u64));
scheduler_msg_get(&r, sizeof(r));
scheduler_msg_end();
- r = handler_release(u64, r);
+ r = handler_release(i, u64, r);
imsg_compose(&ibuf, PROC_SCHEDULER_OK, 0, 0, -1, &r, sizeof(r));
break;
diff --git a/smtpd/scheduler_null.c b/smtpd/scheduler_null.c
index a1b35630..dea51d22 100644
--- a/smtpd/scheduler_null.c
+++ b/smtpd/scheduler_null.c
@@ -39,7 +39,7 @@ static size_t scheduler_null_rollback(uint32_t);
static int scheduler_null_update(struct scheduler_info *);
static int scheduler_null_delete(uint64_t);
static int scheduler_null_hold(uint64_t, uint64_t);
-static int scheduler_null_release(uint64_t, int);
+static int scheduler_null_release(int, uint64_t, int);
static int scheduler_null_batch(int, struct scheduler_batch *);
static size_t scheduler_null_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_null_envelopes(uint64_t, struct evpstate *, size_t);
@@ -113,7 +113,7 @@ scheduler_null_hold(uint64_t evpid, uint64_t holdq)
}
static int
-scheduler_null_release(uint64_t holdq, int n)
+scheduler_null_release(int type, uint64_t holdq, int n)
{
return (0);
}
diff --git a/smtpd/scheduler_proc.c b/smtpd/scheduler_proc.c
index a43028fb..bcd6521b 100644
--- a/smtpd/scheduler_proc.c
+++ b/smtpd/scheduler_proc.c
@@ -274,7 +274,7 @@ scheduler_proc_hold(uint64_t evpid, uint64_t holdq)
}
static int
-scheduler_proc_release(uint64_t holdq, int n)
+scheduler_proc_release(int type, uint64_t holdq, int n)
{
struct ibuf *buf;
int r;
@@ -285,6 +285,8 @@ scheduler_proc_release(uint64_t holdq, int n)
sizeof(holdq) + sizeof(n));
if (buf == NULL)
return (-1);
+ if (imsg_add(buf, &type, sizeof(type)) == -1)
+ return (-1);
if (imsg_add(buf, &holdq, sizeof(holdq)) == -1)
return (-1);
if (imsg_add(buf, &n, sizeof(n)) == -1)
diff --git a/smtpd/scheduler_ramqueue.c b/smtpd/scheduler_ramqueue.c
index 452fe68a..d051294f 100644
--- a/smtpd/scheduler_ramqueue.c
+++ b/smtpd/scheduler_ramqueue.c
@@ -97,7 +97,7 @@ static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
-static int scheduler_ram_release(uint64_t, int);
+static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, struct scheduler_batch *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
@@ -145,7 +145,7 @@ struct scheduler_backend scheduler_backend_ramqueue = {
static struct rq_queue ramqueue;
static struct tree updates;
-static struct tree holdqs;
+static struct tree holdqs[3]; /* delivery type */
static time_t currtime;
@@ -154,7 +154,9 @@ scheduler_ram_init(void)
{
rq_queue_init(&ramqueue);
tree_init(&updates);
- tree_init(&holdqs);
+ tree_init(&holdqs[D_MDA]);
+ tree_init(&holdqs[D_MTA]);
+ tree_init(&holdqs[D_BOUNCE]);
return (1);
}
@@ -351,11 +353,11 @@ scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
return (0);
}
- hq = tree_get(&holdqs, holdq);
+ hq = tree_get(&holdqs[evp->type], holdq);
if (hq == NULL) {
hq = xcalloc(1, sizeof(*hq), "scheduler_hold");
TAILQ_INIT(&hq->q);
- tree_xset(&holdqs, holdq, hq);
+ tree_xset(&holdqs[evp->type], holdq, hq);
}
evp->state = RQ_EVPSTATE_HELD;
@@ -373,7 +375,7 @@ scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
}
static int
-scheduler_ram_release(uint64_t holdq, int n)
+scheduler_ram_release(int type, uint64_t holdq, int n)
{
struct rq_holdq *hq;
struct rq_envelope *evp;
@@ -381,7 +383,7 @@ scheduler_ram_release(uint64_t holdq, int n)
currtime = time(NULL);
- hq = tree_get(&holdqs, holdq);
+ hq = tree_get(&holdqs[type], holdq);
if (hq == NULL)
return (0);
@@ -402,7 +404,7 @@ scheduler_ram_release(uint64_t holdq, int n)
}
if (TAILQ_EMPTY(&hq->q)) {
- tree_xpop(&holdqs, holdq);
+ tree_xpop(&holdqs[type], holdq);
free(hq);
}
stat_decrement("scheduler.ramqueue.hold", i);
@@ -835,10 +837,10 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
}
if (evp->state == RQ_EVPSTATE_HELD) {
- hq = tree_xget(&holdqs, evp->holdq);
+ hq = tree_xget(&holdqs[evp->type], evp->holdq);
TAILQ_REMOVE(&hq->q, evp, entry);
if (TAILQ_EMPTY(&hq->q)) {
- tree_xpop(&holdqs, evp->holdq);
+ tree_xpop(&holdqs[evp->type], evp->holdq);
free(hq);
}
evp->holdq = 0;
@@ -868,10 +870,10 @@ rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
}
if (evp->state == RQ_EVPSTATE_HELD) {
- hq = tree_xget(&holdqs, evp->holdq);
+ hq = tree_xget(&holdqs[evp->type], evp->holdq);
TAILQ_REMOVE(&hq->q, evp, entry);
if (TAILQ_EMPTY(&hq->q)) {
- tree_xpop(&holdqs, evp->holdq);
+ tree_xpop(&holdqs[evp->type], evp->holdq);
free(hq);
}
evp->holdq = 0;
@@ -898,10 +900,10 @@ rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
return (0);
if (evp->state == RQ_EVPSTATE_HELD) {
- hq = tree_xget(&holdqs, evp->holdq);
+ hq = tree_xget(&holdqs[evp->type], evp->holdq);
TAILQ_REMOVE(&hq->q, evp, entry);
if (TAILQ_EMPTY(&hq->q)) {
- tree_xpop(&holdqs, evp->holdq);
+ tree_xpop(&holdqs[evp->type], evp->holdq);
free(hq);
}
evp->holdq = 0;
diff --git a/smtpd/smtpd-api.h b/smtpd/smtpd-api.h
index f20f4beb..d52c6155 100644
--- a/smtpd/smtpd-api.h
+++ b/smtpd/smtpd-api.h
@@ -237,8 +237,8 @@ void *dict_xget(struct dict *, const char *);
void *dict_pop(struct dict *, const char *);
void *dict_xpop(struct dict *, const char *);
int dict_poproot(struct dict *, void **);
-int dict_root(struct dict *, const char * *, void **);
-int dict_iter(struct dict *, void **, const char * *, void **);
+int dict_root(struct dict *, const char **, void **);
+int dict_iter(struct dict *, void **, const char **, void **);
int dict_iterfrom(struct dict *, void **, const char *, const char **, void **);
void dict_merge(struct dict *, struct dict *);
@@ -286,7 +286,7 @@ void scheduler_api_on_rollback(size_t(*)(uint32_t));
void scheduler_api_on_update(int(*)(struct scheduler_info *));
void scheduler_api_on_delete(int(*)(uint64_t));
void scheduler_api_on_hold(int(*)(uint64_t, uint64_t));
-void scheduler_api_on_release(int(*)(uint64_t, int));
+void scheduler_api_on_release(int(*)(int, uint64_t, int));
void scheduler_api_on_batch(int(*)(int, struct scheduler_batch *));
void scheduler_api_on_messages(size_t(*)(uint32_t, uint32_t *, size_t));
void scheduler_api_on_envelopes(size_t(*)(uint64_t, struct evpstate *, size_t));
diff --git a/smtpd/smtpd.conf.5 b/smtpd/smtpd.conf.5
index 7db43747..65183131 100644
--- a/smtpd/smtpd.conf.5
+++ b/smtpd/smtpd.conf.5
@@ -550,6 +550,14 @@ If a
is specified, the restriction only applies when connecting
to MXs for this domain.
.It Xo
+.Ic limit scheduler max-inflight
+.Ar num
+.Xc
+Suspend the scheduling of envelopes for deliver/relay until the number
+of inflight envelopes falls below
+.Ar num .
+Changing the default value might degrade performances.
+.It Xo
.Bk -words
.Ic listen on Ar interface
.Op Ar family
diff --git a/smtpd/smtpd.h b/smtpd/smtpd.h
index 9339a5f6..9e10e591 100644
--- a/smtpd/smtpd.h
+++ b/smtpd/smtpd.h
@@ -825,7 +825,7 @@ struct scheduler_backend {
int (*update)(struct scheduler_info *);
int (*delete)(uint64_t);
int (*hold)(uint64_t, uint64_t);
- int (*release)(uint64_t, int);
+ int (*release)(int, uint64_t, int);
int (*batch)(int, struct scheduler_batch *);
@@ -1000,8 +1000,8 @@ struct ca_cert_resp_msg {
};
struct ca_vrfy_req_msg {
- char pkiname[SMTPD_MAXHOSTNAMELEN];
uint64_t reqid;
+ char pkiname[SMTPD_MAXHOSTNAMELEN];
unsigned char *cert;
off_t cert_len;
size_t n_chain;