aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEric Faurot <eric@faurot.net>2014-05-12 16:39:42 +0200
committerEric Faurot <eric@faurot.net>2014-05-12 16:39:42 +0200
commit38da9ce4f8691774961a60b4159017e982a9fb15 (patch)
tree25463bd50e26db2e521597b4f45477dddac9faff
parentFix a possible double free when tls is required but not advertised by (diff)
downloadOpenSMTPD-38da9ce4f8691774961a60b4159017e982a9fb15.tar.xz
OpenSMTPD-38da9ce4f8691774961a60b4159017e982a9fb15.zip
plug the message content through the filter fd chain.opensmtpd-201405121641
-rw-r--r--smtpd/filter.c211
-rw-r--r--smtpd/smtp_session.c259
-rw-r--r--smtpd/smtpd.h1
3 files changed, 337 insertions, 134 deletions
diff --git a/smtpd/filter.c b/smtpd/filter.c
index 58ebab0b..3ef3ae8e 100644
--- a/smtpd/filter.c
+++ b/smtpd/filter.c
@@ -67,12 +67,22 @@ struct filter {
};
TAILQ_HEAD(filter_lst, filter);
+TAILQ_HEAD(filter_query_lst, filter_query);
struct filter_session {
- uint64_t id;
- int terminate;
- TAILQ_HEAD(filter_queries, filter_query) queries;
- struct filter_lst *filters;
- struct filter *fcurr;
+ uint64_t id;
+ int terminate;
+ struct filter_lst *filters;
+ struct filter *fcurr;
+
+ struct filter_query_lst queries;
+
+ struct io io;
+ struct iobuf iobuf;
+ FILE *ofile;
+ size_t datain;
+ size_t datalen;
+ int error;
+ struct filter_query *eom;
};
struct filter_query {
@@ -110,7 +120,10 @@ static void filter_imsg(struct mproc *, struct imsg *);
static struct filter_query *filter_query(struct filter_session *, int, int);
static void filter_drain_query(struct filter_query *);
static void filter_run_query(struct filter *, struct filter_query *);
+static void filter_end_query(struct filter_query *);
static void filter_set_fdout(struct filter_session *, int);
+static int filter_tx(struct filter_session *, int);
+static void filter_tx_io(struct io *, int);
static TAILQ_HEAD(, filter_proc) procs;
struct dict chains;
@@ -127,7 +140,6 @@ static const char * filterimsg_to_str(int);
struct tree sessions;
struct tree queries;
-
static void
filter_extend_chain(struct filter_lst *chain, const char *name)
{
@@ -138,7 +150,7 @@ filter_extend_chain(struct filter_lst *chain, const char *name)
fconf = dict_xget(&env->sc_filters, name);
if (fconf->chain) {
- log_debug("mfa: extending with \"%s\"", name);
+ log_debug("filter: extending with \"%s\"", name);
for (i = 0; i < MAX_FILTER_PER_CHAIN; i++) {
if (!fconf->filters[i][0])
break;
@@ -146,7 +158,7 @@ filter_extend_chain(struct filter_lst *chain, const char *name)
}
}
else {
- log_debug("mfa: adding filter \"%s\"", name);
+ log_debug("filter: adding filter \"%s\"", name);
n = xcalloc(1, sizeof(*n), "filter_extend_chain");
fchain = dict_get(&chains, name);
n->proc = TAILQ_FIRST(fchain)->proc;
@@ -173,7 +185,7 @@ filter_postfork(void)
TAILQ_INIT(&procs);
dict_init(&chains);
- log_debug("mfa: building simple chains...");
+ log_debug("filter: building simple chains...");
/* create all filter proc and associated chains */
iter = NULL;
@@ -181,7 +193,7 @@ filter_postfork(void)
if (filter->chain)
continue;
- log_debug("mfa: building simple chain \"%s\"", filter->name);
+ log_debug("filter: building simple chain \"%s\"", filter->name);
proc = xcalloc(1, sizeof(*proc), "filter_postfork");
p = &proc->mproc;
@@ -192,7 +204,7 @@ filter_postfork(void)
if (mproc_fork(p, filter->path, filter->name) < 0)
fatalx("filter_postfork");
- log_debug("mfa: registering proc \"%s\"", filter->name);
+ log_debug("filter: registering proc \"%s\"", filter->name);
f = xcalloc(1, sizeof(*f), "filter_postfork");
f->proc = proc;
@@ -205,7 +217,7 @@ filter_postfork(void)
filter->done = 1;
}
- log_debug("mfa: building complex chains...");
+ log_debug("filter: building complex chains...");
/* resolve all chains */
done = 0;
@@ -229,20 +241,20 @@ filter_postfork(void)
continue;
fchain = xcalloc(1, sizeof(*fchain), "filter_postfork");
TAILQ_INIT(fchain);
- log_debug("mfa: building chain \"%s\"...", filter->name);
+ log_debug("filter: building chain \"%s\"...", filter->name);
for (i = 0; i < MAX_FILTER_PER_CHAIN; i++) {
if (!filter->filters[i][0])
break;
filter_extend_chain(fchain, filter->filters[i]);
}
- log_debug("mfa: done building chain \"%s\"", filter->name);
+ log_debug("filter: done building chain \"%s\"", filter->name);
dict_xset(&chains, filter->name, fchain);
}
}
- log_debug("mfa: done building complex chains");
+ log_debug("filter: done building complex chains");
if (dict_get(&chains, "default") == NULL) {
- log_debug("mfa: done building default chain");
+ log_debug("filter: done building default chain");
fchain = xcalloc(1, sizeof(*fchain), "filter_postfork");
TAILQ_INIT(fchain);
dict_xset(&chains, "default", fchain);
@@ -284,6 +296,7 @@ filter_event(uint64_t id, int event)
s = xcalloc(1, sizeof(*s), "filter_event");
s->id = id;
s->filters = dict_xget(&chains, "default");
+ s->io.sock = -1;
TAILQ_INIT(&s->queries);
tree_xset(&sessions, s->id, s);
}
@@ -365,10 +378,11 @@ static void
filter_set_fdout(struct filter_session *s, int fdout)
{
struct mproc *p;
+ int fd;
while(s->fcurr) {
if (s->fcurr->proc->hooks & HOOK_DATALINE) {
- log_trace(TRACE_MFA, "mfa: sending fd %d to %s", fdout, filter_to_text(s->fcurr));
+ log_trace(TRACE_MFA, "filter: sending fd %d to %s", fdout, filter_to_text(s->fcurr));
p = &s->fcurr->proc->mproc;
m_create(p, IMSG_FILTER_PIPE_SETUP, 0, 0, fdout);
m_add_id(p, s->id);
@@ -378,18 +392,10 @@ filter_set_fdout(struct filter_session *s, int fdout)
s->fcurr = TAILQ_PREV(s->fcurr, filter_lst, entry);
}
- log_trace(TRACE_MFA, "mfa: chain input is %d", fdout);
-
+ log_trace(TRACE_MFA, "filter: chain input is %d", fdout);
-#if 0
- XXX finish
-
- m_create(p_smtp, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fdout);
- m_add_id(p_smtp, s->id);
- m_add_int(p_smtp, 1);
- m_close(p_smtp);
- return;
-#endif
+ fd = filter_tx(s, fdout);
+ smtp_filter_fd(s->id, fd);
}
void
@@ -399,6 +405,7 @@ filter_build_fd_chain(uint64_t id, int fdout)
s = tree_xget(&sessions, id);
s->fcurr = TAILQ_LAST(s->filters, filter_lst);
+
filter_set_fdout(s, fdout);
}
@@ -460,7 +467,7 @@ filter_drain_query(struct filter_query *q)
* Do not move forward if the query ahead of us is
* waiting on this filter.
*/
- prev = TAILQ_PREV(q, filter_queries, entry);
+ prev = TAILQ_PREV(q, filter_query_lst, entry);
if (prev && prev->current == q->current) {
q->state = QUERY_WAITING;
log_trace(TRACE_MFA,
@@ -475,34 +482,13 @@ filter_drain_query(struct filter_query *q)
q->state = QUERY_DONE;
}
- TAILQ_REMOVE(&q->session->queries, q, entry);
-
- if (q->type == QT_QUERY) {
- log_trace(TRACE_MFA,
- "filter: query %016"PRIx64" done: "
- "status=%s code=%d response=\"%s\"",
- q->qid,
- status_to_str(q->smtp.status),
- q->smtp.code,
- q->smtp.response);
-
- /* ...and send the SMTP response */
- if (q->hook == QUERY_EOM) {
-/*
- smtp_filter_eom(q->session->id, q->u.datalen);
-*/
- smtp_filter_response(q->session->id, q->hook,
- q->smtp.status, q->smtp.code, q->smtp.response);
- }
- else {
- smtp_filter_response(q->session->id, q->hook,
- q->smtp.status, q->smtp.code, q->smtp.response);
- }
- free(q->smtp.response);
+ /* Defer the response if the file is not closed yet. */
+ if (q->hook == HOOK_EOM && q->session->ofile) {
+ q->session->eom = q;
+ return;
}
- log_trace(TRACE_MFA, "filter: freeing query %016" PRIx64, q->qid);
- free(q);
+ filter_end_query(q);
}
static void
@@ -556,6 +542,45 @@ filter_run_query(struct filter *f, struct filter_query *q)
}
static void
+filter_end_query(struct filter_query *q)
+{
+ struct filter_session *s = q->session;
+
+ if (q->type == QT_EVENT)
+ goto done;
+
+ if (q->hook == QUERY_EOM) {
+ if (s->error) {
+ smtp_filter_response(s->id, QUERY_EOM, FILTER_FAIL, 0, NULL);
+ free(q->smtp.response);
+ goto done;
+ }
+ else if (q->u.datalen != s->datain) {
+ log_warn("filter: datalen mismatch on session %" PRIx64
+ ": %zu/%zu", s->id, s->datain, q->u.datalen);
+ smtp_filter_response(s->id, QUERY_EOM, FILTER_FAIL, 0, NULL);
+ free(q->smtp.response);
+ goto done;
+ }
+ }
+
+ log_trace(TRACE_MFA,
+ "filter: query %016"PRIx64" done: "
+ "status=%s code=%d response=\"%s\"",
+ q->qid,
+ status_to_str(q->smtp.status),
+ q->smtp.code,
+ q->smtp.response);
+ smtp_filter_response(s->id, q->hook, q->smtp.status, q->smtp.code,
+ q->smtp.response);
+ free(q->smtp.response);
+
+ done:
+ TAILQ_REMOVE(&s->queries, q, entry);
+ free(q);
+}
+
+static void
filter_imsg(struct mproc *p, struct imsg *imsg)
{
struct filter_proc *proc = p->data;
@@ -617,7 +642,7 @@ filter_imsg(struct mproc *p, struct imsg *imsg)
q = tree_xpop(&queries, qid);
if (q->hook != qhook) {
- log_warnx("warn: mfa: hook mismatch %d != %d", q->hook, qhook);
+ log_warnx("warn: filter: hook mismatch %d != %d", q->hook, qhook);
fatalx("exiting");
}
q->smtp.status = status;
@@ -658,6 +683,80 @@ filter_imsg(struct mproc *p, struct imsg *imsg)
}
}
+static int
+filter_tx(struct filter_session *s, int fdout)
+{
+ int sp[2];
+
+ /* reset */
+ s->datain = 0;
+ s->datalen = 0;
+ s->eom = NULL;
+ s->error = 0;
+
+ if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, sp) == -1) {
+ log_warn("warn: filter: socketpair");
+ return (-1);
+ }
+
+ if ((s->ofile = fdopen(fdout, "w")) == NULL) {
+ log_warn("warn: filter: fdopen");
+ close(sp[0]);
+ close(sp[1]);
+ return (-1);
+ }
+
+ iobuf_init(&s->iobuf, 0, 0);
+ io_init(&s->io, sp[0], s, filter_tx_io, &s->iobuf);
+ io_set_read(&s->io);
+
+ return (sp[1]);
+}
+
+static void
+filter_tx_io(struct io *io, int evt)
+{
+ struct filter_session *s = io->arg;
+ size_t len, n;
+ char *data;
+
+ switch (evt) {
+ case IO_DATAIN:
+ data = iobuf_data(&s->iobuf);
+ len = iobuf_len(&s->iobuf);
+ log_debug("debug: filter: tx data (%zu) for req %016"PRIx64,
+ len, s->id);
+ n = fwrite(data, 1, len, s->ofile);
+ if (n != len) {
+ log_warnx("warn: filter_tx_io: fwrite %zu/%zu", n, len);
+ s->error = 1;
+ break;
+ }
+ s->datain += n;
+ iobuf_drop(&s->iobuf, n);
+ iobuf_normalize(&s->iobuf);
+ return;
+
+ case IO_DISCONNECTED:
+ log_debug("debug: filter: tx done for req %016"PRIx64, s->id);
+ break;
+
+ default:
+ log_warn("warn: filter_tx_io: bad evt (%i) for req %016"PRIx64, evt, s->id);
+ s->error = 1;
+ break;
+ }
+
+ io_clear(&s->io);
+ iobuf_clear(&s->iobuf);
+ fclose(s->ofile);
+ s->ofile = NULL;
+
+ /* deferred eom request */
+ if (s->eom)
+ filter_end_query(s->eom);
+}
+
static const char *
filter_query_to_text(struct filter_query *q)
{
diff --git a/smtpd/smtp_session.c b/smtpd/smtp_session.c
index 1e735677..80cdb61d 100644
--- a/smtpd/smtp_session.c
+++ b/smtpd/smtp_session.c
@@ -49,6 +49,8 @@
#define SMTP_KICK_CMD 5
#define SMTP_KICK_RCPTFAIL 50
+#define DATA_HIWAT 65535
+
enum smtp_phase {
PHASE_INIT = 0,
PHASE_SETUP,
@@ -76,8 +78,9 @@ enum session_flags {
SF_BOUNCE = 0x0010,
SF_KICK = 0x0020,
SF_VERIFIED = 0x0040,
- SF_FILTERCONN = 0x0080,
- SF_BADINPUT = 0x0100,
+ SF_BADINPUT = 0x0080,
+ SF_FILTERCONN = 0x0100,
+ SF_FILTERDATA = 0x0200,
};
enum message_flags {
@@ -139,9 +142,10 @@ struct smtp_session {
size_t rcptfail;
TAILQ_HEAD(, smtp_rcpt) rcpts;
-
size_t datalen;
- FILE *ofile;
+ struct iobuf dataiobuf;
+ struct io dataio;
+ int dataeom;
struct event pause;
};
@@ -159,6 +163,8 @@ static int smtp_lookup_servername(struct smtp_session *);
static void smtp_connected(struct smtp_session *);
static void smtp_send_banner(struct smtp_session *);
static void smtp_io(struct io *, int);
+static void smtp_data_io(struct io *, int);
+static void smtp_data_io_done(struct smtp_session *);
static void smtp_enter_state(struct smtp_session *, int);
static void smtp_reply(struct smtp_session *, char *, ...);
static void smtp_command(struct smtp_session *, char *);
@@ -207,8 +213,8 @@ static struct { int code; const char *cmd; } commands[] = {
static struct tree wait_lka_ptr;
static struct tree wait_lka_helo;
static struct tree wait_lka_rcpt;
-static struct tree wait_mfa;
-static struct tree wait_mfa_data;
+static struct tree wait_filter;
+static struct tree wait_filter_data;
static struct tree wait_parent_auth;
static struct tree wait_queue_msg;
static struct tree wait_queue_fd;
@@ -225,8 +231,8 @@ smtp_session_init(void)
tree_init(&wait_lka_ptr);
tree_init(&wait_lka_helo);
tree_init(&wait_lka_rcpt);
- tree_init(&wait_mfa);
- tree_init(&wait_mfa_data);
+ tree_init(&wait_filter);
+ tree_init(&wait_filter_data);
tree_init(&wait_parent_auth);
tree_init(&wait_queue_msg);
tree_init(&wait_queue_fd);
@@ -302,7 +308,6 @@ smtp_session_imsg(struct mproc *p, struct imsg *imsg)
uint64_t reqid, evpid;
uint32_t msgid;
int status, success, dnserror;
- X509 *x;
void *ssl_ctx;
switch (imsg->hdr.type) {
@@ -387,8 +392,7 @@ smtp_session_imsg(struct mproc *p, struct imsg *imsg)
m_end(&m);
s = tree_xpop(&wait_queue_fd, reqid);
- if (!success || imsg->fd == -1 ||
- (s->ofile = fdopen(imsg->fd, "w")) == NULL) {
+ if (!success || imsg->fd == -1) {
if (imsg->fd != -1)
close(imsg->fd);
smtp_reply(s, "421 %s: Temporary Error",
@@ -398,47 +402,10 @@ smtp_session_imsg(struct mproc *p, struct imsg *imsg)
return;
}
- fprintf(s->ofile, "Received: ");
- if (! (s->listener->flags & F_MASK_SOURCE)) {
- fprintf(s->ofile, "from %s (%s [%s]);\n\t",
- s->evp.helo,
- s->hostname,
- ss_to_text(&s->ss));
- }
- fprintf(s->ofile, "by %s (%s) with %sSMTP%s%s id %08x;\n",
- s->smtpname,
- SMTPD_NAME,
- s->flags & SF_EHLO ? "E" : "",
- s->flags & SF_SECURE ? "S" : "",
- s->flags & SF_AUTHENTICATED ? "A" : "",
- evpid_to_msgid(s->evp.id));
-
- if (s->flags & SF_SECURE) {
- x = SSL_get_peer_certificate(s->io.ssl);
- fprintf(s->ofile,
- "\tTLS version=%s cipher=%s bits=%d verify=%s;\n",
- SSL_get_cipher_version(s->io.ssl),
- SSL_get_cipher_name(s->io.ssl),
- SSL_get_cipher_bits(s->io.ssl, NULL),
- (s->flags & SF_VERIFIED) ? "YES" : (x ? "FAIL" : "NO"));
- if (x)
- X509_free(x);
- }
-
- if (s->rcptcount == 1) {
- fprintf(s->ofile, "\tfor <%s@%s>;\n",
- s->evp.rcpt.user,
- s->evp.rcpt.domain);
- }
-
- fprintf(s->ofile, "\t%s\n", time_to_text(time(NULL)));
-
- smtp_enter_state(s, STATE_BODY);
- smtp_reply(s, "354 Enter mail, end with \".\""
- " on a line by itself");
+ log_debug("smtp: %p: fd %d from queue", s, imsg->fd);
- tree_xset(&wait_mfa_data, s->id, s);
- io_reload(&s->io);
+ tree_xset(&wait_filter, s->id, s);
+ filter_build_fd_chain(s->id, imsg->fd);
return;
case IMSG_QUEUE_ENVELOPE_SUBMIT:
@@ -629,7 +596,7 @@ smtp_filter_response(uint64_t id, int query, int status, uint32_t code,
struct smtp_session *s;
struct ca_cert_req_msg req_ca_cert;
- s = tree_xpop(&wait_mfa, id);
+ s = tree_xpop(&wait_filter, id);
if (status == FILTER_CLOSE) {
code = code ? code : 421;
@@ -767,6 +734,78 @@ smtp_filter_response(uint64_t id, int query, int status, uint32_t code,
}
}
+void
+smtp_filter_fd(uint64_t id, int fd)
+{
+ struct smtp_session *s;
+ X509 *x;
+
+ s = tree_xpop(&wait_filter, id);
+
+ log_debug("smtp: %p: fd %d from filter", s, fd);
+
+ if (fd == -1) {
+ smtp_reply(s, "421 %s: Temporary Error",
+ esc_code(ESC_STATUS_TEMPFAIL, ESC_OTHER_MAIL_SYSTEM_STATUS));
+ smtp_enter_state(s, STATE_QUIT);
+ io_reload(&s->io);
+ return;
+ }
+
+ iobuf_init(&s->dataiobuf, 0, 0);
+ io_init(&s->dataio, fd, s, smtp_data_io, &s->dataiobuf);
+
+ iobuf_fqueue(&s->dataiobuf, "Received: ");
+ if (! (s->listener->flags & F_MASK_SOURCE)) {
+ iobuf_fqueue(&s->dataiobuf, "from %s (%s [%s]);\n\t",
+ s->evp.helo,
+ s->hostname,
+ ss_to_text(&s->ss));
+ }
+ iobuf_fqueue(&s->dataiobuf, "by %s (%s) with %sSMTP%s%s id %08x;\n",
+ s->smtpname,
+ SMTPD_NAME,
+ s->flags & SF_EHLO ? "E" : "",
+ s->flags & SF_SECURE ? "S" : "",
+ s->flags & SF_AUTHENTICATED ? "A" : "",
+ evpid_to_msgid(s->evp.id));
+
+ if (s->flags & SF_SECURE) {
+ x = SSL_get_peer_certificate(s->io.ssl);
+ iobuf_fqueue(&s->dataiobuf,
+ "\tTLS version=%s cipher=%s bits=%d verify=%s;\n",
+ SSL_get_cipher_version(s->io.ssl),
+ SSL_get_cipher_name(s->io.ssl),
+ SSL_get_cipher_bits(s->io.ssl, NULL),
+ (s->flags & SF_VERIFIED) ? "YES" : (x ? "FAIL" : "NO"));
+ if (x)
+ X509_free(x);
+ }
+
+ if (s->rcptcount == 1) {
+ iobuf_fqueue(&s->dataiobuf, "\tfor <%s@%s>;\n",
+ s->evp.rcpt.user,
+ s->evp.rcpt.domain);
+ }
+
+ iobuf_fqueue(&s->dataiobuf, "\t%s\n", time_to_text(time(NULL)));
+
+ /*
+ * XXX This is not exactly fair, since this is not really
+ * user data.
+ */
+ s->datalen = iobuf_queued(&s->dataiobuf);
+
+ io_set_write(&s->dataio);
+
+ smtp_enter_state(s, STATE_BODY);
+ smtp_reply(s, "354 Enter mail, end with \".\""
+ " on a line by itself");
+
+ tree_xset(&wait_filter_data, s->id, s);
+ io_reload(&s->io);
+}
+
static void
smtp_io(struct io *io, int evt)
{
@@ -856,6 +895,7 @@ smtp_io(struct io *io, int evt)
if (line[i] & 0x80)
line[i] = line[i] & 0x7f;
+ log_trace(TRACE_SMTP, "<<< [MSG] %s", line);
smtp_filter_dataline(s, line);
goto nextline;
}
@@ -873,10 +913,13 @@ smtp_io(struct io *io, int evt)
/* End of body */
if (s->state == STATE_BODY) {
+ log_trace(TRACE_SMTP, "<<< [EOM]");
iobuf_normalize(&s->iobuf);
io_set_write(io);
- smtp_filter_eom(s);
+ s->dataeom = 1;
+ if (iobuf_queued(&s->dataiobuf) == 0)
+ smtp_data_io_done(s);
return;
}
@@ -938,6 +981,69 @@ smtp_io(struct io *io, int evt)
}
static void
+smtp_data_io(struct io *io, int evt)
+{
+ struct smtp_session *s = io->arg;
+
+ log_trace(TRACE_IO, "smtp: %p (data): %s %s", s, io_strevent(evt),
+ io_strio(io));
+
+ switch (evt) {
+ case IO_TIMEOUT:
+ case IO_DISCONNECTED:
+ case IO_ERROR:
+ log_debug("debug: smtp: %p: io error on mfa", s);
+ io_clear(&s->dataio);
+ iobuf_clear(&s->dataiobuf);
+ s->msgflags |= MF_ERROR_IO;
+ if (s->io.flags & IO_PAUSE_IN) {
+ log_debug("debug: smtp: %p: resuming session after mfa error", s);
+ io_resume(&s->io, IO_PAUSE_IN);
+ }
+ break;
+
+ case IO_LOWAT:
+ if (s->dataeom && iobuf_queued(&s->dataiobuf) == 0) {
+ smtp_data_io_done(s);
+ } else if (s->io.flags & IO_PAUSE_IN) {
+ log_debug("debug: smtp: %p: filter congestion over: resuming session", s);
+ io_resume(&s->io, IO_PAUSE_IN);
+ }
+ break;
+
+ default:
+ fatalx("smtp_data_io()");
+ }
+}
+
+static void
+smtp_data_io_done(struct smtp_session *s)
+{
+ log_debug("debug: smtp: %p: data io done (%zu bytes)", s, s->datalen);
+ io_clear(&s->dataio);
+ iobuf_clear(&s->dataiobuf);
+
+ if (s->msgflags & MF_ERROR) {
+
+ smtp_filter_rollback(s);
+
+ m_create(p_queue, IMSG_SMTP_MESSAGE_ROLLBACK, 0, 0, -1);
+ m_add_msgid(p_queue, evpid_to_msgid(s->evp.id));
+ m_close(p_queue);
+ if (s->msgflags & MF_ERROR_SIZE)
+ smtp_reply(s, "554 Message too big");
+ else if (s->msgflags)
+ smtp_reply(s, "421 Internal server error");
+ smtp_message_reset(s, 0);
+ smtp_enter_state(s, STATE_HELO);
+ io_reload(&s->io);
+ }
+ else {
+ smtp_filter_eom(s);
+ }
+}
+
+static void
smtp_command(struct smtp_session *s, char *line)
{
char *args, *eom, *method;
@@ -1541,13 +1647,10 @@ smtp_message_end(struct smtp_session *s)
{
log_debug("debug: %p: end of message, msgflags=0x%04x", s, s->msgflags);
- tree_xpop(&wait_mfa_data, s->id);
+ tree_xpop(&wait_filter_data, s->id);
s->phase = PHASE_SETUP;
- fclose(s->ofile);
- s->ofile = NULL;
-
if (s->msgflags & MF_ERROR) {
m_create(p_queue, IMSG_SMTP_MESSAGE_ROLLBACK, 0, 0, -1);
m_add_msgid(p_queue, evpid_to_msgid(s->evp.id));
@@ -1646,10 +1749,12 @@ smtp_free(struct smtp_session *s, const char * reason)
log_debug("debug: smtp: %p: deleting session: %s", s, reason);
- tree_pop(&wait_mfa_data, s->id);
+ tree_pop(&wait_filter_data, s->id);
+/*
if (s->ofile)
fclose(s->ofile);
+*/
if (s->evp.id) {
m_create(p_queue, IMSG_SMTP_MESSAGE_ROLLBACK, 0, 0, -1);
@@ -1880,68 +1985,66 @@ smtp_filter_connect(struct smtp_session *s, struct sockaddr *sa)
{
filter_event(s->id, EVENT_CONNECT);
- tree_xset(&wait_mfa, s->id, s);
+ tree_xset(&wait_filter, s->id, s);
filter_connect(s->id, sa, (struct sockaddr *)&s->ss, s->hostname);
}
static void
smtp_filter_eom(struct smtp_session *s)
{
- tree_xset(&wait_mfa, s->id, s);
+ log_debug("smtp: %p: eom. datalen=%zu", s, s->datalen);
+ tree_xset(&wait_filter, s->id, s);
filter_eom(s->id, QUERY_EOM, s->datalen);
}
static void
smtp_filter_helo(struct smtp_session *s)
{
- tree_xset(&wait_mfa, s->id, s);
+ tree_xset(&wait_filter, s->id, s);
filter_line(s->id, QUERY_HELO, s->helo);
}
static void
smtp_filter_mail(struct smtp_session *s)
{
- tree_xset(&wait_mfa, s->id, s);
+ tree_xset(&wait_filter, s->id, s);
filter_mailaddr(s->id, QUERY_MAIL, &s->evp.sender);
}
static void
smtp_filter_rcpt(struct smtp_session *s)
{
- tree_xset(&wait_mfa, s->id, s);
+ tree_xset(&wait_filter, s->id, s);
filter_mailaddr(s->id, QUERY_RCPT, &s->evp.rcpt);
}
static void
smtp_filter_data(struct smtp_session *s)
{
- tree_xset(&wait_mfa, s->id, s);
+ tree_xset(&wait_filter, s->id, s);
filter_line(s->id, QUERY_DATA, NULL);
}
static void
smtp_filter_dataline(struct smtp_session *s, const char *line)
{
- size_t len;
-
- log_trace(TRACE_SMTP, "<<< [MSG] %s", line);
+ int n;
- /* Don't waste resources on message if it's going to bin anyway. */
+ /* ignore data line if an error flag is set */
if (s->msgflags & MF_ERROR)
return;
- len = strlen(line) + 1;
-
- if (s->datalen + len > env->sc_maxsize) {
- s->msgflags |= MF_ERROR_SIZE;
- return;
+ n = iobuf_fqueue(&s->dataiobuf, "%s\n", line);
+ if (n == -1) {
+ /* XXX */
+ fatalx("iobuf_fqueue");
}
- s->datalen += len;
-
- if (fprintf(s->ofile, "%s\n", line) != (int)len) {
- s->msgflags |= MF_ERROR_IO;
- return;
+ s->datalen += n;
+ if (iobuf_queued(&s->dataiobuf) > DATA_HIWAT && !(s->io.flags & IO_PAUSE_IN)) {
+ log_debug("debug: smtp: %p: filter congestion over: pausing session", s);
+ io_pause(&s->io, IO_PAUSE_IN);
}
+ io_reload(&s->dataio);
}
#define CASE(x) case x : return #x
diff --git a/smtpd/smtpd.h b/smtpd/smtpd.h
index 9a5e47b9..56e1e5ba 100644
--- a/smtpd/smtpd.h
+++ b/smtpd/smtpd.h
@@ -1312,6 +1312,7 @@ int smtp_session(struct listener *, int, const struct sockaddr_storage *,
const char *);
void smtp_session_imsg(struct mproc *, struct imsg *);
void smtp_filter_response(uint64_t, int, int, uint32_t, const char *);
+void smtp_filter_fd(uint64_t, int);
/* smtpd.c */
void imsg_dispatch(struct mproc *, struct imsg *);