diff options
author | gilles <gilles@poolp.org> | 2016-06-24 11:26:52 +0200 |
---|---|---|
committer | gilles <gilles@poolp.org> | 2016-06-24 11:26:52 +0200 |
commit | 75aa2f3c93f78b6e7700ed84d63e1bbc360fab8e (patch) | |
tree | adb58b9274066e21ffff50e72a22d8b78952c9d7 | |
parent | add transaction allocator/destructor interface (diff) | |
download | OpenSMTPD-extras-75aa2f3c93f78b6e7700ed84d63e1bbc360fab8e.tar.xz OpenSMTPD-extras-75aa2f3c93f78b6e7700ed84d63e1bbc360fab8e.zip |
completely separate session and transaction data
-rw-r--r-- | extras/filters/filter-rspamd/filter_rspamd.c | 62 | ||||
-rw-r--r-- | extras/filters/filter-rspamd/rspamd.c | 301 | ||||
-rw-r--r-- | extras/filters/filter-rspamd/rspamd.h | 56 |
3 files changed, 213 insertions, 206 deletions
diff --git a/extras/filters/filter-rspamd/filter_rspamd.c b/extras/filters/filter-rspamd/filter_rspamd.c index dce3014..b43ec0e 100644 --- a/extras/filters/filter-rspamd/filter_rspamd.c +++ b/extras/filters/filter-rspamd/filter_rspamd.c @@ -61,11 +61,11 @@ on_helo(uint64_t id, const char *helo) static int on_mail(uint64_t id, struct mailaddr *mail) { - struct session *rs = filter_api_session(id); - const char *address; + struct transaction *tx = filter_api_transaction(id); + const char *address; address = filter_api_mailaddr_to_text(mail); - rs->tx.from = xstrdup(address, "on_mail"); + tx->from = xstrdup(address, "on_mail"); return filter_api_accept(id); } @@ -73,11 +73,11 @@ on_mail(uint64_t id, struct mailaddr *mail) static int on_rcpt(uint64_t id, struct mailaddr *rcpt) { - struct session *rs = filter_api_session(id); - const char *address; + struct transaction *tx = filter_api_transaction(id); + const char *address; address = filter_api_mailaddr_to_text(rcpt); - rs->tx.rcpt = xstrdup(address, "on_rcpt"); + tx->rcpt = xstrdup(address, "on_rcpt"); return filter_api_accept(id); } @@ -85,14 +85,14 @@ on_rcpt(uint64_t id, struct mailaddr *rcpt) static int on_data(uint64_t id) { - struct session *rs = filter_api_session(id); + struct transaction *tx = filter_api_transaction(id); - if (! rspamd_buffer(rs)) - return filter_api_reject_code(rs->id, FILTER_FAIL, 421, + if (! rspamd_buffer(tx)) + return filter_api_reject_code(id, FILTER_FAIL, 421, "temporary failure"); - if (! rspamd_connect(rs)) - return filter_api_reject_code(rs->id, FILTER_FAIL, 421, + if (! rspamd_connect(tx)) + return filter_api_reject_code(id, FILTER_FAIL, 421, "temporary failure"); return 1; @@ -101,45 +101,25 @@ on_data(uint64_t id) static void on_dataline(uint64_t id, const char *line) { - struct session *rs = filter_api_session(id); - ssize_t sz; + struct transaction *tx = filter_api_transaction(id); + ssize_t sz; - sz = fprintf(rs->tx.fp, "%s\n", line); + sz = fprintf(tx->fp, "%s\n", line); if (sz == -1 || sz < (ssize_t)strlen(line) + 1) - rs->tx.error = 1; + tx->error = 1; - rspamd_send_chunk(rs, line); + rspamd_send_chunk(tx, line); } static int on_eom(uint64_t id, size_t size) { - struct session *rs = filter_api_session(id); + struct transaction *tx = filter_api_transaction(id); - rspamd_send_chunk(rs, NULL); + rspamd_send_chunk(tx, NULL); return 1; } -static void -on_commit(uint64_t id) -{ - struct session *rs = filter_api_session(id); - - session_reset(rs); -} - -static void -on_rollback(uint64_t id) -{ - struct session *rs = filter_api_session(id); - - session_reset(rs); -} - -static void -on_disconnect(uint64_t id) -{ -} int main(int argc, char **argv) @@ -206,13 +186,13 @@ main(int argc, char **argv) filter_api_on_data(on_data); filter_api_on_dataline(on_dataline); filter_api_on_eom(on_eom); - filter_api_on_commit(on_commit); - filter_api_on_rollback(on_rollback); - filter_api_on_disconnect(on_disconnect); filter_api_session_allocator(session_allocator); filter_api_session_destructor(session_destructor); + filter_api_transaction_allocator(transaction_allocator); + filter_api_transaction_destructor(transaction_destructor); + /* if (c) filter_api_set_chroot(c); diff --git a/extras/filters/filter-rspamd/rspamd.c b/extras/filters/filter-rspamd/rspamd.c index ff2069a..78065b3 100644 --- a/extras/filters/filter-rspamd/rspamd.c +++ b/extras/filters/filter-rspamd/rspamd.c @@ -38,6 +38,63 @@ struct sockaddr_storage ss; static void datahold_stream(uint64_t, FILE *, void *); + +void * +session_allocator(uint64_t id) +{ + struct session *rs; + + return xcalloc(1, sizeof (struct session), "on_connect"); +} + +void +session_destructor(void *ctx) +{ + struct session *rs = ctx; + + free(rs->ip); + free(rs->hostname); + free(rs->helo); + free(rs); +} + +void * +transaction_allocator(uint64_t id) +{ + struct transaction *tx; + + tx = xcalloc(1, sizeof *tx, "transaction_allocator"); + tx->id = id; + + return tx; +} + +void +transaction_destructor(void *ctx) +{ + struct transaction *tx = ctx; + + iobuf_clear(&tx->iobuf); + io_clear(&tx->io); + + filter_api_datahold_close(tx->id); + + if (tx->from) + free(tx->from); + if (tx->rcpt) + free(tx->rcpt); + + tx->eom = 0; + tx->error = 0; + tx->from = NULL; + tx->rcpt = NULL; + +} + + + + + /* XXX * this needs to be handled differently, but lets focus on the filter for now */ @@ -72,7 +129,7 @@ rspamd_resolve(const char *h, const char *p) static void headers_callback(const struct rfc2822_header *hdr, void *arg) { - struct session *rs = arg; + struct transaction *tx = arg; struct rfc2822_line *l; char buffer[4096]; int i = 0; @@ -81,104 +138,67 @@ headers_callback(const struct rfc2822_header *hdr, void *arg) TAILQ_FOREACH(l, &hdr->lines, next) { if (i++ == 0) { snprintf(buffer, sizeof buffer, "%s:%s", hdr->name, l->buffer); - filter_api_writeln(rs->id, buffer); + filter_api_writeln(tx->id, buffer); continue; } - filter_api_writeln(rs->id, l->buffer); + filter_api_writeln(tx->id, l->buffer); } } static void dataline_callback(const char *line, void *arg) { - struct session *rs = arg; - - log_debug("debug: STREAM BACK: [%s]", rs->tx.line); - filter_api_writeln(rs->id, rs->tx.line); -} - - -struct session * -session_allocator(uint64_t id) -{ - struct session *rs; - - rs = xcalloc(1, sizeof *rs, "on_connect"); - rs->id = id; - return rs; -} - -void -session_reset(struct session *rs) -{ - iobuf_clear(&rs->iobuf); - io_clear(&rs->io); + struct transaction *tx = arg; - filter_api_datahold_close(rs->id); - - free(rs->tx.from); - free(rs->tx.rcpt); - - rs->tx.eom = 0; - rs->tx.error = 0; - rs->tx.from = NULL; - rs->tx.rcpt = NULL; + log_debug("debug: STREAM BACK: [%s]", tx->line); + filter_api_writeln(tx->id, tx->line); } -void -session_destructor(struct session *rs) -{ - session_reset(rs); - - free(rs->ip); - free(rs->hostname); - free(rs->helo); - free(rs); -} int -rspamd_buffer(struct session *rs) +rspamd_buffer(struct transaction *tx) { - rs->tx.fp = filter_api_datahold_open(rs->id, datahold_stream, rs); - if (rs->tx.fp == NULL) + tx->fp = filter_api_datahold_open(tx->id, datahold_stream, tx); + if (tx->fp == NULL) return 0; return 1; } int -rspamd_connect(struct session *rs) +rspamd_connect(struct transaction *tx) { - iobuf_xinit(&rs->iobuf, LINE_MAX, LINE_MAX, "on_eom"); - io_init(&rs->io, -1, rs, rspamd_io, &rs->iobuf); - if (io_connect(&rs->io, (struct sockaddr *)&ss, NULL) == -1) + iobuf_xinit(&tx->iobuf, LINE_MAX, LINE_MAX, "on_eom"); + io_init(&tx->io, -1, tx, rspamd_io, &tx->iobuf); + if (io_connect(&tx->io, (struct sockaddr *)&ss, NULL) == -1) return 0; return 1; } void -rspamd_disconnect(struct session *rs) +rspamd_disconnect(struct transaction *tx) { - iobuf_clear(&rs->iobuf); - io_clear(&rs->io); + iobuf_clear(&tx->iobuf); + io_clear(&tx->io); } void -rspamd_connected(struct session *rs) +rspamd_connected(struct transaction *tx) { - filter_api_accept(rs->id); + filter_api_accept(tx->id); } void -rspamd_error(struct session *rs) +rspamd_error(struct transaction *tx) { - filter_api_reject_code(rs->id, FILTER_FAIL, 421, "temporary failure"); - session_reset(rs); + filter_api_reject_code(tx->id, FILTER_FAIL, 421, "temporary failure"); } void -rspamd_send_query(struct session *rs) +rspamd_send_query(struct transaction *tx) { - iobuf_xfqueue(&rs->iobuf, "io", + struct session *rs = filter_api_session(tx->id); + + iobuf_xfqueue(&tx->iobuf, "io", "POST /check HTTP/1.0\r\n" "Transfer-Encoding: chunked\r\n" "Pass: all\r\n" @@ -191,46 +211,46 @@ rspamd_send_query(struct session *rs) rs->ip, rs->helo, rs->hostname, - rs->tx.from, - rs->tx.rcpt); - io_reload(&rs->io); + tx->from, + tx->rcpt); + io_reload(&tx->io); } void -rspamd_send_chunk(struct session *rs, const char *line) +rspamd_send_chunk(struct transaction *tx, const char *line) { if (line) - iobuf_xfqueue(&rs->iobuf, "io", "%x\r\n%s\r\n\r\n", + iobuf_xfqueue(&tx->iobuf, "io", "%x\r\n%s\r\n\r\n", strlen(line)+2, line); else { - iobuf_xfqueue(&rs->iobuf, "io", "0\r\n\r\n"); - rs->tx.eom = 1; + iobuf_xfqueue(&tx->iobuf, "io", "0\r\n\r\n"); + tx->eom = 1; } - io_reload(&rs->io); + io_reload(&tx->io); } void -rspamd_read_response(struct session *rs) +rspamd_read_response(struct transaction *tx) { char *line; - while ((line = iobuf_getline(&rs->iobuf, NULL))) + while ((line = iobuf_getline(&tx->iobuf, NULL))) if (strlen(line) == 0) - rs->rspamd.eoh = 1; + tx->rspamd.eoh = 1; - if (rs->rspamd.eoh) { - if (iobuf_len(&rs->iobuf) != 0) { - rs->rspamd.body = xmemdup(iobuf_data(&rs->iobuf), - iobuf_len(&rs->iobuf) + 1, "rspamd_read_response"); - rs->rspamd.body[iobuf_len(&rs->iobuf)] = 0; + if (tx->rspamd.eoh) { + if (iobuf_len(&tx->iobuf) != 0) { + tx->rspamd.body = xmemdup(iobuf_data(&tx->iobuf), + iobuf_len(&tx->iobuf) + 1, "rspamd_read_response"); + tx->rspamd.body[iobuf_len(&tx->iobuf)] = 0; } } - iobuf_normalize(&rs->iobuf); + iobuf_normalize(&tx->iobuf); } int -rspamd_parse_response(struct session *rs) +rspamd_parse_response(struct transaction *tx) { json_value *jv; json_value *def = NULL; @@ -238,7 +258,7 @@ rspamd_parse_response(struct session *rs) json_value *val; size_t i; - jv = json_parse(rs->rspamd.body, strlen(rs->rspamd.body)); + jv = json_parse(tx->rspamd.body, strlen(tx->rspamd.body)); if (jv == NULL || jv->type != json_object) goto fail; @@ -257,25 +277,25 @@ rspamd_parse_response(struct session *rs) val = def->u.object.values[i].value; if (val->type != json_boolean) goto fail; - rs->rspamd.is_spam = val->u.boolean; + tx->rspamd.is_spam = val->u.boolean; } else if (strcmp(name, "is_skipped") == 0) { val = def->u.object.values[i].value; if (val->type != json_boolean) goto fail; - rs->rspamd.is_skipped = val->u.boolean; + tx->rspamd.is_skipped = val->u.boolean; } else if (strcmp(name, "score") == 0) { val = def->u.object.values[i].value; if (val->type != json_double) goto fail; - rs->rspamd.score = val->u.dbl; + tx->rspamd.score = val->u.dbl; } else if (strcmp(name, "required_score") == 0) { val = def->u.object.values[i].value; if (val->type != json_double) goto fail; - rs->rspamd.required_score = val->u.dbl; + tx->rspamd.required_score = val->u.dbl; } else if (strcmp(name, "action") == 0) { val = def->u.object.values[i].value; @@ -284,28 +304,28 @@ rspamd_parse_response(struct session *rs) log_debug("[%.*s]", val->u.string.length, val->u.string.ptr); if (strncmp(val->u.string.ptr, "no action", val->u.string.length) == 0) - rs->rspamd.action = NO_ACTION; + tx->rspamd.action = NO_ACTION; else if (strncmp(val->u.string.ptr, "greylist", val->u.string.length) == 0) - rs->rspamd.action = GREYLIST; + tx->rspamd.action = GREYLIST; else if (strncmp(val->u.string.ptr, "add header", val->u.string.length) == 0) - rs->rspamd.action = ADD_HEADER; + tx->rspamd.action = ADD_HEADER; else if (strncmp(val->u.string.ptr, "rewrite subject", val->u.string.length) == 0) - rs->rspamd.action = REWRITE_SUBJECT; + tx->rspamd.action = REWRITE_SUBJECT; else if (strncmp(val->u.string.ptr, "soft reject", val->u.string.length) == 0) - rs->rspamd.action = SOFT_REJECT; + tx->rspamd.action = SOFT_REJECT; else if (strncmp(val->u.string.ptr, "reject", val->u.string.length) == 0) - rs->rspamd.action = REJECT; + tx->rspamd.action = REJECT; } else if (strcmp(name, "subject") == 0) { val = def->u.object.values[i].value; if (val->type != json_string) goto fail; - rs->rspamd.subject = xmemdup(val->u.string.ptr, + tx->rspamd.subject = xmemdup(val->u.string.ptr, val->u.string.length, "rspamd_parse_result"); } } @@ -321,58 +341,58 @@ fail: void rspamd_spam_header(const char *header, void *arg) { - struct session *rs = arg; + struct transaction *tx = arg; char buffer[4096]; snprintf(buffer, sizeof buffer, "X-Spam-Flag: %s", - rs->rspamd.is_spam ? "Yes" : "No"); - filter_api_writeln(rs->id, buffer); + tx->rspamd.is_spam ? "Yes" : "No"); + filter_api_writeln(tx->id, buffer); snprintf(buffer, sizeof buffer, "X-Spam-Score: %.2f", - rs->rspamd.score); - filter_api_writeln(rs->id, buffer); + tx->rspamd.score); + filter_api_writeln(tx->id, buffer); snprintf(buffer, sizeof buffer, "X-Spam-Status: %s, score=%.2f, required=%.2f", - rs->rspamd.is_spam ? "Yes" : "No", - rs->rspamd.score, - rs->rspamd.required_score); - filter_api_writeln(rs->id, buffer); + tx->rspamd.is_spam ? "Yes" : "No", + tx->rspamd.score, + tx->rspamd.required_score); + filter_api_writeln(tx->id, buffer); } int -rspamd_proceed(struct session *rs) +rspamd_proceed(struct transaction *tx) { - rfc2822_parser_init(&rs->tx.rfc2822_parser); - rfc2822_parser_reset(&rs->tx.rfc2822_parser); - rfc2822_header_default_callback(&rs->tx.rfc2822_parser, - headers_callback, rs); - rfc2822_body_callback(&rs->tx.rfc2822_parser, - dataline_callback, rs); - - switch (rs->rspamd.action) { + rfc2822_parser_init(&tx->rfc2822_parser); + rfc2822_parser_reset(&tx->rfc2822_parser); + rfc2822_header_default_callback(&tx->rfc2822_parser, + headers_callback, tx); + rfc2822_body_callback(&tx->rfc2822_parser, + dataline_callback, tx); + + switch (tx->rspamd.action) { case NO_ACTION: return 1; case SOFT_REJECT: - filter_api_reject_code(rs->id, FILTER_FAIL, 421, + filter_api_reject_code(tx->id, FILTER_FAIL, 421, "message content rejected"); return 0; case GREYLIST: - filter_api_reject_code(rs->id, FILTER_FAIL, 421, + filter_api_reject_code(tx->id, FILTER_FAIL, 421, "greylisted"); return 0; case REJECT: - filter_api_reject_code(rs->id, FILTER_FAIL, 550, + filter_api_reject_code(tx->id, FILTER_FAIL, 550, "message content rejected"); return 0; case ADD_HEADER: /* insert header */ log_debug("ADDING X-SPAM"); - rfc2822_missing_header_callback(&rs->tx.rfc2822_parser, - "x-spam", rspamd_spam_header, rs); + rfc2822_missing_header_callback(&tx->rfc2822_parser, + "x-spam", rspamd_spam_header, tx); return 1; case REWRITE_SUBJECT: @@ -380,60 +400,61 @@ rspamd_proceed(struct session *rs) return 1; } - - + filter_api_reject_code(tx->id, FILTER_FAIL, 550, + "server internal error"); + return 0; } void rspamd_io(struct io *io, int evt) { - struct session *rs = io->arg; - + struct transaction *tx = io->arg; + switch (evt) { case IO_CONNECTED: - rspamd_connected(rs); - rspamd_send_query(rs); + rspamd_connected(tx); + rspamd_send_query(tx); io_set_write(io); break; case IO_LOWAT: /* we've hit EOM and no more data, toggle to read */ - if (rs->tx.eom) + if (tx->eom) io_set_read(io); break; case IO_DATAIN: /* accumulate reply */ - rspamd_read_response(rs); + rspamd_read_response(tx); break; case IO_DISCONNECTED: - rspamd_disconnect(rs); + rspamd_disconnect(tx); /* we're done with rspamd, if there was a local error * during transaction, reject now, else move forward. */ - if (rs->tx.error) { - rspamd_error(rs); + if (tx->error) { + rspamd_error(tx); break; } /* process rspamd reply and start processing datahold */ - if (! rspamd_parse_response(rs)) { - rspamd_error(rs); + if (! rspamd_parse_response(tx)) { + rspamd_error(tx); break; } - if (! rspamd_proceed(rs)) + if (! rspamd_proceed(tx)) break; - filter_api_datahold_start(rs->id); + filter_api_datahold_start(tx->id); break; case IO_TIMEOUT: case IO_ERROR: default: - //rspamd_error(rs); + rspamd_error(tx); break; } return; @@ -442,24 +463,24 @@ rspamd_io(struct io *io, int evt) static void datahold_stream(uint64_t id, FILE *fp, void *arg) { - struct session *rs = arg; - size_t sz; - ssize_t len; - int ret; + struct transaction *tx = arg; + size_t sz; + ssize_t len; + int ret; errno = 0; - if ((len = getline(&rs->tx.line, &sz, fp)) == -1) { + if ((len = getline(&tx->line, &sz, fp)) == -1) { if (errno) { - filter_api_reject_code(rs->id, FILTER_FAIL, 421, + filter_api_reject_code(id, FILTER_FAIL, 421, "temporary failure"); return; } - filter_api_accept(rs->id); + filter_api_accept(id); return; } - rs->tx.line[strcspn(rs->tx.line, "\n")] = '\0'; - ret = rfc2822_parser_feed(&rs->tx.rfc2822_parser, - rs->tx.line); + tx->line[strcspn(tx->line, "\n")] = '\0'; + ret = rfc2822_parser_feed(&tx->rfc2822_parser, + tx->line); datahold_stream(id, fp, arg); } diff --git a/extras/filters/filter-rspamd/rspamd.h b/extras/filters/filter-rspamd/rspamd.h index e754386..a6161d1 100644 --- a/extras/filters/filter-rspamd/rspamd.h +++ b/extras/filters/filter-rspamd/rspamd.h @@ -20,26 +20,21 @@ #define RSPAMD_PORT "11333" struct session { - uint64_t id; - - struct iobuf iobuf; - struct io io; - char *ip; char *hostname; char *helo; +}; - struct tx { - FILE *fp; - char *line; - int error; - int eom; +struct transaction { + uint64_t id; - char *from; - char *rcpt; - struct rfc2822_parser rfc2822_parser; - } tx; + struct iobuf iobuf; + struct io io; + char *from; + char *rcpt; + int eom; + struct rspamd_response { int eoh; char *body; @@ -59,20 +54,31 @@ struct session { char *subject; } rspamd; + struct rfc2822_parser rfc2822_parser; + int error; + char *line; + FILE *fp; }; -struct session *session_allocator(uint64_t); -void session_destructor(struct session *); -void session_reset(struct session *); +void *session_allocator(uint64_t); +void session_destructor(void *); + +void *transaction_allocator(uint64_t); +void transaction_destructor(void *); + +int rspamd_connect(struct transaction *); +void rspamd_connected(struct transaction *); +void rspamd_send_query(struct transaction *); +void rspamd_send_chunk(struct transaction *, const char *); +void rspamd_read_response(struct transaction *); +int rspamd_parse_response(struct transaction *); +int rspamd_buffer(struct transaction *); +void rspamd_error(struct transaction *); -int rspamd_buffer(struct session *); -int rspamd_connect(struct session *); -void rspamd_connected(struct session *); -void rspamd_error(struct session *); void rspamd_io(struct io *, int); -void rspamd_send_query(struct session *); -void rspamd_send_chunk(struct session *, const char *); -void rspamd_read_response(struct session *); -int rspamd_parse_response(struct session *); + + + + |