diff options
author | Eric Faurot <eric@faurot.net> | 2014-05-12 16:39:42 +0200 |
---|---|---|
committer | Eric Faurot <eric@faurot.net> | 2014-05-12 16:39:42 +0200 |
commit | 38da9ce4f8691774961a60b4159017e982a9fb15 (patch) | |
tree | 25463bd50e26db2e521597b4f45477dddac9faff | |
parent | Fix a possible double free when tls is required but not advertised by (diff) | |
download | OpenSMTPD-38da9ce4f8691774961a60b4159017e982a9fb15.tar.xz OpenSMTPD-38da9ce4f8691774961a60b4159017e982a9fb15.zip |
plug the message content through the filter fd chain.opensmtpd-201405121641
-rw-r--r-- | smtpd/filter.c | 211 | ||||
-rw-r--r-- | smtpd/smtp_session.c | 259 | ||||
-rw-r--r-- | smtpd/smtpd.h | 1 |
3 files changed, 337 insertions, 134 deletions
diff --git a/smtpd/filter.c b/smtpd/filter.c index 58ebab0b..3ef3ae8e 100644 --- a/smtpd/filter.c +++ b/smtpd/filter.c @@ -67,12 +67,22 @@ struct filter { }; TAILQ_HEAD(filter_lst, filter); +TAILQ_HEAD(filter_query_lst, filter_query); struct filter_session { - uint64_t id; - int terminate; - TAILQ_HEAD(filter_queries, filter_query) queries; - struct filter_lst *filters; - struct filter *fcurr; + uint64_t id; + int terminate; + struct filter_lst *filters; + struct filter *fcurr; + + struct filter_query_lst queries; + + struct io io; + struct iobuf iobuf; + FILE *ofile; + size_t datain; + size_t datalen; + int error; + struct filter_query *eom; }; struct filter_query { @@ -110,7 +120,10 @@ static void filter_imsg(struct mproc *, struct imsg *); static struct filter_query *filter_query(struct filter_session *, int, int); static void filter_drain_query(struct filter_query *); static void filter_run_query(struct filter *, struct filter_query *); +static void filter_end_query(struct filter_query *); static void filter_set_fdout(struct filter_session *, int); +static int filter_tx(struct filter_session *, int); +static void filter_tx_io(struct io *, int); static TAILQ_HEAD(, filter_proc) procs; struct dict chains; @@ -127,7 +140,6 @@ static const char * filterimsg_to_str(int); struct tree sessions; struct tree queries; - static void filter_extend_chain(struct filter_lst *chain, const char *name) { @@ -138,7 +150,7 @@ filter_extend_chain(struct filter_lst *chain, const char *name) fconf = dict_xget(&env->sc_filters, name); if (fconf->chain) { - log_debug("mfa: extending with \"%s\"", name); + log_debug("filter: extending with \"%s\"", name); for (i = 0; i < MAX_FILTER_PER_CHAIN; i++) { if (!fconf->filters[i][0]) break; @@ -146,7 +158,7 @@ filter_extend_chain(struct filter_lst *chain, const char *name) } } else { - log_debug("mfa: adding filter \"%s\"", name); + log_debug("filter: adding filter \"%s\"", name); n = xcalloc(1, sizeof(*n), "filter_extend_chain"); fchain = dict_get(&chains, name); n->proc = TAILQ_FIRST(fchain)->proc; @@ -173,7 +185,7 @@ filter_postfork(void) TAILQ_INIT(&procs); dict_init(&chains); - log_debug("mfa: building simple chains..."); + log_debug("filter: building simple chains..."); /* create all filter proc and associated chains */ iter = NULL; @@ -181,7 +193,7 @@ filter_postfork(void) if (filter->chain) continue; - log_debug("mfa: building simple chain \"%s\"", filter->name); + log_debug("filter: building simple chain \"%s\"", filter->name); proc = xcalloc(1, sizeof(*proc), "filter_postfork"); p = &proc->mproc; @@ -192,7 +204,7 @@ filter_postfork(void) if (mproc_fork(p, filter->path, filter->name) < 0) fatalx("filter_postfork"); - log_debug("mfa: registering proc \"%s\"", filter->name); + log_debug("filter: registering proc \"%s\"", filter->name); f = xcalloc(1, sizeof(*f), "filter_postfork"); f->proc = proc; @@ -205,7 +217,7 @@ filter_postfork(void) filter->done = 1; } - log_debug("mfa: building complex chains..."); + log_debug("filter: building complex chains..."); /* resolve all chains */ done = 0; @@ -229,20 +241,20 @@ filter_postfork(void) continue; fchain = xcalloc(1, sizeof(*fchain), "filter_postfork"); TAILQ_INIT(fchain); - log_debug("mfa: building chain \"%s\"...", filter->name); + log_debug("filter: building chain \"%s\"...", filter->name); for (i = 0; i < MAX_FILTER_PER_CHAIN; i++) { if (!filter->filters[i][0]) break; filter_extend_chain(fchain, filter->filters[i]); } - log_debug("mfa: done building chain \"%s\"", filter->name); + log_debug("filter: done building chain \"%s\"", filter->name); dict_xset(&chains, filter->name, fchain); } } - log_debug("mfa: done building complex chains"); + log_debug("filter: done building complex chains"); if (dict_get(&chains, "default") == NULL) { - log_debug("mfa: done building default chain"); + log_debug("filter: done building default chain"); fchain = xcalloc(1, sizeof(*fchain), "filter_postfork"); TAILQ_INIT(fchain); dict_xset(&chains, "default", fchain); @@ -284,6 +296,7 @@ filter_event(uint64_t id, int event) s = xcalloc(1, sizeof(*s), "filter_event"); s->id = id; s->filters = dict_xget(&chains, "default"); + s->io.sock = -1; TAILQ_INIT(&s->queries); tree_xset(&sessions, s->id, s); } @@ -365,10 +378,11 @@ static void filter_set_fdout(struct filter_session *s, int fdout) { struct mproc *p; + int fd; while(s->fcurr) { if (s->fcurr->proc->hooks & HOOK_DATALINE) { - log_trace(TRACE_MFA, "mfa: sending fd %d to %s", fdout, filter_to_text(s->fcurr)); + log_trace(TRACE_MFA, "filter: sending fd %d to %s", fdout, filter_to_text(s->fcurr)); p = &s->fcurr->proc->mproc; m_create(p, IMSG_FILTER_PIPE_SETUP, 0, 0, fdout); m_add_id(p, s->id); @@ -378,18 +392,10 @@ filter_set_fdout(struct filter_session *s, int fdout) s->fcurr = TAILQ_PREV(s->fcurr, filter_lst, entry); } - log_trace(TRACE_MFA, "mfa: chain input is %d", fdout); - + log_trace(TRACE_MFA, "filter: chain input is %d", fdout); -#if 0 - XXX finish - - m_create(p_smtp, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fdout); - m_add_id(p_smtp, s->id); - m_add_int(p_smtp, 1); - m_close(p_smtp); - return; -#endif + fd = filter_tx(s, fdout); + smtp_filter_fd(s->id, fd); } void @@ -399,6 +405,7 @@ filter_build_fd_chain(uint64_t id, int fdout) s = tree_xget(&sessions, id); s->fcurr = TAILQ_LAST(s->filters, filter_lst); + filter_set_fdout(s, fdout); } @@ -460,7 +467,7 @@ filter_drain_query(struct filter_query *q) * Do not move forward if the query ahead of us is * waiting on this filter. */ - prev = TAILQ_PREV(q, filter_queries, entry); + prev = TAILQ_PREV(q, filter_query_lst, entry); if (prev && prev->current == q->current) { q->state = QUERY_WAITING; log_trace(TRACE_MFA, @@ -475,34 +482,13 @@ filter_drain_query(struct filter_query *q) q->state = QUERY_DONE; } - TAILQ_REMOVE(&q->session->queries, q, entry); - - if (q->type == QT_QUERY) { - log_trace(TRACE_MFA, - "filter: query %016"PRIx64" done: " - "status=%s code=%d response=\"%s\"", - q->qid, - status_to_str(q->smtp.status), - q->smtp.code, - q->smtp.response); - - /* ...and send the SMTP response */ - if (q->hook == QUERY_EOM) { -/* - smtp_filter_eom(q->session->id, q->u.datalen); -*/ - smtp_filter_response(q->session->id, q->hook, - q->smtp.status, q->smtp.code, q->smtp.response); - } - else { - smtp_filter_response(q->session->id, q->hook, - q->smtp.status, q->smtp.code, q->smtp.response); - } - free(q->smtp.response); + /* Defer the response if the file is not closed yet. */ + if (q->hook == HOOK_EOM && q->session->ofile) { + q->session->eom = q; + return; } - log_trace(TRACE_MFA, "filter: freeing query %016" PRIx64, q->qid); - free(q); + filter_end_query(q); } static void @@ -556,6 +542,45 @@ filter_run_query(struct filter *f, struct filter_query *q) } static void +filter_end_query(struct filter_query *q) +{ + struct filter_session *s = q->session; + + if (q->type == QT_EVENT) + goto done; + + if (q->hook == QUERY_EOM) { + if (s->error) { + smtp_filter_response(s->id, QUERY_EOM, FILTER_FAIL, 0, NULL); + free(q->smtp.response); + goto done; + } + else if (q->u.datalen != s->datain) { + log_warn("filter: datalen mismatch on session %" PRIx64 + ": %zu/%zu", s->id, s->datain, q->u.datalen); + smtp_filter_response(s->id, QUERY_EOM, FILTER_FAIL, 0, NULL); + free(q->smtp.response); + goto done; + } + } + + log_trace(TRACE_MFA, + "filter: query %016"PRIx64" done: " + "status=%s code=%d response=\"%s\"", + q->qid, + status_to_str(q->smtp.status), + q->smtp.code, + q->smtp.response); + smtp_filter_response(s->id, q->hook, q->smtp.status, q->smtp.code, + q->smtp.response); + free(q->smtp.response); + + done: + TAILQ_REMOVE(&s->queries, q, entry); + free(q); +} + +static void filter_imsg(struct mproc *p, struct imsg *imsg) { struct filter_proc *proc = p->data; @@ -617,7 +642,7 @@ filter_imsg(struct mproc *p, struct imsg *imsg) q = tree_xpop(&queries, qid); if (q->hook != qhook) { - log_warnx("warn: mfa: hook mismatch %d != %d", q->hook, qhook); + log_warnx("warn: filter: hook mismatch %d != %d", q->hook, qhook); fatalx("exiting"); } q->smtp.status = status; @@ -658,6 +683,80 @@ filter_imsg(struct mproc *p, struct imsg *imsg) } } +static int +filter_tx(struct filter_session *s, int fdout) +{ + int sp[2]; + + /* reset */ + s->datain = 0; + s->datalen = 0; + s->eom = NULL; + s->error = 0; + + if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, sp) == -1) { + log_warn("warn: filter: socketpair"); + return (-1); + } + + if ((s->ofile = fdopen(fdout, "w")) == NULL) { + log_warn("warn: filter: fdopen"); + close(sp[0]); + close(sp[1]); + return (-1); + } + + iobuf_init(&s->iobuf, 0, 0); + io_init(&s->io, sp[0], s, filter_tx_io, &s->iobuf); + io_set_read(&s->io); + + return (sp[1]); +} + +static void +filter_tx_io(struct io *io, int evt) +{ + struct filter_session *s = io->arg; + size_t len, n; + char *data; + + switch (evt) { + case IO_DATAIN: + data = iobuf_data(&s->iobuf); + len = iobuf_len(&s->iobuf); + log_debug("debug: filter: tx data (%zu) for req %016"PRIx64, + len, s->id); + n = fwrite(data, 1, len, s->ofile); + if (n != len) { + log_warnx("warn: filter_tx_io: fwrite %zu/%zu", n, len); + s->error = 1; + break; + } + s->datain += n; + iobuf_drop(&s->iobuf, n); + iobuf_normalize(&s->iobuf); + return; + + case IO_DISCONNECTED: + log_debug("debug: filter: tx done for req %016"PRIx64, s->id); + break; + + default: + log_warn("warn: filter_tx_io: bad evt (%i) for req %016"PRIx64, evt, s->id); + s->error = 1; + break; + } + + io_clear(&s->io); + iobuf_clear(&s->iobuf); + fclose(s->ofile); + s->ofile = NULL; + + /* deferred eom request */ + if (s->eom) + filter_end_query(s->eom); +} + static const char * filter_query_to_text(struct filter_query *q) { diff --git a/smtpd/smtp_session.c b/smtpd/smtp_session.c index 1e735677..80cdb61d 100644 --- a/smtpd/smtp_session.c +++ b/smtpd/smtp_session.c @@ -49,6 +49,8 @@ #define SMTP_KICK_CMD 5 #define SMTP_KICK_RCPTFAIL 50 +#define DATA_HIWAT 65535 + enum smtp_phase { PHASE_INIT = 0, PHASE_SETUP, @@ -76,8 +78,9 @@ enum session_flags { SF_BOUNCE = 0x0010, SF_KICK = 0x0020, SF_VERIFIED = 0x0040, - SF_FILTERCONN = 0x0080, - SF_BADINPUT = 0x0100, + SF_BADINPUT = 0x0080, + SF_FILTERCONN = 0x0100, + SF_FILTERDATA = 0x0200, }; enum message_flags { @@ -139,9 +142,10 @@ struct smtp_session { size_t rcptfail; TAILQ_HEAD(, smtp_rcpt) rcpts; - size_t datalen; - FILE *ofile; + struct iobuf dataiobuf; + struct io dataio; + int dataeom; struct event pause; }; @@ -159,6 +163,8 @@ static int smtp_lookup_servername(struct smtp_session *); static void smtp_connected(struct smtp_session *); static void smtp_send_banner(struct smtp_session *); static void smtp_io(struct io *, int); +static void smtp_data_io(struct io *, int); +static void smtp_data_io_done(struct smtp_session *); static void smtp_enter_state(struct smtp_session *, int); static void smtp_reply(struct smtp_session *, char *, ...); static void smtp_command(struct smtp_session *, char *); @@ -207,8 +213,8 @@ static struct { int code; const char *cmd; } commands[] = { static struct tree wait_lka_ptr; static struct tree wait_lka_helo; static struct tree wait_lka_rcpt; -static struct tree wait_mfa; -static struct tree wait_mfa_data; +static struct tree wait_filter; +static struct tree wait_filter_data; static struct tree wait_parent_auth; static struct tree wait_queue_msg; static struct tree wait_queue_fd; @@ -225,8 +231,8 @@ smtp_session_init(void) tree_init(&wait_lka_ptr); tree_init(&wait_lka_helo); tree_init(&wait_lka_rcpt); - tree_init(&wait_mfa); - tree_init(&wait_mfa_data); + tree_init(&wait_filter); + tree_init(&wait_filter_data); tree_init(&wait_parent_auth); tree_init(&wait_queue_msg); tree_init(&wait_queue_fd); @@ -302,7 +308,6 @@ smtp_session_imsg(struct mproc *p, struct imsg *imsg) uint64_t reqid, evpid; uint32_t msgid; int status, success, dnserror; - X509 *x; void *ssl_ctx; switch (imsg->hdr.type) { @@ -387,8 +392,7 @@ smtp_session_imsg(struct mproc *p, struct imsg *imsg) m_end(&m); s = tree_xpop(&wait_queue_fd, reqid); - if (!success || imsg->fd == -1 || - (s->ofile = fdopen(imsg->fd, "w")) == NULL) { + if (!success || imsg->fd == -1) { if (imsg->fd != -1) close(imsg->fd); smtp_reply(s, "421 %s: Temporary Error", @@ -398,47 +402,10 @@ smtp_session_imsg(struct mproc *p, struct imsg *imsg) return; } - fprintf(s->ofile, "Received: "); - if (! (s->listener->flags & F_MASK_SOURCE)) { - fprintf(s->ofile, "from %s (%s [%s]);\n\t", - s->evp.helo, - s->hostname, - ss_to_text(&s->ss)); - } - fprintf(s->ofile, "by %s (%s) with %sSMTP%s%s id %08x;\n", - s->smtpname, - SMTPD_NAME, - s->flags & SF_EHLO ? "E" : "", - s->flags & SF_SECURE ? "S" : "", - s->flags & SF_AUTHENTICATED ? "A" : "", - evpid_to_msgid(s->evp.id)); - - if (s->flags & SF_SECURE) { - x = SSL_get_peer_certificate(s->io.ssl); - fprintf(s->ofile, - "\tTLS version=%s cipher=%s bits=%d verify=%s;\n", - SSL_get_cipher_version(s->io.ssl), - SSL_get_cipher_name(s->io.ssl), - SSL_get_cipher_bits(s->io.ssl, NULL), - (s->flags & SF_VERIFIED) ? "YES" : (x ? "FAIL" : "NO")); - if (x) - X509_free(x); - } - - if (s->rcptcount == 1) { - fprintf(s->ofile, "\tfor <%s@%s>;\n", - s->evp.rcpt.user, - s->evp.rcpt.domain); - } - - fprintf(s->ofile, "\t%s\n", time_to_text(time(NULL))); - - smtp_enter_state(s, STATE_BODY); - smtp_reply(s, "354 Enter mail, end with \".\"" - " on a line by itself"); + log_debug("smtp: %p: fd %d from queue", s, imsg->fd); - tree_xset(&wait_mfa_data, s->id, s); - io_reload(&s->io); + tree_xset(&wait_filter, s->id, s); + filter_build_fd_chain(s->id, imsg->fd); return; case IMSG_QUEUE_ENVELOPE_SUBMIT: @@ -629,7 +596,7 @@ smtp_filter_response(uint64_t id, int query, int status, uint32_t code, struct smtp_session *s; struct ca_cert_req_msg req_ca_cert; - s = tree_xpop(&wait_mfa, id); + s = tree_xpop(&wait_filter, id); if (status == FILTER_CLOSE) { code = code ? code : 421; @@ -767,6 +734,78 @@ smtp_filter_response(uint64_t id, int query, int status, uint32_t code, } } +void +smtp_filter_fd(uint64_t id, int fd) +{ + struct smtp_session *s; + X509 *x; + + s = tree_xpop(&wait_filter, id); + + log_debug("smtp: %p: fd %d from filter", s, fd); + + if (fd == -1) { + smtp_reply(s, "421 %s: Temporary Error", + esc_code(ESC_STATUS_TEMPFAIL, ESC_OTHER_MAIL_SYSTEM_STATUS)); + smtp_enter_state(s, STATE_QUIT); + io_reload(&s->io); + return; + } + + iobuf_init(&s->dataiobuf, 0, 0); + io_init(&s->dataio, fd, s, smtp_data_io, &s->dataiobuf); + + iobuf_fqueue(&s->dataiobuf, "Received: "); + if (! (s->listener->flags & F_MASK_SOURCE)) { + iobuf_fqueue(&s->dataiobuf, "from %s (%s [%s]);\n\t", + s->evp.helo, + s->hostname, + ss_to_text(&s->ss)); + } + iobuf_fqueue(&s->dataiobuf, "by %s (%s) with %sSMTP%s%s id %08x;\n", + s->smtpname, + SMTPD_NAME, + s->flags & SF_EHLO ? "E" : "", + s->flags & SF_SECURE ? "S" : "", + s->flags & SF_AUTHENTICATED ? "A" : "", + evpid_to_msgid(s->evp.id)); + + if (s->flags & SF_SECURE) { + x = SSL_get_peer_certificate(s->io.ssl); + iobuf_fqueue(&s->dataiobuf, + "\tTLS version=%s cipher=%s bits=%d verify=%s;\n", + SSL_get_cipher_version(s->io.ssl), + SSL_get_cipher_name(s->io.ssl), + SSL_get_cipher_bits(s->io.ssl, NULL), + (s->flags & SF_VERIFIED) ? "YES" : (x ? "FAIL" : "NO")); + if (x) + X509_free(x); + } + + if (s->rcptcount == 1) { + iobuf_fqueue(&s->dataiobuf, "\tfor <%s@%s>;\n", + s->evp.rcpt.user, + s->evp.rcpt.domain); + } + + iobuf_fqueue(&s->dataiobuf, "\t%s\n", time_to_text(time(NULL))); + + /* + * XXX This is not exactly fair, since this is not really + * user data. + */ + s->datalen = iobuf_queued(&s->dataiobuf); + + io_set_write(&s->dataio); + + smtp_enter_state(s, STATE_BODY); + smtp_reply(s, "354 Enter mail, end with \".\"" + " on a line by itself"); + + tree_xset(&wait_filter_data, s->id, s); + io_reload(&s->io); +} + static void smtp_io(struct io *io, int evt) { @@ -856,6 +895,7 @@ smtp_io(struct io *io, int evt) if (line[i] & 0x80) line[i] = line[i] & 0x7f; + log_trace(TRACE_SMTP, "<<< [MSG] %s", line); smtp_filter_dataline(s, line); goto nextline; } @@ -873,10 +913,13 @@ smtp_io(struct io *io, int evt) /* End of body */ if (s->state == STATE_BODY) { + log_trace(TRACE_SMTP, "<<< [EOM]"); iobuf_normalize(&s->iobuf); io_set_write(io); - smtp_filter_eom(s); + s->dataeom = 1; + if (iobuf_queued(&s->dataiobuf) == 0) + smtp_data_io_done(s); return; } @@ -938,6 +981,69 @@ smtp_io(struct io *io, int evt) } static void +smtp_data_io(struct io *io, int evt) +{ + struct smtp_session *s = io->arg; + + log_trace(TRACE_IO, "smtp: %p (data): %s %s", s, io_strevent(evt), + io_strio(io)); + + switch (evt) { + case IO_TIMEOUT: + case IO_DISCONNECTED: + case IO_ERROR: + log_debug("debug: smtp: %p: io error on mfa", s); + io_clear(&s->dataio); + iobuf_clear(&s->dataiobuf); + s->msgflags |= MF_ERROR_IO; + if (s->io.flags & IO_PAUSE_IN) { + log_debug("debug: smtp: %p: resuming session after mfa error", s); + io_resume(&s->io, IO_PAUSE_IN); + } + break; + + case IO_LOWAT: + if (s->dataeom && iobuf_queued(&s->dataiobuf) == 0) { + smtp_data_io_done(s); + } else if (s->io.flags & IO_PAUSE_IN) { + log_debug("debug: smtp: %p: filter congestion over: resuming session", s); + io_resume(&s->io, IO_PAUSE_IN); + } + break; + + default: + fatalx("smtp_data_io()"); + } +} + +static void +smtp_data_io_done(struct smtp_session *s) +{ + log_debug("debug: smtp: %p: data io done (%zu bytes)", s, s->datalen); + io_clear(&s->dataio); + iobuf_clear(&s->dataiobuf); + + if (s->msgflags & MF_ERROR) { + + smtp_filter_rollback(s); + + m_create(p_queue, IMSG_SMTP_MESSAGE_ROLLBACK, 0, 0, -1); + m_add_msgid(p_queue, evpid_to_msgid(s->evp.id)); + m_close(p_queue); + if (s->msgflags & MF_ERROR_SIZE) + smtp_reply(s, "554 Message too big"); + else if (s->msgflags) + smtp_reply(s, "421 Internal server error"); + smtp_message_reset(s, 0); + smtp_enter_state(s, STATE_HELO); + io_reload(&s->io); + } + else { + smtp_filter_eom(s); + } +} + +static void smtp_command(struct smtp_session *s, char *line) { char *args, *eom, *method; @@ -1541,13 +1647,10 @@ smtp_message_end(struct smtp_session *s) { log_debug("debug: %p: end of message, msgflags=0x%04x", s, s->msgflags); - tree_xpop(&wait_mfa_data, s->id); + tree_xpop(&wait_filter_data, s->id); s->phase = PHASE_SETUP; - fclose(s->ofile); - s->ofile = NULL; - if (s->msgflags & MF_ERROR) { m_create(p_queue, IMSG_SMTP_MESSAGE_ROLLBACK, 0, 0, -1); m_add_msgid(p_queue, evpid_to_msgid(s->evp.id)); @@ -1646,10 +1749,12 @@ smtp_free(struct smtp_session *s, const char * reason) log_debug("debug: smtp: %p: deleting session: %s", s, reason); - tree_pop(&wait_mfa_data, s->id); + tree_pop(&wait_filter_data, s->id); +/* if (s->ofile) fclose(s->ofile); +*/ if (s->evp.id) { m_create(p_queue, IMSG_SMTP_MESSAGE_ROLLBACK, 0, 0, -1); @@ -1880,68 +1985,66 @@ smtp_filter_connect(struct smtp_session *s, struct sockaddr *sa) { filter_event(s->id, EVENT_CONNECT); - tree_xset(&wait_mfa, s->id, s); + tree_xset(&wait_filter, s->id, s); filter_connect(s->id, sa, (struct sockaddr *)&s->ss, s->hostname); } static void smtp_filter_eom(struct smtp_session *s) { - tree_xset(&wait_mfa, s->id, s); + log_debug("smtp: %p: eom. datalen=%zu", s, s->datalen); + tree_xset(&wait_filter, s->id, s); filter_eom(s->id, QUERY_EOM, s->datalen); } static void smtp_filter_helo(struct smtp_session *s) { - tree_xset(&wait_mfa, s->id, s); + tree_xset(&wait_filter, s->id, s); filter_line(s->id, QUERY_HELO, s->helo); } static void smtp_filter_mail(struct smtp_session *s) { - tree_xset(&wait_mfa, s->id, s); + tree_xset(&wait_filter, s->id, s); filter_mailaddr(s->id, QUERY_MAIL, &s->evp.sender); } static void smtp_filter_rcpt(struct smtp_session *s) { - tree_xset(&wait_mfa, s->id, s); + tree_xset(&wait_filter, s->id, s); filter_mailaddr(s->id, QUERY_RCPT, &s->evp.rcpt); } static void smtp_filter_data(struct smtp_session *s) { - tree_xset(&wait_mfa, s->id, s); + tree_xset(&wait_filter, s->id, s); filter_line(s->id, QUERY_DATA, NULL); } static void smtp_filter_dataline(struct smtp_session *s, const char *line) { - size_t len; - - log_trace(TRACE_SMTP, "<<< [MSG] %s", line); + int n; - /* Don't waste resources on message if it's going to bin anyway. */ + /* ignore data line if an error flag is set */ if (s->msgflags & MF_ERROR) return; - len = strlen(line) + 1; - - if (s->datalen + len > env->sc_maxsize) { - s->msgflags |= MF_ERROR_SIZE; - return; + n = iobuf_fqueue(&s->dataiobuf, "%s\n", line); + if (n == -1) { + /* XXX */ + fatalx("iobuf_fqueue"); } - s->datalen += len; - - if (fprintf(s->ofile, "%s\n", line) != (int)len) { - s->msgflags |= MF_ERROR_IO; - return; + s->datalen += n; + if (iobuf_queued(&s->dataiobuf) > DATA_HIWAT && !(s->io.flags & IO_PAUSE_IN)) { + log_debug("debug: smtp: %p: filter congestion over: pausing session", s); + io_pause(&s->io, IO_PAUSE_IN); } + io_reload(&s->dataio); } #define CASE(x) case x : return #x diff --git a/smtpd/smtpd.h b/smtpd/smtpd.h index 9a5e47b9..56e1e5ba 100644 --- a/smtpd/smtpd.h +++ b/smtpd/smtpd.h @@ -1312,6 +1312,7 @@ int smtp_session(struct listener *, int, const struct sockaddr_storage *, const char *); void smtp_session_imsg(struct mproc *, struct imsg *); void smtp_filter_response(uint64_t, int, int, uint32_t, const char *); +void smtp_filter_fd(uint64_t, int); /* smtpd.c */ void imsg_dispatch(struct mproc *, struct imsg *); |