aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgilles <gilles@poolp.org>2016-06-24 11:26:52 +0200
committergilles <gilles@poolp.org>2016-06-24 11:26:52 +0200
commit75aa2f3c93f78b6e7700ed84d63e1bbc360fab8e (patch)
treeadb58b9274066e21ffff50e72a22d8b78952c9d7
parentadd transaction allocator/destructor interface (diff)
downloadOpenSMTPD-extras-75aa2f3c93f78b6e7700ed84d63e1bbc360fab8e.tar.xz
OpenSMTPD-extras-75aa2f3c93f78b6e7700ed84d63e1bbc360fab8e.zip
completely separate session and transaction data
-rw-r--r--extras/filters/filter-rspamd/filter_rspamd.c62
-rw-r--r--extras/filters/filter-rspamd/rspamd.c301
-rw-r--r--extras/filters/filter-rspamd/rspamd.h56
3 files changed, 213 insertions, 206 deletions
diff --git a/extras/filters/filter-rspamd/filter_rspamd.c b/extras/filters/filter-rspamd/filter_rspamd.c
index dce3014..b43ec0e 100644
--- a/extras/filters/filter-rspamd/filter_rspamd.c
+++ b/extras/filters/filter-rspamd/filter_rspamd.c
@@ -61,11 +61,11 @@ on_helo(uint64_t id, const char *helo)
static int
on_mail(uint64_t id, struct mailaddr *mail)
{
- struct session *rs = filter_api_session(id);
- const char *address;
+ struct transaction *tx = filter_api_transaction(id);
+ const char *address;
address = filter_api_mailaddr_to_text(mail);
- rs->tx.from = xstrdup(address, "on_mail");
+ tx->from = xstrdup(address, "on_mail");
return filter_api_accept(id);
}
@@ -73,11 +73,11 @@ on_mail(uint64_t id, struct mailaddr *mail)
static int
on_rcpt(uint64_t id, struct mailaddr *rcpt)
{
- struct session *rs = filter_api_session(id);
- const char *address;
+ struct transaction *tx = filter_api_transaction(id);
+ const char *address;
address = filter_api_mailaddr_to_text(rcpt);
- rs->tx.rcpt = xstrdup(address, "on_rcpt");
+ tx->rcpt = xstrdup(address, "on_rcpt");
return filter_api_accept(id);
}
@@ -85,14 +85,14 @@ on_rcpt(uint64_t id, struct mailaddr *rcpt)
static int
on_data(uint64_t id)
{
- struct session *rs = filter_api_session(id);
+ struct transaction *tx = filter_api_transaction(id);
- if (! rspamd_buffer(rs))
- return filter_api_reject_code(rs->id, FILTER_FAIL, 421,
+ if (! rspamd_buffer(tx))
+ return filter_api_reject_code(id, FILTER_FAIL, 421,
"temporary failure");
- if (! rspamd_connect(rs))
- return filter_api_reject_code(rs->id, FILTER_FAIL, 421,
+ if (! rspamd_connect(tx))
+ return filter_api_reject_code(id, FILTER_FAIL, 421,
"temporary failure");
return 1;
@@ -101,45 +101,25 @@ on_data(uint64_t id)
static void
on_dataline(uint64_t id, const char *line)
{
- struct session *rs = filter_api_session(id);
- ssize_t sz;
+ struct transaction *tx = filter_api_transaction(id);
+ ssize_t sz;
- sz = fprintf(rs->tx.fp, "%s\n", line);
+ sz = fprintf(tx->fp, "%s\n", line);
if (sz == -1 || sz < (ssize_t)strlen(line) + 1)
- rs->tx.error = 1;
+ tx->error = 1;
- rspamd_send_chunk(rs, line);
+ rspamd_send_chunk(tx, line);
}
static int
on_eom(uint64_t id, size_t size)
{
- struct session *rs = filter_api_session(id);
+ struct transaction *tx = filter_api_transaction(id);
- rspamd_send_chunk(rs, NULL);
+ rspamd_send_chunk(tx, NULL);
return 1;
}
-static void
-on_commit(uint64_t id)
-{
- struct session *rs = filter_api_session(id);
-
- session_reset(rs);
-}
-
-static void
-on_rollback(uint64_t id)
-{
- struct session *rs = filter_api_session(id);
-
- session_reset(rs);
-}
-
-static void
-on_disconnect(uint64_t id)
-{
-}
int
main(int argc, char **argv)
@@ -206,13 +186,13 @@ main(int argc, char **argv)
filter_api_on_data(on_data);
filter_api_on_dataline(on_dataline);
filter_api_on_eom(on_eom);
- filter_api_on_commit(on_commit);
- filter_api_on_rollback(on_rollback);
- filter_api_on_disconnect(on_disconnect);
filter_api_session_allocator(session_allocator);
filter_api_session_destructor(session_destructor);
+ filter_api_transaction_allocator(transaction_allocator);
+ filter_api_transaction_destructor(transaction_destructor);
+
/*
if (c)
filter_api_set_chroot(c);
diff --git a/extras/filters/filter-rspamd/rspamd.c b/extras/filters/filter-rspamd/rspamd.c
index ff2069a..78065b3 100644
--- a/extras/filters/filter-rspamd/rspamd.c
+++ b/extras/filters/filter-rspamd/rspamd.c
@@ -38,6 +38,63 @@ struct sockaddr_storage ss;
static void datahold_stream(uint64_t, FILE *, void *);
+
+void *
+session_allocator(uint64_t id)
+{
+ struct session *rs;
+
+ return xcalloc(1, sizeof (struct session), "on_connect");
+}
+
+void
+session_destructor(void *ctx)
+{
+ struct session *rs = ctx;
+
+ free(rs->ip);
+ free(rs->hostname);
+ free(rs->helo);
+ free(rs);
+}
+
+void *
+transaction_allocator(uint64_t id)
+{
+ struct transaction *tx;
+
+ tx = xcalloc(1, sizeof *tx, "transaction_allocator");
+ tx->id = id;
+
+ return tx;
+}
+
+void
+transaction_destructor(void *ctx)
+{
+ struct transaction *tx = ctx;
+
+ iobuf_clear(&tx->iobuf);
+ io_clear(&tx->io);
+
+ filter_api_datahold_close(tx->id);
+
+ if (tx->from)
+ free(tx->from);
+ if (tx->rcpt)
+ free(tx->rcpt);
+
+ tx->eom = 0;
+ tx->error = 0;
+ tx->from = NULL;
+ tx->rcpt = NULL;
+
+}
+
+
+
+
+
/* XXX
* this needs to be handled differently, but lets focus on the filter for now
*/
@@ -72,7 +129,7 @@ rspamd_resolve(const char *h, const char *p)
static void
headers_callback(const struct rfc2822_header *hdr, void *arg)
{
- struct session *rs = arg;
+ struct transaction *tx = arg;
struct rfc2822_line *l;
char buffer[4096];
int i = 0;
@@ -81,104 +138,67 @@ headers_callback(const struct rfc2822_header *hdr, void *arg)
TAILQ_FOREACH(l, &hdr->lines, next) {
if (i++ == 0) {
snprintf(buffer, sizeof buffer, "%s:%s", hdr->name, l->buffer);
- filter_api_writeln(rs->id, buffer);
+ filter_api_writeln(tx->id, buffer);
continue;
}
- filter_api_writeln(rs->id, l->buffer);
+ filter_api_writeln(tx->id, l->buffer);
}
}
static void
dataline_callback(const char *line, void *arg)
{
- struct session *rs = arg;
-
- log_debug("debug: STREAM BACK: [%s]", rs->tx.line);
- filter_api_writeln(rs->id, rs->tx.line);
-}
-
-
-struct session *
-session_allocator(uint64_t id)
-{
- struct session *rs;
-
- rs = xcalloc(1, sizeof *rs, "on_connect");
- rs->id = id;
- return rs;
-}
-
-void
-session_reset(struct session *rs)
-{
- iobuf_clear(&rs->iobuf);
- io_clear(&rs->io);
+ struct transaction *tx = arg;
- filter_api_datahold_close(rs->id);
-
- free(rs->tx.from);
- free(rs->tx.rcpt);
-
- rs->tx.eom = 0;
- rs->tx.error = 0;
- rs->tx.from = NULL;
- rs->tx.rcpt = NULL;
+ log_debug("debug: STREAM BACK: [%s]", tx->line);
+ filter_api_writeln(tx->id, tx->line);
}
-void
-session_destructor(struct session *rs)
-{
- session_reset(rs);
-
- free(rs->ip);
- free(rs->hostname);
- free(rs->helo);
- free(rs);
-}
int
-rspamd_buffer(struct session *rs)
+rspamd_buffer(struct transaction *tx)
{
- rs->tx.fp = filter_api_datahold_open(rs->id, datahold_stream, rs);
- if (rs->tx.fp == NULL)
+ tx->fp = filter_api_datahold_open(tx->id, datahold_stream, tx);
+ if (tx->fp == NULL)
return 0;
return 1;
}
int
-rspamd_connect(struct session *rs)
+rspamd_connect(struct transaction *tx)
{
- iobuf_xinit(&rs->iobuf, LINE_MAX, LINE_MAX, "on_eom");
- io_init(&rs->io, -1, rs, rspamd_io, &rs->iobuf);
- if (io_connect(&rs->io, (struct sockaddr *)&ss, NULL) == -1)
+ iobuf_xinit(&tx->iobuf, LINE_MAX, LINE_MAX, "on_eom");
+ io_init(&tx->io, -1, tx, rspamd_io, &tx->iobuf);
+ if (io_connect(&tx->io, (struct sockaddr *)&ss, NULL) == -1)
return 0;
return 1;
}
void
-rspamd_disconnect(struct session *rs)
+rspamd_disconnect(struct transaction *tx)
{
- iobuf_clear(&rs->iobuf);
- io_clear(&rs->io);
+ iobuf_clear(&tx->iobuf);
+ io_clear(&tx->io);
}
void
-rspamd_connected(struct session *rs)
+rspamd_connected(struct transaction *tx)
{
- filter_api_accept(rs->id);
+ filter_api_accept(tx->id);
}
void
-rspamd_error(struct session *rs)
+rspamd_error(struct transaction *tx)
{
- filter_api_reject_code(rs->id, FILTER_FAIL, 421, "temporary failure");
- session_reset(rs);
+ filter_api_reject_code(tx->id, FILTER_FAIL, 421, "temporary failure");
}
void
-rspamd_send_query(struct session *rs)
+rspamd_send_query(struct transaction *tx)
{
- iobuf_xfqueue(&rs->iobuf, "io",
+ struct session *rs = filter_api_session(tx->id);
+
+ iobuf_xfqueue(&tx->iobuf, "io",
"POST /check HTTP/1.0\r\n"
"Transfer-Encoding: chunked\r\n"
"Pass: all\r\n"
@@ -191,46 +211,46 @@ rspamd_send_query(struct session *rs)
rs->ip,
rs->helo,
rs->hostname,
- rs->tx.from,
- rs->tx.rcpt);
- io_reload(&rs->io);
+ tx->from,
+ tx->rcpt);
+ io_reload(&tx->io);
}
void
-rspamd_send_chunk(struct session *rs, const char *line)
+rspamd_send_chunk(struct transaction *tx, const char *line)
{
if (line)
- iobuf_xfqueue(&rs->iobuf, "io", "%x\r\n%s\r\n\r\n",
+ iobuf_xfqueue(&tx->iobuf, "io", "%x\r\n%s\r\n\r\n",
strlen(line)+2, line);
else {
- iobuf_xfqueue(&rs->iobuf, "io", "0\r\n\r\n");
- rs->tx.eom = 1;
+ iobuf_xfqueue(&tx->iobuf, "io", "0\r\n\r\n");
+ tx->eom = 1;
}
- io_reload(&rs->io);
+ io_reload(&tx->io);
}
void
-rspamd_read_response(struct session *rs)
+rspamd_read_response(struct transaction *tx)
{
char *line;
- while ((line = iobuf_getline(&rs->iobuf, NULL)))
+ while ((line = iobuf_getline(&tx->iobuf, NULL)))
if (strlen(line) == 0)
- rs->rspamd.eoh = 1;
+ tx->rspamd.eoh = 1;
- if (rs->rspamd.eoh) {
- if (iobuf_len(&rs->iobuf) != 0) {
- rs->rspamd.body = xmemdup(iobuf_data(&rs->iobuf),
- iobuf_len(&rs->iobuf) + 1, "rspamd_read_response");
- rs->rspamd.body[iobuf_len(&rs->iobuf)] = 0;
+ if (tx->rspamd.eoh) {
+ if (iobuf_len(&tx->iobuf) != 0) {
+ tx->rspamd.body = xmemdup(iobuf_data(&tx->iobuf),
+ iobuf_len(&tx->iobuf) + 1, "rspamd_read_response");
+ tx->rspamd.body[iobuf_len(&tx->iobuf)] = 0;
}
}
- iobuf_normalize(&rs->iobuf);
+ iobuf_normalize(&tx->iobuf);
}
int
-rspamd_parse_response(struct session *rs)
+rspamd_parse_response(struct transaction *tx)
{
json_value *jv;
json_value *def = NULL;
@@ -238,7 +258,7 @@ rspamd_parse_response(struct session *rs)
json_value *val;
size_t i;
- jv = json_parse(rs->rspamd.body, strlen(rs->rspamd.body));
+ jv = json_parse(tx->rspamd.body, strlen(tx->rspamd.body));
if (jv == NULL || jv->type != json_object)
goto fail;
@@ -257,25 +277,25 @@ rspamd_parse_response(struct session *rs)
val = def->u.object.values[i].value;
if (val->type != json_boolean)
goto fail;
- rs->rspamd.is_spam = val->u.boolean;
+ tx->rspamd.is_spam = val->u.boolean;
}
else if (strcmp(name, "is_skipped") == 0) {
val = def->u.object.values[i].value;
if (val->type != json_boolean)
goto fail;
- rs->rspamd.is_skipped = val->u.boolean;
+ tx->rspamd.is_skipped = val->u.boolean;
}
else if (strcmp(name, "score") == 0) {
val = def->u.object.values[i].value;
if (val->type != json_double)
goto fail;
- rs->rspamd.score = val->u.dbl;
+ tx->rspamd.score = val->u.dbl;
}
else if (strcmp(name, "required_score") == 0) {
val = def->u.object.values[i].value;
if (val->type != json_double)
goto fail;
- rs->rspamd.required_score = val->u.dbl;
+ tx->rspamd.required_score = val->u.dbl;
}
else if (strcmp(name, "action") == 0) {
val = def->u.object.values[i].value;
@@ -284,28 +304,28 @@ rspamd_parse_response(struct session *rs)
log_debug("[%.*s]", val->u.string.length, val->u.string.ptr);
if (strncmp(val->u.string.ptr, "no action",
val->u.string.length) == 0)
- rs->rspamd.action = NO_ACTION;
+ tx->rspamd.action = NO_ACTION;
else if (strncmp(val->u.string.ptr, "greylist",
val->u.string.length) == 0)
- rs->rspamd.action = GREYLIST;
+ tx->rspamd.action = GREYLIST;
else if (strncmp(val->u.string.ptr, "add header",
val->u.string.length) == 0)
- rs->rspamd.action = ADD_HEADER;
+ tx->rspamd.action = ADD_HEADER;
else if (strncmp(val->u.string.ptr, "rewrite subject",
val->u.string.length) == 0)
- rs->rspamd.action = REWRITE_SUBJECT;
+ tx->rspamd.action = REWRITE_SUBJECT;
else if (strncmp(val->u.string.ptr, "soft reject",
val->u.string.length) == 0)
- rs->rspamd.action = SOFT_REJECT;
+ tx->rspamd.action = SOFT_REJECT;
else if (strncmp(val->u.string.ptr, "reject",
val->u.string.length) == 0)
- rs->rspamd.action = REJECT;
+ tx->rspamd.action = REJECT;
}
else if (strcmp(name, "subject") == 0) {
val = def->u.object.values[i].value;
if (val->type != json_string)
goto fail;
- rs->rspamd.subject = xmemdup(val->u.string.ptr,
+ tx->rspamd.subject = xmemdup(val->u.string.ptr,
val->u.string.length, "rspamd_parse_result");
}
}
@@ -321,58 +341,58 @@ fail:
void
rspamd_spam_header(const char *header, void *arg)
{
- struct session *rs = arg;
+ struct transaction *tx = arg;
char buffer[4096];
snprintf(buffer, sizeof buffer, "X-Spam-Flag: %s",
- rs->rspamd.is_spam ? "Yes" : "No");
- filter_api_writeln(rs->id, buffer);
+ tx->rspamd.is_spam ? "Yes" : "No");
+ filter_api_writeln(tx->id, buffer);
snprintf(buffer, sizeof buffer, "X-Spam-Score: %.2f",
- rs->rspamd.score);
- filter_api_writeln(rs->id, buffer);
+ tx->rspamd.score);
+ filter_api_writeln(tx->id, buffer);
snprintf(buffer, sizeof buffer, "X-Spam-Status: %s, score=%.2f, required=%.2f",
- rs->rspamd.is_spam ? "Yes" : "No",
- rs->rspamd.score,
- rs->rspamd.required_score);
- filter_api_writeln(rs->id, buffer);
+ tx->rspamd.is_spam ? "Yes" : "No",
+ tx->rspamd.score,
+ tx->rspamd.required_score);
+ filter_api_writeln(tx->id, buffer);
}
int
-rspamd_proceed(struct session *rs)
+rspamd_proceed(struct transaction *tx)
{
- rfc2822_parser_init(&rs->tx.rfc2822_parser);
- rfc2822_parser_reset(&rs->tx.rfc2822_parser);
- rfc2822_header_default_callback(&rs->tx.rfc2822_parser,
- headers_callback, rs);
- rfc2822_body_callback(&rs->tx.rfc2822_parser,
- dataline_callback, rs);
-
- switch (rs->rspamd.action) {
+ rfc2822_parser_init(&tx->rfc2822_parser);
+ rfc2822_parser_reset(&tx->rfc2822_parser);
+ rfc2822_header_default_callback(&tx->rfc2822_parser,
+ headers_callback, tx);
+ rfc2822_body_callback(&tx->rfc2822_parser,
+ dataline_callback, tx);
+
+ switch (tx->rspamd.action) {
case NO_ACTION:
return 1;
case SOFT_REJECT:
- filter_api_reject_code(rs->id, FILTER_FAIL, 421,
+ filter_api_reject_code(tx->id, FILTER_FAIL, 421,
"message content rejected");
return 0;
case GREYLIST:
- filter_api_reject_code(rs->id, FILTER_FAIL, 421,
+ filter_api_reject_code(tx->id, FILTER_FAIL, 421,
"greylisted");
return 0;
case REJECT:
- filter_api_reject_code(rs->id, FILTER_FAIL, 550,
+ filter_api_reject_code(tx->id, FILTER_FAIL, 550,
"message content rejected");
return 0;
case ADD_HEADER:
/* insert header */
log_debug("ADDING X-SPAM");
- rfc2822_missing_header_callback(&rs->tx.rfc2822_parser,
- "x-spam", rspamd_spam_header, rs);
+ rfc2822_missing_header_callback(&tx->rfc2822_parser,
+ "x-spam", rspamd_spam_header, tx);
return 1;
case REWRITE_SUBJECT:
@@ -380,60 +400,61 @@ rspamd_proceed(struct session *rs)
return 1;
}
-
-
+ filter_api_reject_code(tx->id, FILTER_FAIL, 550,
+ "server internal error");
+ return 0;
}
void
rspamd_io(struct io *io, int evt)
{
- struct session *rs = io->arg;
-
+ struct transaction *tx = io->arg;
+
switch (evt) {
case IO_CONNECTED:
- rspamd_connected(rs);
- rspamd_send_query(rs);
+ rspamd_connected(tx);
+ rspamd_send_query(tx);
io_set_write(io);
break;
case IO_LOWAT:
/* we've hit EOM and no more data, toggle to read */
- if (rs->tx.eom)
+ if (tx->eom)
io_set_read(io);
break;
case IO_DATAIN:
/* accumulate reply */
- rspamd_read_response(rs);
+ rspamd_read_response(tx);
break;
case IO_DISCONNECTED:
- rspamd_disconnect(rs);
+ rspamd_disconnect(tx);
/* we're done with rspamd, if there was a local error
* during transaction, reject now, else move forward.
*/
- if (rs->tx.error) {
- rspamd_error(rs);
+ if (tx->error) {
+ rspamd_error(tx);
break;
}
/* process rspamd reply and start processing datahold */
- if (! rspamd_parse_response(rs)) {
- rspamd_error(rs);
+ if (! rspamd_parse_response(tx)) {
+ rspamd_error(tx);
break;
}
- if (! rspamd_proceed(rs))
+ if (! rspamd_proceed(tx))
break;
- filter_api_datahold_start(rs->id);
+ filter_api_datahold_start(tx->id);
break;
case IO_TIMEOUT:
case IO_ERROR:
default:
- //rspamd_error(rs);
+ rspamd_error(tx);
break;
}
return;
@@ -442,24 +463,24 @@ rspamd_io(struct io *io, int evt)
static void
datahold_stream(uint64_t id, FILE *fp, void *arg)
{
- struct session *rs = arg;
- size_t sz;
- ssize_t len;
- int ret;
+ struct transaction *tx = arg;
+ size_t sz;
+ ssize_t len;
+ int ret;
errno = 0;
- if ((len = getline(&rs->tx.line, &sz, fp)) == -1) {
+ if ((len = getline(&tx->line, &sz, fp)) == -1) {
if (errno) {
- filter_api_reject_code(rs->id, FILTER_FAIL, 421,
+ filter_api_reject_code(id, FILTER_FAIL, 421,
"temporary failure");
return;
}
- filter_api_accept(rs->id);
+ filter_api_accept(id);
return;
}
- rs->tx.line[strcspn(rs->tx.line, "\n")] = '\0';
- ret = rfc2822_parser_feed(&rs->tx.rfc2822_parser,
- rs->tx.line);
+ tx->line[strcspn(tx->line, "\n")] = '\0';
+ ret = rfc2822_parser_feed(&tx->rfc2822_parser,
+ tx->line);
datahold_stream(id, fp, arg);
}
diff --git a/extras/filters/filter-rspamd/rspamd.h b/extras/filters/filter-rspamd/rspamd.h
index e754386..a6161d1 100644
--- a/extras/filters/filter-rspamd/rspamd.h
+++ b/extras/filters/filter-rspamd/rspamd.h
@@ -20,26 +20,21 @@
#define RSPAMD_PORT "11333"
struct session {
- uint64_t id;
-
- struct iobuf iobuf;
- struct io io;
-
char *ip;
char *hostname;
char *helo;
+};
- struct tx {
- FILE *fp;
- char *line;
- int error;
- int eom;
+struct transaction {
+ uint64_t id;
- char *from;
- char *rcpt;
- struct rfc2822_parser rfc2822_parser;
- } tx;
+ struct iobuf iobuf;
+ struct io io;
+ char *from;
+ char *rcpt;
+ int eom;
+
struct rspamd_response {
int eoh;
char *body;
@@ -59,20 +54,31 @@ struct session {
char *subject;
} rspamd;
+ struct rfc2822_parser rfc2822_parser;
+ int error;
+ char *line;
+ FILE *fp;
};
-struct session *session_allocator(uint64_t);
-void session_destructor(struct session *);
-void session_reset(struct session *);
+void *session_allocator(uint64_t);
+void session_destructor(void *);
+
+void *transaction_allocator(uint64_t);
+void transaction_destructor(void *);
+
+int rspamd_connect(struct transaction *);
+void rspamd_connected(struct transaction *);
+void rspamd_send_query(struct transaction *);
+void rspamd_send_chunk(struct transaction *, const char *);
+void rspamd_read_response(struct transaction *);
+int rspamd_parse_response(struct transaction *);
+int rspamd_buffer(struct transaction *);
+void rspamd_error(struct transaction *);
-int rspamd_buffer(struct session *);
-int rspamd_connect(struct session *);
-void rspamd_connected(struct session *);
-void rspamd_error(struct session *);
void rspamd_io(struct io *, int);
-void rspamd_send_query(struct session *);
-void rspamd_send_chunk(struct session *, const char *);
-void rspamd_read_response(struct session *);
-int rspamd_parse_response(struct session *);
+
+
+
+