aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEric Faurot <eric@faurot.net>2014-05-18 17:47:40 +0200
committerEric Faurot <eric@faurot.net>2014-05-18 17:47:40 +0200
commitfa6fa9669361b9d584f338a95075cd1e27e09429 (patch)
tree2d1085cd8183fd6aea3a0c83da208256253159e0
parentMerge branch 'master' into portable (diff)
parentMerge branch 'master' of ssh.poolp.org:/git/opensmtpd (diff)
downloadOpenSMTPD-fa6fa9669361b9d584f338a95075cd1e27e09429.tar.xz
OpenSMTPD-fa6fa9669361b9d584f338a95075cd1e27e09429.zip
Merge branch 'master' into portable
-rw-r--r--smtpd/filter.c174
-rw-r--r--smtpd/filter_api.c340
-rw-r--r--smtpd/filters/filter_monkey.c2
-rw-r--r--smtpd/filters/filter_perl.c2
-rw-r--r--smtpd/filters/filter_python.c99
-rw-r--r--smtpd/filters/filter_stub.c2
-rw-r--r--smtpd/filters/filter_trace.c4
-rw-r--r--smtpd/smtp_session.c54
-rw-r--r--smtpd/smtpd-api.h6
9 files changed, 368 insertions, 315 deletions
diff --git a/smtpd/filter.c b/smtpd/filter.c
index 483205c9..91f2c141 100644
--- a/smtpd/filter.c
+++ b/smtpd/filter.c
@@ -43,8 +43,8 @@
#include "log.h"
enum {
- QT_QUERY,
- QT_EVENT,
+ QK_QUERY,
+ QK_EVENT,
};
enum {
@@ -78,19 +78,19 @@ struct filter_session {
struct filter_query_lst queries;
- struct io io;
- struct iobuf iobuf;
- FILE *ofile;
- size_t datain;
- size_t datalen;
int error;
+ struct io iev;
+ struct iobuf ibuf;
+ size_t idatalen;
+ FILE *ofile;
+
struct filter_query *eom;
};
struct filter_query {
uint64_t qid;
+ int kind;
int type;
- int hook;
struct filter_session *session;
TAILQ_ENTRY(filter_query) entry;
@@ -123,7 +123,7 @@ static struct filter_query *filter_query(struct filter_session *, int, int);
static void filter_drain_query(struct filter_query *);
static void filter_run_query(struct filter *, struct filter_query *);
static void filter_end_query(struct filter_query *);
-static void filter_set_fdout(struct filter_session *, int);
+static void filter_set_sink(struct filter_session *, int);
static int filter_tx(struct filter_session *, int);
static void filter_tx_io(struct io *, int);
@@ -134,7 +134,7 @@ static const char * filter_session_to_text(struct filter_session *);
static const char * filter_query_to_text(struct filter_query *);
static const char * filter_to_text(struct filter *);
static const char * filter_proc_to_text(struct filter_proc *);
-static const char * type_to_str(int);
+static const char * kind_to_str(int);
static const char * query_to_str(int);
static const char * event_to_str(int);
static const char * status_to_str(int);
@@ -291,7 +291,7 @@ filter_event(uint64_t id, int event)
s = xcalloc(1, sizeof(*s), "filter_event");
s->id = id;
s->filters = dict_xget(&chains, "default");
- s->io.sock = -1;
+ s->iev.sock = -1;
TAILQ_INIT(&s->queries);
tree_xset(&sessions, s->id, s);
}
@@ -300,7 +300,7 @@ filter_event(uint64_t id, int event)
s = tree_xpop(&sessions, id);
else
s = tree_xget(&sessions, id);
- q = filter_query(s, QT_EVENT, event);
+ q = filter_query(s, QK_EVENT, event);
filter_drain_query(q);
}
@@ -313,7 +313,7 @@ filter_connect(uint64_t id, const struct sockaddr *local,
struct filter_query *q;
s = tree_xget(&sessions, id);
- q = filter_query(s, QT_QUERY, QUERY_CONNECT);
+ q = filter_query(s, QK_QUERY, QUERY_CONNECT);
memmove(&q->u.connect.local, local, SA_LEN(local));
memmove(&q->u.connect.remote, remote, SA_LEN(remote));
@@ -327,13 +327,13 @@ filter_connect(uint64_t id, const struct sockaddr *local,
}
void
-filter_mailaddr(uint64_t id, int qhook, const struct mailaddr *maddr)
+filter_mailaddr(uint64_t id, int type, const struct mailaddr *maddr)
{
struct filter_session *s;
struct filter_query *q;
s = tree_xget(&sessions, id);
- q = filter_query(s, QT_QUERY, qhook);
+ q = filter_query(s, QK_QUERY, type);
strlcpy(q->u.maddr.user, maddr->user, sizeof(q->u.maddr.user));
strlcpy(q->u.maddr.domain, maddr->domain, sizeof(q->u.maddr.domain));
@@ -342,13 +342,13 @@ filter_mailaddr(uint64_t id, int qhook, const struct mailaddr *maddr)
}
void
-filter_line(uint64_t id, int qhook, const char *line)
+filter_line(uint64_t id, int type, const char *line)
{
struct filter_session *s;
struct filter_query *q;
s = tree_xget(&sessions, id);
- q = filter_query(s, QT_QUERY, qhook);
+ q = filter_query(s, QK_QUERY, type);
if (line)
strlcpy(q->u.line, line, sizeof(q->u.line));
@@ -357,29 +357,28 @@ filter_line(uint64_t id, int qhook, const char *line)
}
void
-filter_eom(uint64_t id, int qhook, size_t datalen)
+filter_eom(uint64_t id, int type, size_t datalen)
{
struct filter_session *s;
struct filter_query *q;
s = tree_xget(&sessions, id);
- q = filter_query(s, QT_QUERY, qhook);
+ q = filter_query(s, QK_QUERY, type);
q->u.datalen = datalen;
filter_drain_query(q);
}
static void
-filter_set_fdout(struct filter_session *s, int fdout)
+filter_set_sink(struct filter_session *s, int sink)
{
struct mproc *p;
- int fd;
while(s->fcurr) {
if (s->fcurr->proc->hooks & HOOK_DATALINE) {
- log_trace(TRACE_MFA, "filter: sending fd %d to %s", fdout, filter_to_text(s->fcurr));
+ log_trace(TRACE_MFA, "filter: sending fd %d to %s", sink, filter_to_text(s->fcurr));
p = &s->fcurr->proc->mproc;
- m_create(p, IMSG_FILTER_PIPE_SETUP, 0, 0, fdout);
+ m_create(p, IMSG_FILTER_PIPE, 0, 0, sink);
m_add_id(p, s->id);
m_close(p);
return;
@@ -387,45 +386,46 @@ filter_set_fdout(struct filter_session *s, int fdout)
s->fcurr = TAILQ_PREV(s->fcurr, filter_lst, entry);
}
- log_trace(TRACE_MFA, "filter: chain input is %d", fdout);
+ log_trace(TRACE_MFA, "filter: chain input is %d", sink);
- fd = filter_tx(s, fdout);
- smtp_filter_fd(s->id, fd);
+ smtp_filter_fd(s->id, sink);
}
void
-filter_build_fd_chain(uint64_t id, int fdout)
+filter_build_fd_chain(uint64_t id, int sink)
{
struct filter_session *s;
+ int fd;
s = tree_xget(&sessions, id);
s->fcurr = TAILQ_LAST(s->filters, filter_lst);
- filter_set_fdout(s, fdout);
+ fd = filter_tx(s, sink);
+ filter_set_sink(s, fd);
}
static struct filter_query *
-filter_query(struct filter_session *s, int type, int qhook)
+filter_query(struct filter_session *s, int kind, int type)
{
struct filter_query *q;
q = xcalloc(1, sizeof *q, "filter_query");
q->qid = generate_uid();
q->session = s;
+ q->kind = kind;
q->type = type;
- q->hook = qhook;
TAILQ_INSERT_TAIL(&s->queries, q, entry);
q->state = QUERY_READY;
q->current = TAILQ_FIRST(s->filters);
q->hasrun = 0;
- if (type == QT_QUERY)
- log_trace(TRACE_MFA, "filter: new query %s %s", type_to_str(type),
- query_to_str(qhook));
+ if (kind == QK_QUERY)
+ log_trace(TRACE_MFA, "filter: new query %s %s", kind_to_str(kind),
+ query_to_str(type));
else
- log_trace(TRACE_MFA, "filter: new query %s %s", type_to_str(type),
- event_to_str(qhook));
+ log_trace(TRACE_MFA, "filter: new query %s %s", kind_to_str(kind),
+ event_to_str(type));
return (q);
}
@@ -478,7 +478,7 @@ filter_drain_query(struct filter_query *q)
}
/* Defer the response if the file is not closed yet. */
- if (q->type == QT_QUERY && q->hook == QUERY_EOM && q->session->ofile) {
+ if (q->kind == QK_QUERY && q->type == QUERY_EOM && q->session->ofile) {
log_debug("filter: deferring eom query...");
q->session->eom = q;
return;
@@ -490,7 +490,7 @@ filter_drain_query(struct filter_query *q)
static void
filter_run_query(struct filter *f, struct filter_query *q)
{
- if (q->type == QT_QUERY) {
+ if (q->kind == QK_QUERY) {
log_trace(TRACE_MFA, "filter: running filter %s for query %s",
filter_to_text(f), filter_query_to_text(q));
@@ -498,9 +498,9 @@ filter_run_query(struct filter *f, struct filter_query *q)
m_create(&f->proc->mproc, IMSG_FILTER_QUERY, 0, 0, -1);
m_add_id(&f->proc->mproc, q->session->id);
m_add_id(&f->proc->mproc, q->qid);
- m_add_int(&f->proc->mproc, q->hook);
+ m_add_int(&f->proc->mproc, q->type);
- switch (q->hook) {
+ switch (q->type) {
case QUERY_CONNECT:
m_add_sockaddr(&f->proc->mproc,
(struct sockaddr *)&q->u.connect.local);
@@ -532,7 +532,7 @@ filter_run_query(struct filter *f, struct filter_query *q)
m_create(&f->proc->mproc, IMSG_FILTER_EVENT, 0, 0, -1);
m_add_id(&f->proc->mproc, q->session->id);
- m_add_int(&f->proc->mproc, q->hook);
+ m_add_int(&f->proc->mproc, q->type);
m_close(&f->proc->mproc);
}
}
@@ -544,18 +544,22 @@ filter_end_query(struct filter_query *q)
log_trace(TRACE_MFA, "filter: filter_end_query %s", filter_query_to_text(q));
- if (q->type == QT_EVENT)
+ if (q->kind == QK_EVENT)
goto done;
- if (q->hook == QUERY_EOM) {
+ if (q->type == QUERY_EOM) {
+
+ log_debug("debug: filter: filter_end_query(%d, %zu, %zu)", s->iev.sock,
+ s->idatalen, q->u.datalen);
+
if (s->error) {
smtp_filter_response(s->id, QUERY_EOM, FILTER_FAIL, 0, NULL);
free(q->smtp.response);
goto done;
}
- else if (q->u.datalen != s->datain) {
+ else if (q->u.datalen != s->idatalen) {
log_warnx("filter: datalen mismatch on session %" PRIx64
- ": %zu/%zu", s->id, s->datain, q->u.datalen);
+ ": %zu/%zu", s->id, s->idatalen, q->u.datalen);
smtp_filter_response(s->id, QUERY_EOM, FILTER_FAIL, 0, NULL);
free(q->smtp.response);
goto done;
@@ -569,7 +573,7 @@ filter_end_query(struct filter_query *q)
status_to_str(q->smtp.status),
q->smtp.code,
q->smtp.response);
- smtp_filter_response(s->id, q->hook, q->smtp.status, q->smtp.code,
+ smtp_filter_response(s->id, q->type, q->smtp.status, q->smtp.code,
q->smtp.response);
free(q->smtp.response);
@@ -588,7 +592,7 @@ filter_imsg(struct mproc *p, struct imsg *imsg)
const char *line;
uint64_t qid;
uint32_t datalen;
- int qhook, status, code;
+ int type, status, code;
if (imsg == NULL) {
log_warnx("warn: filter \"%s\" closed unexpectedly", p->name);
@@ -627,8 +631,8 @@ filter_imsg(struct mproc *p, struct imsg *imsg)
case IMSG_FILTER_RESPONSE:
m_msg(&m, imsg);
m_get_id(&m, &qid);
- m_get_int(&m, &qhook);
- if (qhook == QUERY_EOM)
+ m_get_int(&m, &type);
+ if (type == QUERY_EOM)
m_get_u32(&m, &datalen);
m_get_int(&m, &status);
m_get_int(&m, &code);
@@ -639,8 +643,8 @@ filter_imsg(struct mproc *p, struct imsg *imsg)
m_end(&m);
q = tree_xpop(&queries, qid);
- if (q->hook != qhook) {
- log_warnx("warn: filter: hook mismatch %d != %d", q->hook, qhook);
+ if (q->type != type) {
+ log_warnx("warn: filter: type mismatch %d != %d", q->type, type);
fatalx("exiting");
}
q->smtp.status = status;
@@ -651,7 +655,7 @@ filter_imsg(struct mproc *p, struct imsg *imsg)
q->smtp.response = xstrdup(line, "filter_imsg");
}
q->state = (status == FILTER_OK) ? QUERY_READY : QUERY_DONE;
- if (qhook == QUERY_EOM)
+ if (type == QUERY_EOM)
q->u.datalen = datalen;
next = TAILQ_NEXT(q, entry);
@@ -665,14 +669,14 @@ filter_imsg(struct mproc *p, struct imsg *imsg)
filter_drain_query(next);
break;
- case IMSG_FILTER_PIPE_SETUP:
+ case IMSG_FILTER_PIPE:
m_msg(&m, imsg);
m_get_id(&m, &qid);
m_end(&m);
s = tree_xget(&sessions, qid);
s->fcurr = TAILQ_PREV(s->fcurr, filter_lst, entry);
- filter_set_fdout(s, imsg->fd);
+ filter_set_sink(s, imsg->fd);
break;
default:
@@ -682,13 +686,11 @@ filter_imsg(struct mproc *p, struct imsg *imsg)
}
static int
-filter_tx(struct filter_session *s, int fdout)
+filter_tx(struct filter_session *s, int sink)
{
- int sp[2];
+ int sp[2];
- /* reset */
- s->datain = 0;
- s->datalen = 0;
+ s->idatalen = 0;
s->eom = NULL;
s->error = 0;
@@ -697,16 +699,16 @@ filter_tx(struct filter_session *s, int fdout)
return (-1);
}
- if ((s->ofile = fdopen(fdout, "w")) == NULL) {
+ if ((s->ofile = fdopen(sink, "w")) == NULL) {
log_warn("warn: filter: fdopen");
close(sp[0]);
close(sp[1]);
return (-1);
}
- iobuf_init(&s->iobuf, 0, 0);
- io_init(&s->io, sp[0], s, filter_tx_io, &s->iobuf);
- io_set_read(&s->io);
+ iobuf_init(&s->ibuf, 0, 0);
+ io_init(&s->iev, sp[0], s, filter_tx_io, &s->ibuf);
+ io_set_read(&s->iev);
return (sp[1]);
}
@@ -717,26 +719,32 @@ filter_tx_io(struct io *io, int evt)
struct filter_session *s = io->arg;
size_t len, n;
char *data;
+ char buf[65535];
+
+ log_debug("filter: filter_tx_io(%p, %s)", s, io_strevent(evt));
switch (evt) {
case IO_DATAIN:
- data = iobuf_data(&s->iobuf);
- len = iobuf_len(&s->iobuf);
- log_debug("debug: filter: tx data (%zu) for req %016"PRIx64,
- len, s->id);
+ data = iobuf_data(&s->ibuf);
+ len = iobuf_len(&s->ibuf);
+ memmove(buf, data, len);
+ buf[len] = 0;
+ log_debug("filter: filter_tx_io: datain (%zu) for req %016"PRIx64": %s",
+ len, s->id, buf);
+
n = fwrite(data, 1, len, s->ofile);
if (n != len) {
log_warnx("warn: filter_tx_io: fwrite %zu/%zu", n, len);
s->error = 1;
break;
}
- s->datain += n;
- iobuf_drop(&s->iobuf, n);
- iobuf_normalize(&s->iobuf);
+ s->idatalen += n;
+ iobuf_drop(&s->ibuf, n);
+ iobuf_normalize(&s->ibuf);
return;
case IO_DISCONNECTED:
- log_debug("debug: filter: tx done for req %016"PRIx64, s->id);
+ log_debug("debug: filter: tx done (%zu) for req %016"PRIx64, s->idatalen, s->id);
break;
default:
@@ -745,8 +753,8 @@ filter_tx_io(struct io *io, int evt)
break;
}
- io_clear(&s->io);
- iobuf_clear(&s->iobuf);
+ io_clear(&s->iev);
+ iobuf_clear(&s->ibuf);
fclose(s->ofile);
s->ofile = NULL;
@@ -767,8 +775,8 @@ filter_query_to_text(struct filter_query *q)
tmp[0] = '\0';
- if (q->type == QT_QUERY) {
- switch(q->hook) {
+ if (q->kind == QK_QUERY) {
+ switch(q->type) {
case QUERY_CONNECT:
strlcat(tmp, "=", sizeof tmp);
strlcat(tmp, ss_to_text(&q->u.connect.local), sizeof tmp);
@@ -790,12 +798,12 @@ filter_query_to_text(struct filter_query *q)
break;
}
snprintf(buf, sizeof buf, "%016"PRIx64"[%s,%s%s,%s]",
- q->qid, type_to_str(q->type), query_to_str(q->hook), tmp,
+ q->qid, kind_to_str(q->kind), query_to_str(q->type), tmp,
filter_session_to_text(q->session));
}
else {
snprintf(buf, sizeof buf, "%016"PRIx64"[%s,%s%s,%s]",
- q->qid, type_to_str(q->type), event_to_str(q->hook), tmp,
+ q->qid, kind_to_str(q->kind), event_to_str(q->type), tmp,
filter_session_to_text(q->session));
}
@@ -811,7 +819,7 @@ filter_session_to_text(struct filter_session *s)
return "filter_session@NULL";
snprintf(buf, sizeof(buf), "filter_session@%p[datalen=%zu,eom=%p,ofile=%p]",
- s, s->datalen, s->eom, s->ofile);
+ s, s->idatalen, s->eom, s->ofile);
return buf;
}
@@ -846,9 +854,7 @@ filterimsg_to_str(int imsg)
CASE(IMSG_FILTER_REGISTER);
CASE(IMSG_FILTER_EVENT);
CASE(IMSG_FILTER_QUERY);
- CASE(IMSG_FILTER_PIPE_SETUP);
- CASE(IMSG_FILTER_PIPE_ABORT);
- CASE(IMSG_FILTER_NOTIFY);
+ CASE(IMSG_FILTER_PIPE);
CASE(IMSG_FILTER_RESPONSE);
default:
return "IMSG_FILTER_???";
@@ -886,13 +892,13 @@ event_to_str(int event)
}
static const char *
-type_to_str(int type)
+kind_to_str(int type)
{
switch (type) {
- CASE(QT_QUERY);
- CASE(QT_EVENT);
+ CASE(QK_QUERY);
+ CASE(QK_EVENT);
default:
- return "QT_???";
+ return "QK_???";
}
}
diff --git a/smtpd/filter_api.c b/smtpd/filter_api.c
index cbf10761..586b876d 100644
--- a/smtpd/filter_api.c
+++ b/smtpd/filter_api.c
@@ -44,10 +44,12 @@ static struct tree sessions;
struct filter_session {
uint64_t id;
uint64_t qid;
- int qhook;
+ int qtype;
+ size_t datalen;
struct {
- size_t datalen;
+ int eom_called;
+
int error;
struct io iev;
struct iobuf ibuf;
@@ -85,7 +87,7 @@ static struct filter_internals {
int (*rcpt)(uint64_t, struct mailaddr *);
int (*data)(uint64_t);
void (*dataline)(uint64_t, const char *);
- int (*eom)(uint64_t);
+ int (*eom)(uint64_t, size_t);
void (*disconnect)(uint64_t);
void (*reset)(uint64_t);
@@ -97,7 +99,7 @@ static struct filter_internals {
static void filter_api_init(void);
static void filter_response(struct filter_session *, int, int, const char *);
static void filter_send_response(struct filter_session *);
-static void filter_register_query(uint64_t, uint64_t, enum filter_hook_type);
+static void filter_register_query(uint64_t, uint64_t, int);
static void filter_dispatch(struct mproc *, struct imsg *);
static void filter_dispatch_dataline(uint64_t, const char *);
static void filter_dispatch_data(uint64_t);
@@ -123,11 +125,8 @@ static const char *event_to_str(int);
static void
filter_response(struct filter_session *s, int status, int code, const char *line)
{
- log_debug("debug: filter-api:%s: got response %s for %016"PRIx64" %d %d %s",
- filter_name, query_to_str(s->qhook), s->id,
- s->response.status,
- s->response.code,
- s->response.line);
+ log_debug("debug: filter-api:%s %016"PRIx64" %s filter_response(%d, %d, %s)",
+ filter_name, s->id, query_to_str(s->qtype), status, code, line);
s->response.ready = 1;
s->response.status = status;
@@ -137,22 +136,18 @@ filter_response(struct filter_session *s, int status, int code, const char *line
else
s->response.line = NULL;
- /* For HOOK_EOM, wait until the obuf is drained before sending the */
- if (s->qhook == QUERY_EOM &&
- fi.hooks & HOOK_DATALINE &&
- s->pipe.oev.sock != -1) {
- log_debug("debug: filter-api:%s: got response, waiting for opipe to be closed", filter_name);
- return;
- }
-
- filter_send_response(s);
+ /* eom is special, as the reponse has to be deferred until the pipe is all flushed */
+ if (s->qtype == QUERY_EOM)
+ filter_trigger_eom(s);
+ else
+ filter_send_response(s);
}
static void
filter_send_response(struct filter_session *s)
{
- log_debug("debug: filter-api:%s: sending response %s for %016"PRIx64" %d %d %s",
- filter_name, query_to_str(s->qhook), s->id,
+ log_debug("debug: filter-api:%s %016"PRIx64" %s filter_send_response() -> %d, %d, %s",
+ filter_name, s->id, query_to_str(s->qtype),
s->response.status,
s->response.code,
s->response.line);
@@ -161,10 +156,9 @@ filter_send_response(struct filter_session *s)
m_create(&fi.p, IMSG_FILTER_RESPONSE, 0, 0, -1);
m_add_id(&fi.p, s->qid);
- m_add_int(&fi.p, s->qhook);
- if (s->qhook == QUERY_EOM)
- m_add_u32(&fi.p, (fi.hooks & HOOK_DATALINE) ?
- s->pipe.odatalen : s->pipe.datalen);
+ m_add_int(&fi.p, s->qtype);
+ if (s->qtype == QUERY_EOM)
+ m_add_u32(&fi.p, s->datalen);
m_add_int(&fi.p, s->response.status);
m_add_int(&fi.p, s->response.code);
if (s->response.line) {
@@ -188,10 +182,10 @@ filter_dispatch(struct mproc *p, struct imsg *imsg)
const char *line, *name;
uint32_t v, datalen;
uint64_t id, qid;
- int status, event, hook;
+ int status, type;
int fds[2], fdin, fdout;
- log_debug("debug: filter-api:%s: imsg %s", filter_name,
+ log_debug("debug: filter-api:%s imsg %s", filter_name,
filterimsg_to_str(imsg->hdr.type));
switch (imsg->hdr.type) {
@@ -202,7 +196,7 @@ filter_dispatch(struct mproc *p, struct imsg *imsg)
filter_name = strdup(name);
m_end(&m);
if (v != FILTER_API_VERSION) {
- log_warnx("warn: filter-api:%s: API mismatch", filter_name);
+ log_warnx("warn: filter-api:%s API mismatch", filter_name);
fatalx("filter-api: exiting");
}
m_create(p, IMSG_FILTER_REGISTER, 0, 0, -1);
@@ -214,9 +208,9 @@ filter_dispatch(struct mproc *p, struct imsg *imsg)
case IMSG_FILTER_EVENT:
m_msg(&m, imsg);
m_get_id(&m, &id);
- m_get_int(&m, &event);
+ m_get_int(&m, &type);
m_end(&m);
- switch (event) {
+ switch (type) {
case EVENT_CONNECT:
s = xcalloc(1, sizeof(*s), "filter_dispatch");
s->id = id;
@@ -238,6 +232,9 @@ filter_dispatch(struct mproc *p, struct imsg *imsg)
case EVENT_ROLLBACK:
filter_dispatch_rollback(id);
break;
+ default:
+ log_warnx("warn: filter-api:%s bad event %d", filter_name, type);
+ fatalx("filter-api: exiting");
}
break;
@@ -245,52 +242,52 @@ filter_dispatch(struct mproc *p, struct imsg *imsg)
m_msg(&m, imsg);
m_get_id(&m, &id);
m_get_id(&m, &qid);
- m_get_int(&m, &hook);
- switch(hook) {
+ m_get_int(&m, &type);
+ switch(type) {
case QUERY_CONNECT:
m_get_sockaddr(&m, (struct sockaddr*)&q_connect.local);
m_get_sockaddr(&m, (struct sockaddr*)&q_connect.remote);
m_get_string(&m, &q_connect.hostname);
m_end(&m);
- filter_register_query(id, qid, hook);
+ filter_register_query(id, qid, type);
filter_dispatch_connect(id, &q_connect);
break;
case QUERY_HELO:
m_get_string(&m, &line);
m_end(&m);
- filter_register_query(id, qid, hook);
+ filter_register_query(id, qid, type);
filter_dispatch_helo(id, line);
break;
case QUERY_MAIL:
m_get_mailaddr(&m, &maddr);
m_end(&m);
- filter_register_query(id, qid, hook);
+ filter_register_query(id, qid, type);
filter_dispatch_mail(id, &maddr);
break;
case QUERY_RCPT:
m_get_mailaddr(&m, &maddr);
m_end(&m);
- filter_register_query(id, qid, hook);
+ filter_register_query(id, qid, type);
filter_dispatch_rcpt(id, &maddr);
break;
case QUERY_DATA:
m_end(&m);
- filter_register_query(id, qid, hook);
+ filter_register_query(id, qid, type);
filter_dispatch_data(id);
break;
case QUERY_EOM:
m_get_u32(&m, &datalen);
m_end(&m);
- filter_register_query(id, qid, hook);
+ filter_register_query(id, qid, type);
filter_dispatch_eom(id, datalen);
break;
default:
- log_warnx("warn: filter-api:%s: bad hook %d", filter_name, hook);
+ log_warnx("warn: filter-api:%s bad query %d", filter_name, type);
fatalx("filter-api: exiting");
}
break;
- case IMSG_FILTER_PIPE_SETUP:
+ case IMSG_FILTER_PIPE:
m_msg(&m, imsg);
m_get_id(&m, &id);
m_end(&m);
@@ -299,11 +296,17 @@ filter_dispatch(struct mproc *p, struct imsg *imsg)
fdin = -1;
if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, fds) == -1) {
- log_warn("warn: filter-api:%s: socketpair", filter_name);
+ log_warn("warn: filter-api:%s socketpair", filter_name);
close(fdout);
}
else {
s = tree_xget(&sessions, id);
+
+ s->pipe.eom_called = 0;
+ s->pipe.error = 0;
+ s->pipe.idatalen = 0;
+ s->pipe.odatalen = 0;
+
iobuf_init(&s->pipe.obuf, 0, 0);
io_init(&s->pipe.oev, fdout, s, filter_io_out, &s->pipe.obuf);
io_set_write(&s->pipe.oev);
@@ -313,48 +316,34 @@ filter_dispatch(struct mproc *p, struct imsg *imsg)
io_set_read(&s->pipe.iev);
fdin = fds[1];
- /* XXX notify? */
}
- log_debug("debug: filter-api:%s: tx pipe %d -> %d for %016"PRIx64, filter_name, fdin, fdout, id);
- m_create(&fi.p, IMSG_FILTER_PIPE_SETUP, 0, 0, fdin);
+
+ log_debug("debug: filter-api:%s %016"PRIx64" tx pipe %d -> %d",
+ filter_name, id, fdin, fdout);
+
+ m_create(&fi.p, IMSG_FILTER_PIPE, 0, 0, fdin);
m_add_id(&fi.p, id);
m_close(&fi.p);
- break;
- case IMSG_FILTER_PIPE_ABORT:
- m_msg(&m, imsg);
- m_get_id(&m, &id);
- m_end(&m);
- s = tree_xget(&sessions, id);
- if (s->pipe.iev.sock != -1) {
- io_clear(&s->pipe.iev);
- iobuf_clear(&s->pipe.ibuf);
- }
- if (s->pipe.oev.sock != -1) {
- io_clear(&s->pipe.oev);
- iobuf_clear(&s->pipe.obuf);
- }
- /* XXX notify? */
break;
}
}
static void
-filter_register_query(uint64_t id, uint64_t qid, enum filter_hook_type hook)
+filter_register_query(uint64_t id, uint64_t qid, int type)
{
struct filter_session *s;
- log_debug("debug: filter-api:%s: query %s for %016"PRIx64,
- filter_name, query_to_str(hook), id);
+ log_debug("debug: filter-api:%s %016"PRIx64" %s", filter_name, id, query_to_str(type));
s = tree_xget(&sessions, id);
if (s->qid) {
- log_warnx("warn: filter-api:%s: query already in progess",
+ log_warnx("warn: filter-api:%s query already in progess",
filter_name);
fatalx("filter-api: exiting");
}
s->qid = qid;
- s->qhook = hook;
+ s->qtype = type;
s->response.ready = 0;
tree_xset(&queries, qid, s);
@@ -433,6 +422,14 @@ filter_dispatch_disconnect(uint64_t id)
fi.cb.disconnect(id);
}
+static void
+filter_dispatch_dataline(uint64_t id, const char *data)
+{
+ if (fi.cb.dataline)
+ fi.cb.dataline(id, data);
+ else
+ filter_api_writeln(id, data);
+}
static void
filter_dispatch_eom(uint64_t id, size_t datalen)
@@ -440,71 +437,62 @@ filter_dispatch_eom(uint64_t id, size_t datalen)
struct filter_session *s;
s = tree_xget(&sessions, id);
- s->pipe.datalen = datalen;
+ s->datalen = datalen;
- if (fi.hooks & HOOK_DATALINE) {
- /* wait for the io to be done */
- if (s->pipe.iev.sock != -1) {
- log_debug("debug: filter-api:%s: eom received for %016"PRIx64", waiting for io to end",
- filter_name, id);
- return;
- }
- filter_trigger_eom(s);
- return;
- }
-
- if (fi.cb.eom)
- fi.cb.eom(s->id);
- else
- filter_api_accept(id);
-}
-
-static void
-filter_dispatch_dataline(uint64_t id, const char *data)
-{
- if (fi.cb.dataline)
- fi.cb.dataline(id, data);
- else
- filter_api_writeln(id, data);
+ filter_trigger_eom(s);
}
static void
filter_trigger_eom(struct filter_session *s)
{
- log_debug("debug: filter-api:%s: tx eom (%zu) for %016"PRIx64, filter_name, s->pipe.datalen, s->id);
+ log_debug("debug: filter-api:%s %016"PRIx64" filter_trigger_eom(%d, %d, %zu, %zu, %zu)",
+ filter_name, s->id, s->pipe.iev.sock, s->pipe.oev.sock,
+ s->datalen, s->pipe.idatalen, s->pipe.odatalen);
+
+ /* This is called when
+ * - EOM query is first received
+ * - input data is closed
+ * - output has been written
+ */
+
+ /* input not done yet, or EOM query not received */
+ if (s->pipe.iev.sock != -1 || s->qid == 0)
+ return;
+
+ if (s->pipe.error)
+ goto fail;
- if (!s->pipe.error && s->pipe.idatalen != s->pipe.datalen) {
- log_debug("debug: filter-api:%s: tx datalen mismatch: %zu/%zu",
- filter_name, s->pipe.idatalen, s->pipe.datalen);
+ /* if size don't match, error out */
+ if (s->pipe.idatalen != s->datalen) {
+ log_debug("debug: filter-api:%s tx datalen mismatch: %zu/%zu",
+ filter_name, s->pipe.idatalen, s->datalen);
s->pipe.error = 1;
- }
- if (s->pipe.error) {
- log_debug("debug: filter-api:%s: tx pipe.error", filter_name);
- /* XXX error? */
+ goto fail;
}
- /* if the filter has no eom callback, we accept the message */
- if (fi.cb.eom) {
- log_debug("debug: filter-api:%s: calling eom callback", filter_name);
- fi.cb.eom(s->id);
- } else {
- log_debug("debug: filter-api:%s: accepting by default", filter_name);
- filter_api_accept(s->id);
+ /* if we didn't send the eom to the user do it now */
+ if (!s->pipe.eom_called) {
+ s->pipe.eom_called = 1;
+ if (fi.cb.eom)
+ fi.cb.eom(s->id, s->datalen);
+ else
+ filter_api_accept(s->id);
+ return;
}
- /* if the output is done and the response is ready, send it */
- if ((s->pipe.oev.sock == -1 || iobuf_queued(&s->pipe.obuf) == 0) &&
- s->response.ready) {
- log_debug("debug: filter-api:%s: sending response", filter_name);
- if (s->pipe.oev.sock != -1) {
- io_clear(&s->pipe.oev);
- iobuf_clear(&s->pipe.obuf);
- }
- filter_send_response(s);
- }
- else {
- log_debug("debug: filter-api:%s: waiting for obuf to drain", filter_name);
- }
+ if (s->pipe.error)
+ goto fail;
+
+ /* wait for the output socket to be closed */
+ if (s->pipe.oev.sock != -1)
+ return;
+
+ s->datalen = s->pipe.odatalen;
+ filter_send_response(s);
+
+ fail:
+ /* XXX */
+ return;
}
static void
@@ -514,7 +502,7 @@ filter_io_in(struct io *io, int evt)
char *line;
size_t len;
- log_debug("debug: filter-api:%s: filter_io_in(%p, %s)",
+ log_debug("debug: filter-api:%s filter_io_in(%p, %s)",
filter_name, s, io_strevent(evt));
switch (evt) {
@@ -524,8 +512,6 @@ filter_io_in(struct io *io, int evt)
if ((line == NULL && iobuf_len(&s->pipe.ibuf) >= SMTPD_MAXLINESIZE) ||
(line && len >= SMTPD_MAXLINESIZE)) {
s->pipe.error = 1;
- io_clear(&s->pipe.oev);
- iobuf_clear(&s->pipe.obuf);
break;
}
/* No complete line received */
@@ -533,28 +519,37 @@ filter_io_in(struct io *io, int evt)
iobuf_normalize(&s->pipe.ibuf);
/* flow control */
if (iobuf_queued(&s->pipe.obuf) >= FILTER_HIWAT)
- io_pause(&s->pipe.oev, IO_PAUSE_IN);
+ io_pause(&s->pipe.iev, IO_PAUSE_IN);
return;
}
+
s->pipe.idatalen += len + 1;
+ /* XXX warning: do not clear io from this call! */
filter_dispatch_dataline(s->id, line);
goto nextline;
case IO_DISCONNECTED:
- if (s->qhook == QUERY_EOM)
- filter_trigger_eom(s);
- else {
- log_debug("debug: filter-api:%s: datain closed, for %016"PRIx64", waiting for eom",
+ if (iobuf_len(&s->pipe.ibuf)) {
+ log_warn("warn: filter-api:%s %016"PRIx64" incomplete input",
filter_name, s->id);
}
+ log_debug("debug: filter-api:%s %016"PRIx64" input done (%zu bytes)",
+ filter_name, s->id, s->pipe.idatalen);
break;
+
default:
+ log_warn("warn: filter-api:%s %016"PRIx64": unexpected io event %d on data pipe",
+ filter_name, s->id, evt);
s->pipe.error = 1;
+
+ }
+ if (s->pipe.error) {
io_clear(&s->pipe.oev);
iobuf_clear(&s->pipe.obuf);
}
io_clear(&s->pipe.iev);
iobuf_clear(&s->pipe.ibuf);
+ filter_trigger_eom(s);
}
static void
@@ -562,39 +557,43 @@ filter_io_out(struct io *io, int evt)
{
struct filter_session *s = io->arg;
- log_debug("debug: filter-api:%s: filter_io_out(%p, %s)",
- filter_name, s, io_strevent(evt));
+ log_debug("debug: filter-api:%s %016"PRIx64" filter_io_out(%s)",
+ filter_name, s->id, io_strevent(evt));
switch (evt) {
case IO_TIMEOUT:
case IO_DISCONNECTED:
case IO_ERROR:
- log_debug("debug: filter-api:%s: io error on output pipe",
- filter_name);
+ log_debug("debug: filter-api:%s %016"PRIx64" io error on output pipe",
+ filter_name, s->id);
s->pipe.error = 1;
- io_clear(&s->pipe.oev);
- iobuf_clear(&s->pipe.obuf);
- if (s->pipe.iev.sock != -1) {
- io_clear(&s->pipe.iev);
- iobuf_clear(&s->pipe.ibuf);
- }
break;
case IO_LOWAT:
/* flow control */
- if (s->pipe.iev.sock != -1 && s->pipe.iev.flags & IO_PAUSE_IN)
+ if (s->pipe.iev.sock != -1 && s->pipe.iev.flags & IO_PAUSE_IN) {
io_resume(&s->pipe.iev, IO_PAUSE_IN);
-
- /* if the input is done and there is a response, send it */
- if (s->pipe.iev.sock == -1 && s->response.ready) {
- io_clear(&s->pipe.oev);
- iobuf_clear(&s->pipe.obuf);
- filter_send_response(s);
+ return;
}
- break;
+
+ /* if the input is done and there is a response we are done */
+ if (s->pipe.iev.sock == -1 && s->response.ready)
+ break;
+
+ /* just wait for more data to send */
+ return;
+
default:
fatalx("filter_io_out()");
}
+
+ io_clear(&s->pipe.oev);
+ iobuf_clear(&s->pipe.obuf);
+ if (s->pipe.error) {
+ io_clear(&s->pipe.iev);
+ iobuf_clear(&s->pipe.ibuf);
+ }
+ filter_trigger_eom(s);
}
#define CASE(x) case x : return #x
@@ -606,12 +605,10 @@ filterimsg_to_str(int imsg)
CASE(IMSG_FILTER_REGISTER);
CASE(IMSG_FILTER_EVENT);
CASE(IMSG_FILTER_QUERY);
- CASE(IMSG_FILTER_PIPE_SETUP);
- CASE(IMSG_FILTER_PIPE_ABORT);
- CASE(IMSG_FILTER_NOTIFY);
+ CASE(IMSG_FILTER_PIPE);
CASE(IMSG_FILTER_RESPONSE);
default:
- return "IMSG_FILTER_???";
+ return ("IMSG_FILTER_???");
}
}
@@ -631,7 +628,7 @@ hook_to_str(int hook)
CASE(HOOK_ROLLBACK);
CASE(HOOK_DATALINE);
default:
- return "HOOK_???";
+ return ("HOOK_???");
}
}
@@ -647,7 +644,7 @@ query_to_str(int query)
CASE(QUERY_EOM);
CASE(QUERY_DATALINE);
default:
- return "QUERY_???";
+ return ("QUERY_???");
}
}
@@ -661,7 +658,7 @@ event_to_str(int event)
CASE(EVENT_COMMIT);
CASE(EVENT_ROLLBACK);
default:
- return "EVENT_???";
+ return ("EVENT_???");
}
}
@@ -675,8 +672,8 @@ const char *
proc_name(enum smtp_proc_type proc)
{
if (proc == PROC_FILTER)
- return filter_name;
- return "filter";
+ return (filter_name);
+ return ("filter");
}
const char *
@@ -700,11 +697,11 @@ filter_api_setugid(uid_t uid, gid_t gid)
filter_api_init();
if (! uid) {
- log_warn("warn: filter-api:%s: can't set uid 0", filter_name);
+ log_warn("warn: filter-api:%s can't set uid 0", filter_name);
fatalx("filter-api: exiting");
}
if (! gid) {
- log_warn("warn: filter-api:%s: can't set gid 0", filter_name);
+ log_warn("warn: filter-api:%s can't set gid 0", filter_name);
fatalx("filter-api: exiting");
}
fi.uid = uid;
@@ -744,7 +741,7 @@ filter_api_init(void)
pw = getpwnam(SMTPD_USER);
if (pw == NULL) {
- log_warn("warn: filter-api:%s: getpwnam", filter_name);
+ log_warn("warn: filter-api:%s getpwnam", filter_name);
fatalx("filter-api: exiting");
}
@@ -824,7 +821,7 @@ filter_api_on_dataline(void(*cb)(uint64_t, const char *))
}
void
-filter_api_on_eom(int(*cb)(uint64_t))
+filter_api_on_eom(int(*cb)(uint64_t, size_t))
{
filter_api_init();
@@ -872,7 +869,7 @@ void
filter_api_loop(void)
{
if (register_done) {
- log_warnx("warn: filter-api:%s: filter_api_loop() already called", filter_name);
+ log_warnx("warn: filter-api:%s filter_api_loop() already called", filter_name);
fatalx("filter-api: exiting");
}
@@ -884,11 +881,11 @@ filter_api_loop(void)
if (fi.rootpath) {
if (chroot(fi.rootpath) == -1) {
- log_warn("warn: filter-api:%s: chroot", filter_name);
+ log_warn("warn: filter-api:%s chroot", filter_name);
fatalx("filter-api: exiting");
}
if (chdir("/") == -1) {
- log_warn("warn: filter-api:%s: chdir", filter_name);
+ log_warn("warn: filter-api:%s chdir", filter_name);
fatalx("filter-api: exiting");
}
}
@@ -896,12 +893,12 @@ filter_api_loop(void)
if (setgroups(1, &fi.gid) ||
setresgid(fi.gid, fi.gid, fi.gid) ||
setresuid(fi.uid, fi.uid, fi.uid)) {
- log_warn("warn: filter-api:%s: cannot drop privileges", filter_name);
+ log_warn("warn: filter-api:%s cannot drop privileges", filter_name);
fatalx("filter-api: exiting");
}
if (event_dispatch() < 0) {
- log_warn("warn: filter-api:%s: event_dispatch", filter_name);
+ log_warn("warn: filter-api:%s event_dispatch", filter_name);
fatalx("filter-api: exiting");
}
}
@@ -911,9 +908,12 @@ filter_api_accept(uint64_t id)
{
struct filter_session *s;
+ log_debug("debug: filter-api:%s %016"PRIx64" filter_api_accept()", filter_name, id);
+
s = tree_xget(&sessions, id);
filter_response(s, FILTER_OK, 0, NULL);
- return 1;
+
+ return (1);
}
int
@@ -921,6 +921,9 @@ filter_api_reject(uint64_t id, enum filter_status status)
{
struct filter_session *s;
+ log_debug("debug: filter-api:%s %016"PRIx64" filter_api_reject(%d)",
+ filter_name, id, status);
+
s = tree_xget(&sessions, id);
/* This is NOT an acceptable status for a failure */
@@ -928,7 +931,8 @@ filter_api_reject(uint64_t id, enum filter_status status)
status = FILTER_FAIL;
filter_response(s, status, 0, NULL);
- return 1;
+
+ return (1);
}
int
@@ -937,6 +941,9 @@ filter_api_reject_code(uint64_t id, enum filter_status status, uint32_t code,
{
struct filter_session *s;
+ log_debug("debug: filter-api:%s %016"PRIx64" filter_api_reject_code(%d, %u, %s)",
+ filter_name, id, status, code, line);
+
s = tree_xget(&sessions, id);
/* This is NOT an acceptable status for a failure */
@@ -944,7 +951,8 @@ filter_api_reject_code(uint64_t id, enum filter_status status, uint32_t code,
status = FILTER_FAIL;
filter_response(s, status, code, line);
- return 1;
+
+ return (1);
}
void
@@ -952,6 +960,8 @@ filter_api_writeln(uint64_t id, const char *line)
{
struct filter_session *s;
+ log_debug("debug: filter-api:%s %016"PRIx64" filter_api_writeln(%s)", filter_name, id, line);
+
s = tree_xget(&sessions, id);
if (s->pipe.oev.sock == -1) {
@@ -984,7 +994,7 @@ filter_api_mailaddr_to_text(const struct mailaddr *maddr)
strlcpy(buffer, maddr->user, sizeof buffer);
strlcat(buffer, "@", sizeof buffer);
if (strlcat(buffer, maddr->domain, sizeof buffer) >= sizeof buffer)
- return NULL;
+ return (NULL);
- return buffer;
+ return (buffer);
}
diff --git a/smtpd/filters/filter_monkey.c b/smtpd/filters/filter_monkey.c
index 673b6c98..9a7fa2b4 100644
--- a/smtpd/filters/filter_monkey.c
+++ b/smtpd/filters/filter_monkey.c
@@ -75,7 +75,7 @@ on_data(uint64_t id)
}
static int
-on_eom(uint64_t id)
+on_eom(uint64_t id, size_t size)
{
return monkey(id);
}
diff --git a/smtpd/filters/filter_perl.c b/smtpd/filters/filter_perl.c
index 52beb214..58d57e5c 100644
--- a/smtpd/filters/filter_perl.c
+++ b/smtpd/filters/filter_perl.c
@@ -170,7 +170,7 @@ on_data(uint64_t id)
}
static int
-on_eom(uint64_t id)
+on_eom(uint64_t id, size_t size)
{
call_sub_sv((SV *)pl_on_eom, "%i", id);
return filter_api_accept(id);
diff --git a/smtpd/filters/filter_python.c b/smtpd/filters/filter_python.c
index 72034ca1..f40f62bb 100644
--- a/smtpd/filters/filter_python.c
+++ b/smtpd/filters/filter_python.c
@@ -144,7 +144,7 @@ on_connect(uint64_t id, struct filter_connect *conn)
exit(1);
}
- return 1;
+ return (1);
}
static int
@@ -196,7 +196,7 @@ on_mail(uint64_t id, struct mailaddr *mail)
exit(1);
}
- return 1;
+ return (1);
}
static int
@@ -223,7 +223,7 @@ on_rcpt(uint64_t id, struct mailaddr *rcpt)
exit(1);
}
- return 1;
+ return (1);
}
static int
@@ -245,20 +245,22 @@ on_data(uint64_t id)
exit(1);
}
- log_warnx("warn: filter-python: GOT DATA");
- return 1;
+ return (1);
}
static int
-on_eom(uint64_t id)
+on_eom(uint64_t id, size_t sz)
{
PyObject *py_args;
PyObject *py_ret;
PyObject *py_id;
+ PyObject *py_sz;
- py_args = PyTuple_New(1);
+ py_args = PyTuple_New(2);
py_id = PyLong_FromUnsignedLongLong(id);
+ py_sz = PyLong_FromSize_t(sz);
PyTuple_SetItem(py_args, 0, py_id);
+ PyTuple_SetItem(py_args, 1, py_sz);
py_ret = PyObject_CallObject(py_on_eom, py_args);
Py_DECREF(py_args);
@@ -268,8 +270,7 @@ on_eom(uint64_t id)
exit(1);
}
- log_warnx("warn: filter-python: GOT EOM");
- return 1;
+ return (1);
}
static void
@@ -290,9 +291,6 @@ on_commit(uint64_t id)
log_warnx("warn: filter-python: call to on_commit handler failed");
exit(1);
}
-
- log_warnx("warn: filter-python: GOT COMMIT");
- return;
}
static void
@@ -313,9 +311,6 @@ on_rollback(uint64_t id)
log_warnx("warn: filter-python: call to on_rollback handler failed");
exit(1);
}
-
- log_warnx("warn: filter-python: GOT ROLLBACK");
- return;
}
static void
@@ -336,9 +331,6 @@ on_disconnect(uint64_t id)
log_warnx("warn: filter-python: call to on_disconnect handler failed");
exit(1);
}
-
- log_warnx("warn: filter-python: GOT DISCONNECT");
- return;
}
static void
@@ -366,14 +358,50 @@ on_dataline(uint64_t id, const char *line)
}
}
+static char *
+loadfile(const char * path)
+{
+ FILE *f;
+ off_t oz;
+ size_t sz;
+ char *buf;
+
+ if ((f = fopen(path, "r")) == NULL)
+ err(1, "fopen");
+
+ if (fseek(f, 0, SEEK_END) == -1)
+ err(1, "fseek");
+
+ oz = ftello(f);
+
+ if (fseek(f, 0, SEEK_SET) == -1)
+ err(1, "fseek");
+
+ if (oz >= SIZE_MAX)
+ errx(1, "too big");
+
+ sz = oz;
+
+ if ((buf = malloc(sz + 1)) == NULL)
+ err(1, "malloc");
+
+ if (fread(buf, 1, sz, f) != sz)
+ err(1, "fread");
+
+ buf[sz] = '\0';
+
+ fclose(f);
+
+ return (buf);
+}
+
int
main(int argc, char **argv)
{
- int ch;
- const char *scriptpath = "/tmp/test.py";
- PyObject *name;
- PyObject *self;
- PyObject *module;
+ int ch;
+ char *path;
+ char *buf;
+ PyObject *self, *code, *module;
log_init(-1);
@@ -388,21 +416,32 @@ main(int argc, char **argv)
argc -= optind;
argv += optind;
- setenv("PYTHONPATH", "/tmp", 1);
+ if (argc == 0)
+ errx(1, "missing path");
+ path = argv[0];
+
Py_Initialize();
- self = Py_InitModule("smtpd", py_methods);
+ self = Py_InitModule("filter", py_methods);
PyModule_AddIntConstant(self, "FILTER_OK", FILTER_OK);
PyModule_AddIntConstant(self, "FILTER_FAIL", FILTER_FAIL);
PyModule_AddIntConstant(self, "FILTER_CLOSE", FILTER_CLOSE);
- name = PyString_FromString("test");
- module = PyImport_Import(name);
- Py_DECREF(name);
+ buf = loadfile(path);
+ code = Py_CompileString(buf, path, Py_file_input);
+ free(buf);
+
+ if (code == NULL) {
+ PyErr_Print();
+ log_warnx("warn: filter-python: failed to compile %s", path);
+ return (1);
+ }
+
+ module = PyImport_ExecCodeModuleEx("myfilter", code, path);
if (module == NULL) {
PyErr_Print();
- log_warnx("warn: filter-python: failed to load %s", scriptpath);
- return 1;
+ log_warnx("warn: filter-python: failed to install module %s", path);
+ return (1);
}
log_debug("debug: filter-python: starting...");
diff --git a/smtpd/filters/filter_stub.c b/smtpd/filters/filter_stub.c
index 86bc9a44..2b36aeeb 100644
--- a/smtpd/filters/filter_stub.c
+++ b/smtpd/filters/filter_stub.c
@@ -64,7 +64,7 @@ on_data(uint64_t id)
}
static int
-on_eom(uint64_t id)
+on_eom(uint64_t id, size_t size)
{
log_debug("ON EOM");
return filter_api_accept(id);
diff --git a/smtpd/filters/filter_trace.c b/smtpd/filters/filter_trace.c
index 83c7fcef..9c0dc25b 100644
--- a/smtpd/filters/filter_trace.c
+++ b/smtpd/filters/filter_trace.c
@@ -99,9 +99,9 @@ on_data(uint64_t id)
}
static int
-on_eom(uint64_t id)
+on_eom(uint64_t id, size_t size)
{
- printf("filter-trace: EOM id=%016"PRIx64, id);
+ printf("filter-trace: EOM id=%016"PRIx64", size=%zu", id, size);
return filter_api_accept(id);
}
diff --git a/smtpd/smtp_session.c b/smtpd/smtp_session.c
index 41ad919d..40c64798 100644
--- a/smtpd/smtp_session.c
+++ b/smtpd/smtp_session.c
@@ -145,9 +145,9 @@ struct smtp_session {
size_t rcptfail;
TAILQ_HEAD(, smtp_rcpt) rcpts;
- size_t datalen;
- struct iobuf dataiobuf;
- struct io dataio;
+ size_t odatalen;
+ struct iobuf obuf;
+ struct io oev;
int dataeom;
struct event pause;
@@ -494,7 +494,7 @@ smtp_session_imsg(struct mproc *p, struct imsg *imsg)
rcpt->maddr.user,
rcpt->maddr.user[0] == '\0' ? "" : "@",
rcpt->maddr.domain,
- s->datalen,
+ s->odatalen,
rcpt->destcount,
s->flags & SF_EHLO ? "ESMTP" : "SMTP");
}
@@ -763,17 +763,17 @@ smtp_filter_fd(uint64_t id, int fd)
return;
}
- iobuf_init(&s->dataiobuf, 0, 0);
- io_init(&s->dataio, fd, s, smtp_data_io, &s->dataiobuf);
+ iobuf_init(&s->obuf, 0, 0);
+ io_init(&s->oev, fd, s, smtp_data_io, &s->obuf);
- iobuf_fqueue(&s->dataiobuf, "Received: ");
+ iobuf_fqueue(&s->obuf, "Received: ");
if (! (s->listener->flags & F_MASK_SOURCE)) {
- iobuf_fqueue(&s->dataiobuf, "from %s (%s [%s]);\n\t",
+ iobuf_fqueue(&s->obuf, "from %s (%s [%s]);\n\t",
s->evp.helo,
s->hostname,
ss_to_text(&s->ss));
}
- iobuf_fqueue(&s->dataiobuf, "by %s (%s) with %sSMTP%s%s id %08x;\n",
+ iobuf_fqueue(&s->obuf, "by %s (%s) with %sSMTP%s%s id %08x;\n",
s->smtpname,
SMTPD_NAME,
s->flags & SF_EHLO ? "E" : "",
@@ -783,7 +783,7 @@ smtp_filter_fd(uint64_t id, int fd)
if (s->flags & SF_SECURE) {
x = SSL_get_peer_certificate(s->io.ssl);
- iobuf_fqueue(&s->dataiobuf,
+ iobuf_fqueue(&s->obuf,
"\tTLS version=%s cipher=%s bits=%d verify=%s;\n",
SSL_get_cipher_version(s->io.ssl),
SSL_get_cipher_name(s->io.ssl),
@@ -794,20 +794,20 @@ smtp_filter_fd(uint64_t id, int fd)
}
if (s->rcptcount == 1) {
- iobuf_fqueue(&s->dataiobuf, "\tfor <%s@%s>;\n",
+ iobuf_fqueue(&s->obuf, "\tfor <%s@%s>;\n",
s->evp.rcpt.user,
s->evp.rcpt.domain);
}
- iobuf_fqueue(&s->dataiobuf, "\t%s\n", time_to_text(time(NULL)));
+ iobuf_fqueue(&s->obuf, "\t%s\n", time_to_text(time(NULL)));
/*
* XXX This is not exactly fair, since this is not really
* user data.
*/
- s->datalen = iobuf_queued(&s->dataiobuf);
+ s->odatalen = iobuf_queued(&s->obuf);
- io_set_write(&s->dataio);
+ io_set_write(&s->oev);
smtp_enter_state(s, STATE_BODY);
smtp_reply(s, "354 Enter mail, end with \".\""
@@ -929,7 +929,7 @@ smtp_io(struct io *io, int evt)
io_set_write(io);
s->dataeom = 1;
- if (iobuf_queued(&s->dataiobuf) == 0)
+ if (iobuf_queued(&s->obuf) == 0)
smtp_data_io_done(s);
return;
}
@@ -1004,8 +1004,8 @@ smtp_data_io(struct io *io, int evt)
case IO_DISCONNECTED:
case IO_ERROR:
log_debug("debug: smtp: %p: io error on mfa", s);
- io_clear(&s->dataio);
- iobuf_clear(&s->dataiobuf);
+ io_clear(&s->oev);
+ iobuf_clear(&s->obuf);
s->msgflags |= MF_ERROR_IO;
if (s->io.flags & IO_PAUSE_IN) {
log_debug("debug: smtp: %p: resuming session after mfa error", s);
@@ -1014,7 +1014,7 @@ smtp_data_io(struct io *io, int evt)
break;
case IO_LOWAT:
- if (s->dataeom && iobuf_queued(&s->dataiobuf) == 0) {
+ if (s->dataeom && iobuf_queued(&s->obuf) == 0) {
smtp_data_io_done(s);
} else if (s->io.flags & IO_PAUSE_IN) {
log_debug("debug: smtp: %p: filter congestion over: resuming session", s);
@@ -1030,9 +1030,9 @@ smtp_data_io(struct io *io, int evt)
static void
smtp_data_io_done(struct smtp_session *s)
{
- log_debug("debug: smtp: %p: data io done (%zu bytes)", s, s->datalen);
- io_clear(&s->dataio);
- iobuf_clear(&s->dataiobuf);
+ log_debug("debug: smtp: %p: data io done (%zu bytes)", s, s->odatalen);
+ io_clear(&s->oev);
+ iobuf_clear(&s->obuf);
if (s->msgflags & MF_ERROR) {
@@ -1698,7 +1698,7 @@ smtp_message_reset(struct smtp_session *s, int prepare)
s->msgflags = 0;
s->destcount = 0;
s->rcptcount = 0;
- s->datalen = 0;
+ s->odatalen = 0;
if (prepare) {
s->evp.ss = s->ss;
@@ -2006,7 +2006,7 @@ static void
smtp_filter_eom(struct smtp_session *s)
{
tree_xset(&wait_filter, s->id, s);
- filter_eom(s->id, QUERY_EOM, s->datalen);
+ filter_eom(s->id, QUERY_EOM, s->odatalen);
}
static void
@@ -2046,19 +2046,19 @@ smtp_filter_dataline(struct smtp_session *s, const char *line)
if (s->msgflags & MF_ERROR)
return;
- n = iobuf_fqueue(&s->dataiobuf, "%s\n", line);
+ n = iobuf_fqueue(&s->obuf, "%s\n", line);
if (n == -1) {
/* XXX */
fatalx("iobuf_fqueue");
}
- s->datalen += strlen(line) +1;
+ s->odatalen += strlen(line) +1;
- if (iobuf_queued(&s->dataiobuf) > DATA_HIWAT && !(s->io.flags & IO_PAUSE_IN)) {
+ if (iobuf_queued(&s->obuf) > DATA_HIWAT && !(s->io.flags & IO_PAUSE_IN)) {
log_debug("debug: smtp: %p: filter congestion over: pausing session", s);
io_pause(&s->io, IO_PAUSE_IN);
}
- io_reload(&s->dataio);
+ io_reload(&s->oev);
}
#define CASE(x) case x : return #x
diff --git a/smtpd/smtpd-api.h b/smtpd/smtpd-api.h
index 98e83570..e8cf5771 100644
--- a/smtpd/smtpd-api.h
+++ b/smtpd/smtpd-api.h
@@ -57,9 +57,7 @@ enum filter_imsg {
IMSG_FILTER_REGISTER,
IMSG_FILTER_EVENT,
IMSG_FILTER_QUERY,
- IMSG_FILTER_PIPE_SETUP,
- IMSG_FILTER_PIPE_ABORT,
- IMSG_FILTER_NOTIFY,
+ IMSG_FILTER_PIPE,
IMSG_FILTER_RESPONSE
};
@@ -361,7 +359,7 @@ void filter_api_on_mail(int(*)(uint64_t, struct mailaddr *));
void filter_api_on_rcpt(int(*)(uint64_t, struct mailaddr *));
void filter_api_on_data(int(*)(uint64_t));
void filter_api_on_dataline(void(*)(uint64_t, const char *));
-void filter_api_on_eom(int(*)(uint64_t));
+void filter_api_on_eom(int(*)(uint64_t, size_t));
/* queue */
void queue_api_on_message_create(int(*)(uint32_t *));