diff options
author | 2014-05-18 17:47:40 +0200 | |
---|---|---|
committer | 2014-05-18 17:47:40 +0200 | |
commit | fa6fa9669361b9d584f338a95075cd1e27e09429 (patch) | |
tree | 2d1085cd8183fd6aea3a0c83da208256253159e0 | |
parent | Merge branch 'master' into portable (diff) | |
parent | Merge branch 'master' of ssh.poolp.org:/git/opensmtpd (diff) | |
download | OpenSMTPD-fa6fa9669361b9d584f338a95075cd1e27e09429.tar.xz OpenSMTPD-fa6fa9669361b9d584f338a95075cd1e27e09429.zip |
Merge branch 'master' into portable
-rw-r--r-- | smtpd/filter.c | 174 | ||||
-rw-r--r-- | smtpd/filter_api.c | 340 | ||||
-rw-r--r-- | smtpd/filters/filter_monkey.c | 2 | ||||
-rw-r--r-- | smtpd/filters/filter_perl.c | 2 | ||||
-rw-r--r-- | smtpd/filters/filter_python.c | 99 | ||||
-rw-r--r-- | smtpd/filters/filter_stub.c | 2 | ||||
-rw-r--r-- | smtpd/filters/filter_trace.c | 4 | ||||
-rw-r--r-- | smtpd/smtp_session.c | 54 | ||||
-rw-r--r-- | smtpd/smtpd-api.h | 6 |
9 files changed, 368 insertions, 315 deletions
diff --git a/smtpd/filter.c b/smtpd/filter.c index 483205c9..91f2c141 100644 --- a/smtpd/filter.c +++ b/smtpd/filter.c @@ -43,8 +43,8 @@ #include "log.h" enum { - QT_QUERY, - QT_EVENT, + QK_QUERY, + QK_EVENT, }; enum { @@ -78,19 +78,19 @@ struct filter_session { struct filter_query_lst queries; - struct io io; - struct iobuf iobuf; - FILE *ofile; - size_t datain; - size_t datalen; int error; + struct io iev; + struct iobuf ibuf; + size_t idatalen; + FILE *ofile; + struct filter_query *eom; }; struct filter_query { uint64_t qid; + int kind; int type; - int hook; struct filter_session *session; TAILQ_ENTRY(filter_query) entry; @@ -123,7 +123,7 @@ static struct filter_query *filter_query(struct filter_session *, int, int); static void filter_drain_query(struct filter_query *); static void filter_run_query(struct filter *, struct filter_query *); static void filter_end_query(struct filter_query *); -static void filter_set_fdout(struct filter_session *, int); +static void filter_set_sink(struct filter_session *, int); static int filter_tx(struct filter_session *, int); static void filter_tx_io(struct io *, int); @@ -134,7 +134,7 @@ static const char * filter_session_to_text(struct filter_session *); static const char * filter_query_to_text(struct filter_query *); static const char * filter_to_text(struct filter *); static const char * filter_proc_to_text(struct filter_proc *); -static const char * type_to_str(int); +static const char * kind_to_str(int); static const char * query_to_str(int); static const char * event_to_str(int); static const char * status_to_str(int); @@ -291,7 +291,7 @@ filter_event(uint64_t id, int event) s = xcalloc(1, sizeof(*s), "filter_event"); s->id = id; s->filters = dict_xget(&chains, "default"); - s->io.sock = -1; + s->iev.sock = -1; TAILQ_INIT(&s->queries); tree_xset(&sessions, s->id, s); } @@ -300,7 +300,7 @@ filter_event(uint64_t id, int event) s = tree_xpop(&sessions, id); else s = tree_xget(&sessions, id); - q = filter_query(s, QT_EVENT, event); + q = filter_query(s, QK_EVENT, event); filter_drain_query(q); } @@ -313,7 +313,7 @@ filter_connect(uint64_t id, const struct sockaddr *local, struct filter_query *q; s = tree_xget(&sessions, id); - q = filter_query(s, QT_QUERY, QUERY_CONNECT); + q = filter_query(s, QK_QUERY, QUERY_CONNECT); memmove(&q->u.connect.local, local, SA_LEN(local)); memmove(&q->u.connect.remote, remote, SA_LEN(remote)); @@ -327,13 +327,13 @@ filter_connect(uint64_t id, const struct sockaddr *local, } void -filter_mailaddr(uint64_t id, int qhook, const struct mailaddr *maddr) +filter_mailaddr(uint64_t id, int type, const struct mailaddr *maddr) { struct filter_session *s; struct filter_query *q; s = tree_xget(&sessions, id); - q = filter_query(s, QT_QUERY, qhook); + q = filter_query(s, QK_QUERY, type); strlcpy(q->u.maddr.user, maddr->user, sizeof(q->u.maddr.user)); strlcpy(q->u.maddr.domain, maddr->domain, sizeof(q->u.maddr.domain)); @@ -342,13 +342,13 @@ filter_mailaddr(uint64_t id, int qhook, const struct mailaddr *maddr) } void -filter_line(uint64_t id, int qhook, const char *line) +filter_line(uint64_t id, int type, const char *line) { struct filter_session *s; struct filter_query *q; s = tree_xget(&sessions, id); - q = filter_query(s, QT_QUERY, qhook); + q = filter_query(s, QK_QUERY, type); if (line) strlcpy(q->u.line, line, sizeof(q->u.line)); @@ -357,29 +357,28 @@ filter_line(uint64_t id, int qhook, const char *line) } void -filter_eom(uint64_t id, int qhook, size_t datalen) +filter_eom(uint64_t id, int type, size_t datalen) { struct filter_session *s; struct filter_query *q; s = tree_xget(&sessions, id); - q = filter_query(s, QT_QUERY, qhook); + q = filter_query(s, QK_QUERY, type); q->u.datalen = datalen; filter_drain_query(q); } static void -filter_set_fdout(struct filter_session *s, int fdout) +filter_set_sink(struct filter_session *s, int sink) { struct mproc *p; - int fd; while(s->fcurr) { if (s->fcurr->proc->hooks & HOOK_DATALINE) { - log_trace(TRACE_MFA, "filter: sending fd %d to %s", fdout, filter_to_text(s->fcurr)); + log_trace(TRACE_MFA, "filter: sending fd %d to %s", sink, filter_to_text(s->fcurr)); p = &s->fcurr->proc->mproc; - m_create(p, IMSG_FILTER_PIPE_SETUP, 0, 0, fdout); + m_create(p, IMSG_FILTER_PIPE, 0, 0, sink); m_add_id(p, s->id); m_close(p); return; @@ -387,45 +386,46 @@ filter_set_fdout(struct filter_session *s, int fdout) s->fcurr = TAILQ_PREV(s->fcurr, filter_lst, entry); } - log_trace(TRACE_MFA, "filter: chain input is %d", fdout); + log_trace(TRACE_MFA, "filter: chain input is %d", sink); - fd = filter_tx(s, fdout); - smtp_filter_fd(s->id, fd); + smtp_filter_fd(s->id, sink); } void -filter_build_fd_chain(uint64_t id, int fdout) +filter_build_fd_chain(uint64_t id, int sink) { struct filter_session *s; + int fd; s = tree_xget(&sessions, id); s->fcurr = TAILQ_LAST(s->filters, filter_lst); - filter_set_fdout(s, fdout); + fd = filter_tx(s, sink); + filter_set_sink(s, fd); } static struct filter_query * -filter_query(struct filter_session *s, int type, int qhook) +filter_query(struct filter_session *s, int kind, int type) { struct filter_query *q; q = xcalloc(1, sizeof *q, "filter_query"); q->qid = generate_uid(); q->session = s; + q->kind = kind; q->type = type; - q->hook = qhook; TAILQ_INSERT_TAIL(&s->queries, q, entry); q->state = QUERY_READY; q->current = TAILQ_FIRST(s->filters); q->hasrun = 0; - if (type == QT_QUERY) - log_trace(TRACE_MFA, "filter: new query %s %s", type_to_str(type), - query_to_str(qhook)); + if (kind == QK_QUERY) + log_trace(TRACE_MFA, "filter: new query %s %s", kind_to_str(kind), + query_to_str(type)); else - log_trace(TRACE_MFA, "filter: new query %s %s", type_to_str(type), - event_to_str(qhook)); + log_trace(TRACE_MFA, "filter: new query %s %s", kind_to_str(kind), + event_to_str(type)); return (q); } @@ -478,7 +478,7 @@ filter_drain_query(struct filter_query *q) } /* Defer the response if the file is not closed yet. */ - if (q->type == QT_QUERY && q->hook == QUERY_EOM && q->session->ofile) { + if (q->kind == QK_QUERY && q->type == QUERY_EOM && q->session->ofile) { log_debug("filter: deferring eom query..."); q->session->eom = q; return; @@ -490,7 +490,7 @@ filter_drain_query(struct filter_query *q) static void filter_run_query(struct filter *f, struct filter_query *q) { - if (q->type == QT_QUERY) { + if (q->kind == QK_QUERY) { log_trace(TRACE_MFA, "filter: running filter %s for query %s", filter_to_text(f), filter_query_to_text(q)); @@ -498,9 +498,9 @@ filter_run_query(struct filter *f, struct filter_query *q) m_create(&f->proc->mproc, IMSG_FILTER_QUERY, 0, 0, -1); m_add_id(&f->proc->mproc, q->session->id); m_add_id(&f->proc->mproc, q->qid); - m_add_int(&f->proc->mproc, q->hook); + m_add_int(&f->proc->mproc, q->type); - switch (q->hook) { + switch (q->type) { case QUERY_CONNECT: m_add_sockaddr(&f->proc->mproc, (struct sockaddr *)&q->u.connect.local); @@ -532,7 +532,7 @@ filter_run_query(struct filter *f, struct filter_query *q) m_create(&f->proc->mproc, IMSG_FILTER_EVENT, 0, 0, -1); m_add_id(&f->proc->mproc, q->session->id); - m_add_int(&f->proc->mproc, q->hook); + m_add_int(&f->proc->mproc, q->type); m_close(&f->proc->mproc); } } @@ -544,18 +544,22 @@ filter_end_query(struct filter_query *q) log_trace(TRACE_MFA, "filter: filter_end_query %s", filter_query_to_text(q)); - if (q->type == QT_EVENT) + if (q->kind == QK_EVENT) goto done; - if (q->hook == QUERY_EOM) { + if (q->type == QUERY_EOM) { + + log_debug("debug: filter: filter_end_query(%d, %zu, %zu)", s->iev.sock, + s->idatalen, q->u.datalen); + if (s->error) { smtp_filter_response(s->id, QUERY_EOM, FILTER_FAIL, 0, NULL); free(q->smtp.response); goto done; } - else if (q->u.datalen != s->datain) { + else if (q->u.datalen != s->idatalen) { log_warnx("filter: datalen mismatch on session %" PRIx64 - ": %zu/%zu", s->id, s->datain, q->u.datalen); + ": %zu/%zu", s->id, s->idatalen, q->u.datalen); smtp_filter_response(s->id, QUERY_EOM, FILTER_FAIL, 0, NULL); free(q->smtp.response); goto done; @@ -569,7 +573,7 @@ filter_end_query(struct filter_query *q) status_to_str(q->smtp.status), q->smtp.code, q->smtp.response); - smtp_filter_response(s->id, q->hook, q->smtp.status, q->smtp.code, + smtp_filter_response(s->id, q->type, q->smtp.status, q->smtp.code, q->smtp.response); free(q->smtp.response); @@ -588,7 +592,7 @@ filter_imsg(struct mproc *p, struct imsg *imsg) const char *line; uint64_t qid; uint32_t datalen; - int qhook, status, code; + int type, status, code; if (imsg == NULL) { log_warnx("warn: filter \"%s\" closed unexpectedly", p->name); @@ -627,8 +631,8 @@ filter_imsg(struct mproc *p, struct imsg *imsg) case IMSG_FILTER_RESPONSE: m_msg(&m, imsg); m_get_id(&m, &qid); - m_get_int(&m, &qhook); - if (qhook == QUERY_EOM) + m_get_int(&m, &type); + if (type == QUERY_EOM) m_get_u32(&m, &datalen); m_get_int(&m, &status); m_get_int(&m, &code); @@ -639,8 +643,8 @@ filter_imsg(struct mproc *p, struct imsg *imsg) m_end(&m); q = tree_xpop(&queries, qid); - if (q->hook != qhook) { - log_warnx("warn: filter: hook mismatch %d != %d", q->hook, qhook); + if (q->type != type) { + log_warnx("warn: filter: type mismatch %d != %d", q->type, type); fatalx("exiting"); } q->smtp.status = status; @@ -651,7 +655,7 @@ filter_imsg(struct mproc *p, struct imsg *imsg) q->smtp.response = xstrdup(line, "filter_imsg"); } q->state = (status == FILTER_OK) ? QUERY_READY : QUERY_DONE; - if (qhook == QUERY_EOM) + if (type == QUERY_EOM) q->u.datalen = datalen; next = TAILQ_NEXT(q, entry); @@ -665,14 +669,14 @@ filter_imsg(struct mproc *p, struct imsg *imsg) filter_drain_query(next); break; - case IMSG_FILTER_PIPE_SETUP: + case IMSG_FILTER_PIPE: m_msg(&m, imsg); m_get_id(&m, &qid); m_end(&m); s = tree_xget(&sessions, qid); s->fcurr = TAILQ_PREV(s->fcurr, filter_lst, entry); - filter_set_fdout(s, imsg->fd); + filter_set_sink(s, imsg->fd); break; default: @@ -682,13 +686,11 @@ filter_imsg(struct mproc *p, struct imsg *imsg) } static int -filter_tx(struct filter_session *s, int fdout) +filter_tx(struct filter_session *s, int sink) { - int sp[2]; + int sp[2]; - /* reset */ - s->datain = 0; - s->datalen = 0; + s->idatalen = 0; s->eom = NULL; s->error = 0; @@ -697,16 +699,16 @@ filter_tx(struct filter_session *s, int fdout) return (-1); } - if ((s->ofile = fdopen(fdout, "w")) == NULL) { + if ((s->ofile = fdopen(sink, "w")) == NULL) { log_warn("warn: filter: fdopen"); close(sp[0]); close(sp[1]); return (-1); } - iobuf_init(&s->iobuf, 0, 0); - io_init(&s->io, sp[0], s, filter_tx_io, &s->iobuf); - io_set_read(&s->io); + iobuf_init(&s->ibuf, 0, 0); + io_init(&s->iev, sp[0], s, filter_tx_io, &s->ibuf); + io_set_read(&s->iev); return (sp[1]); } @@ -717,26 +719,32 @@ filter_tx_io(struct io *io, int evt) struct filter_session *s = io->arg; size_t len, n; char *data; + char buf[65535]; + + log_debug("filter: filter_tx_io(%p, %s)", s, io_strevent(evt)); switch (evt) { case IO_DATAIN: - data = iobuf_data(&s->iobuf); - len = iobuf_len(&s->iobuf); - log_debug("debug: filter: tx data (%zu) for req %016"PRIx64, - len, s->id); + data = iobuf_data(&s->ibuf); + len = iobuf_len(&s->ibuf); + memmove(buf, data, len); + buf[len] = 0; + log_debug("filter: filter_tx_io: datain (%zu) for req %016"PRIx64": %s", + len, s->id, buf); + n = fwrite(data, 1, len, s->ofile); if (n != len) { log_warnx("warn: filter_tx_io: fwrite %zu/%zu", n, len); s->error = 1; break; } - s->datain += n; - iobuf_drop(&s->iobuf, n); - iobuf_normalize(&s->iobuf); + s->idatalen += n; + iobuf_drop(&s->ibuf, n); + iobuf_normalize(&s->ibuf); return; case IO_DISCONNECTED: - log_debug("debug: filter: tx done for req %016"PRIx64, s->id); + log_debug("debug: filter: tx done (%zu) for req %016"PRIx64, s->idatalen, s->id); break; default: @@ -745,8 +753,8 @@ filter_tx_io(struct io *io, int evt) break; } - io_clear(&s->io); - iobuf_clear(&s->iobuf); + io_clear(&s->iev); + iobuf_clear(&s->ibuf); fclose(s->ofile); s->ofile = NULL; @@ -767,8 +775,8 @@ filter_query_to_text(struct filter_query *q) tmp[0] = '\0'; - if (q->type == QT_QUERY) { - switch(q->hook) { + if (q->kind == QK_QUERY) { + switch(q->type) { case QUERY_CONNECT: strlcat(tmp, "=", sizeof tmp); strlcat(tmp, ss_to_text(&q->u.connect.local), sizeof tmp); @@ -790,12 +798,12 @@ filter_query_to_text(struct filter_query *q) break; } snprintf(buf, sizeof buf, "%016"PRIx64"[%s,%s%s,%s]", - q->qid, type_to_str(q->type), query_to_str(q->hook), tmp, + q->qid, kind_to_str(q->kind), query_to_str(q->type), tmp, filter_session_to_text(q->session)); } else { snprintf(buf, sizeof buf, "%016"PRIx64"[%s,%s%s,%s]", - q->qid, type_to_str(q->type), event_to_str(q->hook), tmp, + q->qid, kind_to_str(q->kind), event_to_str(q->type), tmp, filter_session_to_text(q->session)); } @@ -811,7 +819,7 @@ filter_session_to_text(struct filter_session *s) return "filter_session@NULL"; snprintf(buf, sizeof(buf), "filter_session@%p[datalen=%zu,eom=%p,ofile=%p]", - s, s->datalen, s->eom, s->ofile); + s, s->idatalen, s->eom, s->ofile); return buf; } @@ -846,9 +854,7 @@ filterimsg_to_str(int imsg) CASE(IMSG_FILTER_REGISTER); CASE(IMSG_FILTER_EVENT); CASE(IMSG_FILTER_QUERY); - CASE(IMSG_FILTER_PIPE_SETUP); - CASE(IMSG_FILTER_PIPE_ABORT); - CASE(IMSG_FILTER_NOTIFY); + CASE(IMSG_FILTER_PIPE); CASE(IMSG_FILTER_RESPONSE); default: return "IMSG_FILTER_???"; @@ -886,13 +892,13 @@ event_to_str(int event) } static const char * -type_to_str(int type) +kind_to_str(int type) { switch (type) { - CASE(QT_QUERY); - CASE(QT_EVENT); + CASE(QK_QUERY); + CASE(QK_EVENT); default: - return "QT_???"; + return "QK_???"; } } diff --git a/smtpd/filter_api.c b/smtpd/filter_api.c index cbf10761..586b876d 100644 --- a/smtpd/filter_api.c +++ b/smtpd/filter_api.c @@ -44,10 +44,12 @@ static struct tree sessions; struct filter_session { uint64_t id; uint64_t qid; - int qhook; + int qtype; + size_t datalen; struct { - size_t datalen; + int eom_called; + int error; struct io iev; struct iobuf ibuf; @@ -85,7 +87,7 @@ static struct filter_internals { int (*rcpt)(uint64_t, struct mailaddr *); int (*data)(uint64_t); void (*dataline)(uint64_t, const char *); - int (*eom)(uint64_t); + int (*eom)(uint64_t, size_t); void (*disconnect)(uint64_t); void (*reset)(uint64_t); @@ -97,7 +99,7 @@ static struct filter_internals { static void filter_api_init(void); static void filter_response(struct filter_session *, int, int, const char *); static void filter_send_response(struct filter_session *); -static void filter_register_query(uint64_t, uint64_t, enum filter_hook_type); +static void filter_register_query(uint64_t, uint64_t, int); static void filter_dispatch(struct mproc *, struct imsg *); static void filter_dispatch_dataline(uint64_t, const char *); static void filter_dispatch_data(uint64_t); @@ -123,11 +125,8 @@ static const char *event_to_str(int); static void filter_response(struct filter_session *s, int status, int code, const char *line) { - log_debug("debug: filter-api:%s: got response %s for %016"PRIx64" %d %d %s", - filter_name, query_to_str(s->qhook), s->id, - s->response.status, - s->response.code, - s->response.line); + log_debug("debug: filter-api:%s %016"PRIx64" %s filter_response(%d, %d, %s)", + filter_name, s->id, query_to_str(s->qtype), status, code, line); s->response.ready = 1; s->response.status = status; @@ -137,22 +136,18 @@ filter_response(struct filter_session *s, int status, int code, const char *line else s->response.line = NULL; - /* For HOOK_EOM, wait until the obuf is drained before sending the */ - if (s->qhook == QUERY_EOM && - fi.hooks & HOOK_DATALINE && - s->pipe.oev.sock != -1) { - log_debug("debug: filter-api:%s: got response, waiting for opipe to be closed", filter_name); - return; - } - - filter_send_response(s); + /* eom is special, as the reponse has to be deferred until the pipe is all flushed */ + if (s->qtype == QUERY_EOM) + filter_trigger_eom(s); + else + filter_send_response(s); } static void filter_send_response(struct filter_session *s) { - log_debug("debug: filter-api:%s: sending response %s for %016"PRIx64" %d %d %s", - filter_name, query_to_str(s->qhook), s->id, + log_debug("debug: filter-api:%s %016"PRIx64" %s filter_send_response() -> %d, %d, %s", + filter_name, s->id, query_to_str(s->qtype), s->response.status, s->response.code, s->response.line); @@ -161,10 +156,9 @@ filter_send_response(struct filter_session *s) m_create(&fi.p, IMSG_FILTER_RESPONSE, 0, 0, -1); m_add_id(&fi.p, s->qid); - m_add_int(&fi.p, s->qhook); - if (s->qhook == QUERY_EOM) - m_add_u32(&fi.p, (fi.hooks & HOOK_DATALINE) ? - s->pipe.odatalen : s->pipe.datalen); + m_add_int(&fi.p, s->qtype); + if (s->qtype == QUERY_EOM) + m_add_u32(&fi.p, s->datalen); m_add_int(&fi.p, s->response.status); m_add_int(&fi.p, s->response.code); if (s->response.line) { @@ -188,10 +182,10 @@ filter_dispatch(struct mproc *p, struct imsg *imsg) const char *line, *name; uint32_t v, datalen; uint64_t id, qid; - int status, event, hook; + int status, type; int fds[2], fdin, fdout; - log_debug("debug: filter-api:%s: imsg %s", filter_name, + log_debug("debug: filter-api:%s imsg %s", filter_name, filterimsg_to_str(imsg->hdr.type)); switch (imsg->hdr.type) { @@ -202,7 +196,7 @@ filter_dispatch(struct mproc *p, struct imsg *imsg) filter_name = strdup(name); m_end(&m); if (v != FILTER_API_VERSION) { - log_warnx("warn: filter-api:%s: API mismatch", filter_name); + log_warnx("warn: filter-api:%s API mismatch", filter_name); fatalx("filter-api: exiting"); } m_create(p, IMSG_FILTER_REGISTER, 0, 0, -1); @@ -214,9 +208,9 @@ filter_dispatch(struct mproc *p, struct imsg *imsg) case IMSG_FILTER_EVENT: m_msg(&m, imsg); m_get_id(&m, &id); - m_get_int(&m, &event); + m_get_int(&m, &type); m_end(&m); - switch (event) { + switch (type) { case EVENT_CONNECT: s = xcalloc(1, sizeof(*s), "filter_dispatch"); s->id = id; @@ -238,6 +232,9 @@ filter_dispatch(struct mproc *p, struct imsg *imsg) case EVENT_ROLLBACK: filter_dispatch_rollback(id); break; + default: + log_warnx("warn: filter-api:%s bad event %d", filter_name, type); + fatalx("filter-api: exiting"); } break; @@ -245,52 +242,52 @@ filter_dispatch(struct mproc *p, struct imsg *imsg) m_msg(&m, imsg); m_get_id(&m, &id); m_get_id(&m, &qid); - m_get_int(&m, &hook); - switch(hook) { + m_get_int(&m, &type); + switch(type) { case QUERY_CONNECT: m_get_sockaddr(&m, (struct sockaddr*)&q_connect.local); m_get_sockaddr(&m, (struct sockaddr*)&q_connect.remote); m_get_string(&m, &q_connect.hostname); m_end(&m); - filter_register_query(id, qid, hook); + filter_register_query(id, qid, type); filter_dispatch_connect(id, &q_connect); break; case QUERY_HELO: m_get_string(&m, &line); m_end(&m); - filter_register_query(id, qid, hook); + filter_register_query(id, qid, type); filter_dispatch_helo(id, line); break; case QUERY_MAIL: m_get_mailaddr(&m, &maddr); m_end(&m); - filter_register_query(id, qid, hook); + filter_register_query(id, qid, type); filter_dispatch_mail(id, &maddr); break; case QUERY_RCPT: m_get_mailaddr(&m, &maddr); m_end(&m); - filter_register_query(id, qid, hook); + filter_register_query(id, qid, type); filter_dispatch_rcpt(id, &maddr); break; case QUERY_DATA: m_end(&m); - filter_register_query(id, qid, hook); + filter_register_query(id, qid, type); filter_dispatch_data(id); break; case QUERY_EOM: m_get_u32(&m, &datalen); m_end(&m); - filter_register_query(id, qid, hook); + filter_register_query(id, qid, type); filter_dispatch_eom(id, datalen); break; default: - log_warnx("warn: filter-api:%s: bad hook %d", filter_name, hook); + log_warnx("warn: filter-api:%s bad query %d", filter_name, type); fatalx("filter-api: exiting"); } break; - case IMSG_FILTER_PIPE_SETUP: + case IMSG_FILTER_PIPE: m_msg(&m, imsg); m_get_id(&m, &id); m_end(&m); @@ -299,11 +296,17 @@ filter_dispatch(struct mproc *p, struct imsg *imsg) fdin = -1; if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, fds) == -1) { - log_warn("warn: filter-api:%s: socketpair", filter_name); + log_warn("warn: filter-api:%s socketpair", filter_name); close(fdout); } else { s = tree_xget(&sessions, id); + + s->pipe.eom_called = 0; + s->pipe.error = 0; + s->pipe.idatalen = 0; + s->pipe.odatalen = 0; + iobuf_init(&s->pipe.obuf, 0, 0); io_init(&s->pipe.oev, fdout, s, filter_io_out, &s->pipe.obuf); io_set_write(&s->pipe.oev); @@ -313,48 +316,34 @@ filter_dispatch(struct mproc *p, struct imsg *imsg) io_set_read(&s->pipe.iev); fdin = fds[1]; - /* XXX notify? */ } - log_debug("debug: filter-api:%s: tx pipe %d -> %d for %016"PRIx64, filter_name, fdin, fdout, id); - m_create(&fi.p, IMSG_FILTER_PIPE_SETUP, 0, 0, fdin); + + log_debug("debug: filter-api:%s %016"PRIx64" tx pipe %d -> %d", + filter_name, id, fdin, fdout); + + m_create(&fi.p, IMSG_FILTER_PIPE, 0, 0, fdin); m_add_id(&fi.p, id); m_close(&fi.p); - break; - case IMSG_FILTER_PIPE_ABORT: - m_msg(&m, imsg); - m_get_id(&m, &id); - m_end(&m); - s = tree_xget(&sessions, id); - if (s->pipe.iev.sock != -1) { - io_clear(&s->pipe.iev); - iobuf_clear(&s->pipe.ibuf); - } - if (s->pipe.oev.sock != -1) { - io_clear(&s->pipe.oev); - iobuf_clear(&s->pipe.obuf); - } - /* XXX notify? */ break; } } static void -filter_register_query(uint64_t id, uint64_t qid, enum filter_hook_type hook) +filter_register_query(uint64_t id, uint64_t qid, int type) { struct filter_session *s; - log_debug("debug: filter-api:%s: query %s for %016"PRIx64, - filter_name, query_to_str(hook), id); + log_debug("debug: filter-api:%s %016"PRIx64" %s", filter_name, id, query_to_str(type)); s = tree_xget(&sessions, id); if (s->qid) { - log_warnx("warn: filter-api:%s: query already in progess", + log_warnx("warn: filter-api:%s query already in progess", filter_name); fatalx("filter-api: exiting"); } s->qid = qid; - s->qhook = hook; + s->qtype = type; s->response.ready = 0; tree_xset(&queries, qid, s); @@ -433,6 +422,14 @@ filter_dispatch_disconnect(uint64_t id) fi.cb.disconnect(id); } +static void +filter_dispatch_dataline(uint64_t id, const char *data) +{ + if (fi.cb.dataline) + fi.cb.dataline(id, data); + else + filter_api_writeln(id, data); +} static void filter_dispatch_eom(uint64_t id, size_t datalen) @@ -440,71 +437,62 @@ filter_dispatch_eom(uint64_t id, size_t datalen) struct filter_session *s; s = tree_xget(&sessions, id); - s->pipe.datalen = datalen; + s->datalen = datalen; - if (fi.hooks & HOOK_DATALINE) { - /* wait for the io to be done */ - if (s->pipe.iev.sock != -1) { - log_debug("debug: filter-api:%s: eom received for %016"PRIx64", waiting for io to end", - filter_name, id); - return; - } - filter_trigger_eom(s); - return; - } - - if (fi.cb.eom) - fi.cb.eom(s->id); - else - filter_api_accept(id); -} - -static void -filter_dispatch_dataline(uint64_t id, const char *data) -{ - if (fi.cb.dataline) - fi.cb.dataline(id, data); - else - filter_api_writeln(id, data); + filter_trigger_eom(s); } static void filter_trigger_eom(struct filter_session *s) { - log_debug("debug: filter-api:%s: tx eom (%zu) for %016"PRIx64, filter_name, s->pipe.datalen, s->id); + log_debug("debug: filter-api:%s %016"PRIx64" filter_trigger_eom(%d, %d, %zu, %zu, %zu)", + filter_name, s->id, s->pipe.iev.sock, s->pipe.oev.sock, + s->datalen, s->pipe.idatalen, s->pipe.odatalen); + + /* This is called when + * - EOM query is first received + * - input data is closed + * - output has been written + */ + + /* input not done yet, or EOM query not received */ + if (s->pipe.iev.sock != -1 || s->qid == 0) + return; + + if (s->pipe.error) + goto fail; - if (!s->pipe.error && s->pipe.idatalen != s->pipe.datalen) { - log_debug("debug: filter-api:%s: tx datalen mismatch: %zu/%zu", - filter_name, s->pipe.idatalen, s->pipe.datalen); + /* if size don't match, error out */ + if (s->pipe.idatalen != s->datalen) { + log_debug("debug: filter-api:%s tx datalen mismatch: %zu/%zu", + filter_name, s->pipe.idatalen, s->datalen); s->pipe.error = 1; - } - if (s->pipe.error) { - log_debug("debug: filter-api:%s: tx pipe.error", filter_name); - /* XXX error? */ + goto fail; } - /* if the filter has no eom callback, we accept the message */ - if (fi.cb.eom) { - log_debug("debug: filter-api:%s: calling eom callback", filter_name); - fi.cb.eom(s->id); - } else { - log_debug("debug: filter-api:%s: accepting by default", filter_name); - filter_api_accept(s->id); + /* if we didn't send the eom to the user do it now */ + if (!s->pipe.eom_called) { + s->pipe.eom_called = 1; + if (fi.cb.eom) + fi.cb.eom(s->id, s->datalen); + else + filter_api_accept(s->id); + return; } - /* if the output is done and the response is ready, send it */ - if ((s->pipe.oev.sock == -1 || iobuf_queued(&s->pipe.obuf) == 0) && - s->response.ready) { - log_debug("debug: filter-api:%s: sending response", filter_name); - if (s->pipe.oev.sock != -1) { - io_clear(&s->pipe.oev); - iobuf_clear(&s->pipe.obuf); - } - filter_send_response(s); - } - else { - log_debug("debug: filter-api:%s: waiting for obuf to drain", filter_name); - } + if (s->pipe.error) + goto fail; + + /* wait for the output socket to be closed */ + if (s->pipe.oev.sock != -1) + return; + + s->datalen = s->pipe.odatalen; + filter_send_response(s); + + fail: + /* XXX */ + return; } static void @@ -514,7 +502,7 @@ filter_io_in(struct io *io, int evt) char *line; size_t len; - log_debug("debug: filter-api:%s: filter_io_in(%p, %s)", + log_debug("debug: filter-api:%s filter_io_in(%p, %s)", filter_name, s, io_strevent(evt)); switch (evt) { @@ -524,8 +512,6 @@ filter_io_in(struct io *io, int evt) if ((line == NULL && iobuf_len(&s->pipe.ibuf) >= SMTPD_MAXLINESIZE) || (line && len >= SMTPD_MAXLINESIZE)) { s->pipe.error = 1; - io_clear(&s->pipe.oev); - iobuf_clear(&s->pipe.obuf); break; } /* No complete line received */ @@ -533,28 +519,37 @@ filter_io_in(struct io *io, int evt) iobuf_normalize(&s->pipe.ibuf); /* flow control */ if (iobuf_queued(&s->pipe.obuf) >= FILTER_HIWAT) - io_pause(&s->pipe.oev, IO_PAUSE_IN); + io_pause(&s->pipe.iev, IO_PAUSE_IN); return; } + s->pipe.idatalen += len + 1; + /* XXX warning: do not clear io from this call! */ filter_dispatch_dataline(s->id, line); goto nextline; case IO_DISCONNECTED: - if (s->qhook == QUERY_EOM) - filter_trigger_eom(s); - else { - log_debug("debug: filter-api:%s: datain closed, for %016"PRIx64", waiting for eom", + if (iobuf_len(&s->pipe.ibuf)) { + log_warn("warn: filter-api:%s %016"PRIx64" incomplete input", filter_name, s->id); } + log_debug("debug: filter-api:%s %016"PRIx64" input done (%zu bytes)", + filter_name, s->id, s->pipe.idatalen); break; + default: + log_warn("warn: filter-api:%s %016"PRIx64": unexpected io event %d on data pipe", + filter_name, s->id, evt); s->pipe.error = 1; + + } + if (s->pipe.error) { io_clear(&s->pipe.oev); iobuf_clear(&s->pipe.obuf); } io_clear(&s->pipe.iev); iobuf_clear(&s->pipe.ibuf); + filter_trigger_eom(s); } static void @@ -562,39 +557,43 @@ filter_io_out(struct io *io, int evt) { struct filter_session *s = io->arg; - log_debug("debug: filter-api:%s: filter_io_out(%p, %s)", - filter_name, s, io_strevent(evt)); + log_debug("debug: filter-api:%s %016"PRIx64" filter_io_out(%s)", + filter_name, s->id, io_strevent(evt)); switch (evt) { case IO_TIMEOUT: case IO_DISCONNECTED: case IO_ERROR: - log_debug("debug: filter-api:%s: io error on output pipe", - filter_name); + log_debug("debug: filter-api:%s %016"PRIx64" io error on output pipe", + filter_name, s->id); s->pipe.error = 1; - io_clear(&s->pipe.oev); - iobuf_clear(&s->pipe.obuf); - if (s->pipe.iev.sock != -1) { - io_clear(&s->pipe.iev); - iobuf_clear(&s->pipe.ibuf); - } break; case IO_LOWAT: /* flow control */ - if (s->pipe.iev.sock != -1 && s->pipe.iev.flags & IO_PAUSE_IN) + if (s->pipe.iev.sock != -1 && s->pipe.iev.flags & IO_PAUSE_IN) { io_resume(&s->pipe.iev, IO_PAUSE_IN); - - /* if the input is done and there is a response, send it */ - if (s->pipe.iev.sock == -1 && s->response.ready) { - io_clear(&s->pipe.oev); - iobuf_clear(&s->pipe.obuf); - filter_send_response(s); + return; } - break; + + /* if the input is done and there is a response we are done */ + if (s->pipe.iev.sock == -1 && s->response.ready) + break; + + /* just wait for more data to send */ + return; + default: fatalx("filter_io_out()"); } + + io_clear(&s->pipe.oev); + iobuf_clear(&s->pipe.obuf); + if (s->pipe.error) { + io_clear(&s->pipe.iev); + iobuf_clear(&s->pipe.ibuf); + } + filter_trigger_eom(s); } #define CASE(x) case x : return #x @@ -606,12 +605,10 @@ filterimsg_to_str(int imsg) CASE(IMSG_FILTER_REGISTER); CASE(IMSG_FILTER_EVENT); CASE(IMSG_FILTER_QUERY); - CASE(IMSG_FILTER_PIPE_SETUP); - CASE(IMSG_FILTER_PIPE_ABORT); - CASE(IMSG_FILTER_NOTIFY); + CASE(IMSG_FILTER_PIPE); CASE(IMSG_FILTER_RESPONSE); default: - return "IMSG_FILTER_???"; + return ("IMSG_FILTER_???"); } } @@ -631,7 +628,7 @@ hook_to_str(int hook) CASE(HOOK_ROLLBACK); CASE(HOOK_DATALINE); default: - return "HOOK_???"; + return ("HOOK_???"); } } @@ -647,7 +644,7 @@ query_to_str(int query) CASE(QUERY_EOM); CASE(QUERY_DATALINE); default: - return "QUERY_???"; + return ("QUERY_???"); } } @@ -661,7 +658,7 @@ event_to_str(int event) CASE(EVENT_COMMIT); CASE(EVENT_ROLLBACK); default: - return "EVENT_???"; + return ("EVENT_???"); } } @@ -675,8 +672,8 @@ const char * proc_name(enum smtp_proc_type proc) { if (proc == PROC_FILTER) - return filter_name; - return "filter"; + return (filter_name); + return ("filter"); } const char * @@ -700,11 +697,11 @@ filter_api_setugid(uid_t uid, gid_t gid) filter_api_init(); if (! uid) { - log_warn("warn: filter-api:%s: can't set uid 0", filter_name); + log_warn("warn: filter-api:%s can't set uid 0", filter_name); fatalx("filter-api: exiting"); } if (! gid) { - log_warn("warn: filter-api:%s: can't set gid 0", filter_name); + log_warn("warn: filter-api:%s can't set gid 0", filter_name); fatalx("filter-api: exiting"); } fi.uid = uid; @@ -744,7 +741,7 @@ filter_api_init(void) pw = getpwnam(SMTPD_USER); if (pw == NULL) { - log_warn("warn: filter-api:%s: getpwnam", filter_name); + log_warn("warn: filter-api:%s getpwnam", filter_name); fatalx("filter-api: exiting"); } @@ -824,7 +821,7 @@ filter_api_on_dataline(void(*cb)(uint64_t, const char *)) } void -filter_api_on_eom(int(*cb)(uint64_t)) +filter_api_on_eom(int(*cb)(uint64_t, size_t)) { filter_api_init(); @@ -872,7 +869,7 @@ void filter_api_loop(void) { if (register_done) { - log_warnx("warn: filter-api:%s: filter_api_loop() already called", filter_name); + log_warnx("warn: filter-api:%s filter_api_loop() already called", filter_name); fatalx("filter-api: exiting"); } @@ -884,11 +881,11 @@ filter_api_loop(void) if (fi.rootpath) { if (chroot(fi.rootpath) == -1) { - log_warn("warn: filter-api:%s: chroot", filter_name); + log_warn("warn: filter-api:%s chroot", filter_name); fatalx("filter-api: exiting"); } if (chdir("/") == -1) { - log_warn("warn: filter-api:%s: chdir", filter_name); + log_warn("warn: filter-api:%s chdir", filter_name); fatalx("filter-api: exiting"); } } @@ -896,12 +893,12 @@ filter_api_loop(void) if (setgroups(1, &fi.gid) || setresgid(fi.gid, fi.gid, fi.gid) || setresuid(fi.uid, fi.uid, fi.uid)) { - log_warn("warn: filter-api:%s: cannot drop privileges", filter_name); + log_warn("warn: filter-api:%s cannot drop privileges", filter_name); fatalx("filter-api: exiting"); } if (event_dispatch() < 0) { - log_warn("warn: filter-api:%s: event_dispatch", filter_name); + log_warn("warn: filter-api:%s event_dispatch", filter_name); fatalx("filter-api: exiting"); } } @@ -911,9 +908,12 @@ filter_api_accept(uint64_t id) { struct filter_session *s; + log_debug("debug: filter-api:%s %016"PRIx64" filter_api_accept()", filter_name, id); + s = tree_xget(&sessions, id); filter_response(s, FILTER_OK, 0, NULL); - return 1; + + return (1); } int @@ -921,6 +921,9 @@ filter_api_reject(uint64_t id, enum filter_status status) { struct filter_session *s; + log_debug("debug: filter-api:%s %016"PRIx64" filter_api_reject(%d)", + filter_name, id, status); + s = tree_xget(&sessions, id); /* This is NOT an acceptable status for a failure */ @@ -928,7 +931,8 @@ filter_api_reject(uint64_t id, enum filter_status status) status = FILTER_FAIL; filter_response(s, status, 0, NULL); - return 1; + + return (1); } int @@ -937,6 +941,9 @@ filter_api_reject_code(uint64_t id, enum filter_status status, uint32_t code, { struct filter_session *s; + log_debug("debug: filter-api:%s %016"PRIx64" filter_api_reject_code(%d, %u, %s)", + filter_name, id, status, code, line); + s = tree_xget(&sessions, id); /* This is NOT an acceptable status for a failure */ @@ -944,7 +951,8 @@ filter_api_reject_code(uint64_t id, enum filter_status status, uint32_t code, status = FILTER_FAIL; filter_response(s, status, code, line); - return 1; + + return (1); } void @@ -952,6 +960,8 @@ filter_api_writeln(uint64_t id, const char *line) { struct filter_session *s; + log_debug("debug: filter-api:%s %016"PRIx64" filter_api_writeln(%s)", filter_name, id, line); + s = tree_xget(&sessions, id); if (s->pipe.oev.sock == -1) { @@ -984,7 +994,7 @@ filter_api_mailaddr_to_text(const struct mailaddr *maddr) strlcpy(buffer, maddr->user, sizeof buffer); strlcat(buffer, "@", sizeof buffer); if (strlcat(buffer, maddr->domain, sizeof buffer) >= sizeof buffer) - return NULL; + return (NULL); - return buffer; + return (buffer); } diff --git a/smtpd/filters/filter_monkey.c b/smtpd/filters/filter_monkey.c index 673b6c98..9a7fa2b4 100644 --- a/smtpd/filters/filter_monkey.c +++ b/smtpd/filters/filter_monkey.c @@ -75,7 +75,7 @@ on_data(uint64_t id) } static int -on_eom(uint64_t id) +on_eom(uint64_t id, size_t size) { return monkey(id); } diff --git a/smtpd/filters/filter_perl.c b/smtpd/filters/filter_perl.c index 52beb214..58d57e5c 100644 --- a/smtpd/filters/filter_perl.c +++ b/smtpd/filters/filter_perl.c @@ -170,7 +170,7 @@ on_data(uint64_t id) } static int -on_eom(uint64_t id) +on_eom(uint64_t id, size_t size) { call_sub_sv((SV *)pl_on_eom, "%i", id); return filter_api_accept(id); diff --git a/smtpd/filters/filter_python.c b/smtpd/filters/filter_python.c index 72034ca1..f40f62bb 100644 --- a/smtpd/filters/filter_python.c +++ b/smtpd/filters/filter_python.c @@ -144,7 +144,7 @@ on_connect(uint64_t id, struct filter_connect *conn) exit(1); } - return 1; + return (1); } static int @@ -196,7 +196,7 @@ on_mail(uint64_t id, struct mailaddr *mail) exit(1); } - return 1; + return (1); } static int @@ -223,7 +223,7 @@ on_rcpt(uint64_t id, struct mailaddr *rcpt) exit(1); } - return 1; + return (1); } static int @@ -245,20 +245,22 @@ on_data(uint64_t id) exit(1); } - log_warnx("warn: filter-python: GOT DATA"); - return 1; + return (1); } static int -on_eom(uint64_t id) +on_eom(uint64_t id, size_t sz) { PyObject *py_args; PyObject *py_ret; PyObject *py_id; + PyObject *py_sz; - py_args = PyTuple_New(1); + py_args = PyTuple_New(2); py_id = PyLong_FromUnsignedLongLong(id); + py_sz = PyLong_FromSize_t(sz); PyTuple_SetItem(py_args, 0, py_id); + PyTuple_SetItem(py_args, 1, py_sz); py_ret = PyObject_CallObject(py_on_eom, py_args); Py_DECREF(py_args); @@ -268,8 +270,7 @@ on_eom(uint64_t id) exit(1); } - log_warnx("warn: filter-python: GOT EOM"); - return 1; + return (1); } static void @@ -290,9 +291,6 @@ on_commit(uint64_t id) log_warnx("warn: filter-python: call to on_commit handler failed"); exit(1); } - - log_warnx("warn: filter-python: GOT COMMIT"); - return; } static void @@ -313,9 +311,6 @@ on_rollback(uint64_t id) log_warnx("warn: filter-python: call to on_rollback handler failed"); exit(1); } - - log_warnx("warn: filter-python: GOT ROLLBACK"); - return; } static void @@ -336,9 +331,6 @@ on_disconnect(uint64_t id) log_warnx("warn: filter-python: call to on_disconnect handler failed"); exit(1); } - - log_warnx("warn: filter-python: GOT DISCONNECT"); - return; } static void @@ -366,14 +358,50 @@ on_dataline(uint64_t id, const char *line) } } +static char * +loadfile(const char * path) +{ + FILE *f; + off_t oz; + size_t sz; + char *buf; + + if ((f = fopen(path, "r")) == NULL) + err(1, "fopen"); + + if (fseek(f, 0, SEEK_END) == -1) + err(1, "fseek"); + + oz = ftello(f); + + if (fseek(f, 0, SEEK_SET) == -1) + err(1, "fseek"); + + if (oz >= SIZE_MAX) + errx(1, "too big"); + + sz = oz; + + if ((buf = malloc(sz + 1)) == NULL) + err(1, "malloc"); + + if (fread(buf, 1, sz, f) != sz) + err(1, "fread"); + + buf[sz] = '\0'; + + fclose(f); + + return (buf); +} + int main(int argc, char **argv) { - int ch; - const char *scriptpath = "/tmp/test.py"; - PyObject *name; - PyObject *self; - PyObject *module; + int ch; + char *path; + char *buf; + PyObject *self, *code, *module; log_init(-1); @@ -388,21 +416,32 @@ main(int argc, char **argv) argc -= optind; argv += optind; - setenv("PYTHONPATH", "/tmp", 1); + if (argc == 0) + errx(1, "missing path"); + path = argv[0]; + Py_Initialize(); - self = Py_InitModule("smtpd", py_methods); + self = Py_InitModule("filter", py_methods); PyModule_AddIntConstant(self, "FILTER_OK", FILTER_OK); PyModule_AddIntConstant(self, "FILTER_FAIL", FILTER_FAIL); PyModule_AddIntConstant(self, "FILTER_CLOSE", FILTER_CLOSE); - name = PyString_FromString("test"); - module = PyImport_Import(name); - Py_DECREF(name); + buf = loadfile(path); + code = Py_CompileString(buf, path, Py_file_input); + free(buf); + + if (code == NULL) { + PyErr_Print(); + log_warnx("warn: filter-python: failed to compile %s", path); + return (1); + } + + module = PyImport_ExecCodeModuleEx("myfilter", code, path); if (module == NULL) { PyErr_Print(); - log_warnx("warn: filter-python: failed to load %s", scriptpath); - return 1; + log_warnx("warn: filter-python: failed to install module %s", path); + return (1); } log_debug("debug: filter-python: starting..."); diff --git a/smtpd/filters/filter_stub.c b/smtpd/filters/filter_stub.c index 86bc9a44..2b36aeeb 100644 --- a/smtpd/filters/filter_stub.c +++ b/smtpd/filters/filter_stub.c @@ -64,7 +64,7 @@ on_data(uint64_t id) } static int -on_eom(uint64_t id) +on_eom(uint64_t id, size_t size) { log_debug("ON EOM"); return filter_api_accept(id); diff --git a/smtpd/filters/filter_trace.c b/smtpd/filters/filter_trace.c index 83c7fcef..9c0dc25b 100644 --- a/smtpd/filters/filter_trace.c +++ b/smtpd/filters/filter_trace.c @@ -99,9 +99,9 @@ on_data(uint64_t id) } static int -on_eom(uint64_t id) +on_eom(uint64_t id, size_t size) { - printf("filter-trace: EOM id=%016"PRIx64, id); + printf("filter-trace: EOM id=%016"PRIx64", size=%zu", id, size); return filter_api_accept(id); } diff --git a/smtpd/smtp_session.c b/smtpd/smtp_session.c index 41ad919d..40c64798 100644 --- a/smtpd/smtp_session.c +++ b/smtpd/smtp_session.c @@ -145,9 +145,9 @@ struct smtp_session { size_t rcptfail; TAILQ_HEAD(, smtp_rcpt) rcpts; - size_t datalen; - struct iobuf dataiobuf; - struct io dataio; + size_t odatalen; + struct iobuf obuf; + struct io oev; int dataeom; struct event pause; @@ -494,7 +494,7 @@ smtp_session_imsg(struct mproc *p, struct imsg *imsg) rcpt->maddr.user, rcpt->maddr.user[0] == '\0' ? "" : "@", rcpt->maddr.domain, - s->datalen, + s->odatalen, rcpt->destcount, s->flags & SF_EHLO ? "ESMTP" : "SMTP"); } @@ -763,17 +763,17 @@ smtp_filter_fd(uint64_t id, int fd) return; } - iobuf_init(&s->dataiobuf, 0, 0); - io_init(&s->dataio, fd, s, smtp_data_io, &s->dataiobuf); + iobuf_init(&s->obuf, 0, 0); + io_init(&s->oev, fd, s, smtp_data_io, &s->obuf); - iobuf_fqueue(&s->dataiobuf, "Received: "); + iobuf_fqueue(&s->obuf, "Received: "); if (! (s->listener->flags & F_MASK_SOURCE)) { - iobuf_fqueue(&s->dataiobuf, "from %s (%s [%s]);\n\t", + iobuf_fqueue(&s->obuf, "from %s (%s [%s]);\n\t", s->evp.helo, s->hostname, ss_to_text(&s->ss)); } - iobuf_fqueue(&s->dataiobuf, "by %s (%s) with %sSMTP%s%s id %08x;\n", + iobuf_fqueue(&s->obuf, "by %s (%s) with %sSMTP%s%s id %08x;\n", s->smtpname, SMTPD_NAME, s->flags & SF_EHLO ? "E" : "", @@ -783,7 +783,7 @@ smtp_filter_fd(uint64_t id, int fd) if (s->flags & SF_SECURE) { x = SSL_get_peer_certificate(s->io.ssl); - iobuf_fqueue(&s->dataiobuf, + iobuf_fqueue(&s->obuf, "\tTLS version=%s cipher=%s bits=%d verify=%s;\n", SSL_get_cipher_version(s->io.ssl), SSL_get_cipher_name(s->io.ssl), @@ -794,20 +794,20 @@ smtp_filter_fd(uint64_t id, int fd) } if (s->rcptcount == 1) { - iobuf_fqueue(&s->dataiobuf, "\tfor <%s@%s>;\n", + iobuf_fqueue(&s->obuf, "\tfor <%s@%s>;\n", s->evp.rcpt.user, s->evp.rcpt.domain); } - iobuf_fqueue(&s->dataiobuf, "\t%s\n", time_to_text(time(NULL))); + iobuf_fqueue(&s->obuf, "\t%s\n", time_to_text(time(NULL))); /* * XXX This is not exactly fair, since this is not really * user data. */ - s->datalen = iobuf_queued(&s->dataiobuf); + s->odatalen = iobuf_queued(&s->obuf); - io_set_write(&s->dataio); + io_set_write(&s->oev); smtp_enter_state(s, STATE_BODY); smtp_reply(s, "354 Enter mail, end with \".\"" @@ -929,7 +929,7 @@ smtp_io(struct io *io, int evt) io_set_write(io); s->dataeom = 1; - if (iobuf_queued(&s->dataiobuf) == 0) + if (iobuf_queued(&s->obuf) == 0) smtp_data_io_done(s); return; } @@ -1004,8 +1004,8 @@ smtp_data_io(struct io *io, int evt) case IO_DISCONNECTED: case IO_ERROR: log_debug("debug: smtp: %p: io error on mfa", s); - io_clear(&s->dataio); - iobuf_clear(&s->dataiobuf); + io_clear(&s->oev); + iobuf_clear(&s->obuf); s->msgflags |= MF_ERROR_IO; if (s->io.flags & IO_PAUSE_IN) { log_debug("debug: smtp: %p: resuming session after mfa error", s); @@ -1014,7 +1014,7 @@ smtp_data_io(struct io *io, int evt) break; case IO_LOWAT: - if (s->dataeom && iobuf_queued(&s->dataiobuf) == 0) { + if (s->dataeom && iobuf_queued(&s->obuf) == 0) { smtp_data_io_done(s); } else if (s->io.flags & IO_PAUSE_IN) { log_debug("debug: smtp: %p: filter congestion over: resuming session", s); @@ -1030,9 +1030,9 @@ smtp_data_io(struct io *io, int evt) static void smtp_data_io_done(struct smtp_session *s) { - log_debug("debug: smtp: %p: data io done (%zu bytes)", s, s->datalen); - io_clear(&s->dataio); - iobuf_clear(&s->dataiobuf); + log_debug("debug: smtp: %p: data io done (%zu bytes)", s, s->odatalen); + io_clear(&s->oev); + iobuf_clear(&s->obuf); if (s->msgflags & MF_ERROR) { @@ -1698,7 +1698,7 @@ smtp_message_reset(struct smtp_session *s, int prepare) s->msgflags = 0; s->destcount = 0; s->rcptcount = 0; - s->datalen = 0; + s->odatalen = 0; if (prepare) { s->evp.ss = s->ss; @@ -2006,7 +2006,7 @@ static void smtp_filter_eom(struct smtp_session *s) { tree_xset(&wait_filter, s->id, s); - filter_eom(s->id, QUERY_EOM, s->datalen); + filter_eom(s->id, QUERY_EOM, s->odatalen); } static void @@ -2046,19 +2046,19 @@ smtp_filter_dataline(struct smtp_session *s, const char *line) if (s->msgflags & MF_ERROR) return; - n = iobuf_fqueue(&s->dataiobuf, "%s\n", line); + n = iobuf_fqueue(&s->obuf, "%s\n", line); if (n == -1) { /* XXX */ fatalx("iobuf_fqueue"); } - s->datalen += strlen(line) +1; + s->odatalen += strlen(line) +1; - if (iobuf_queued(&s->dataiobuf) > DATA_HIWAT && !(s->io.flags & IO_PAUSE_IN)) { + if (iobuf_queued(&s->obuf) > DATA_HIWAT && !(s->io.flags & IO_PAUSE_IN)) { log_debug("debug: smtp: %p: filter congestion over: pausing session", s); io_pause(&s->io, IO_PAUSE_IN); } - io_reload(&s->dataio); + io_reload(&s->oev); } #define CASE(x) case x : return #x diff --git a/smtpd/smtpd-api.h b/smtpd/smtpd-api.h index 98e83570..e8cf5771 100644 --- a/smtpd/smtpd-api.h +++ b/smtpd/smtpd-api.h @@ -57,9 +57,7 @@ enum filter_imsg { IMSG_FILTER_REGISTER, IMSG_FILTER_EVENT, IMSG_FILTER_QUERY, - IMSG_FILTER_PIPE_SETUP, - IMSG_FILTER_PIPE_ABORT, - IMSG_FILTER_NOTIFY, + IMSG_FILTER_PIPE, IMSG_FILTER_RESPONSE }; @@ -361,7 +359,7 @@ void filter_api_on_mail(int(*)(uint64_t, struct mailaddr *)); void filter_api_on_rcpt(int(*)(uint64_t, struct mailaddr *)); void filter_api_on_data(int(*)(uint64_t)); void filter_api_on_dataline(void(*)(uint64_t, const char *)); -void filter_api_on_eom(int(*)(uint64_t)); +void filter_api_on_eom(int(*)(uint64_t, size_t)); /* queue */ void queue_api_on_message_create(int(*)(uint32_t *)); |