diff options
author | gilles <gilles@poolp.org> | 2016-06-24 17:31:37 +0200 |
---|---|---|
committer | gilles <gilles@poolp.org> | 2016-06-24 17:31:37 +0200 |
commit | a994c110e9e344bfb9914ffd5e262c0259fc4232 (patch) | |
tree | fb0b3b6358a7b88bdf4c7a7d66e727ba967e088c | |
parent | completely separate session and transaction data (diff) | |
download | OpenSMTPD-extras-a994c110e9e344bfb9914ffd5e262c0259fc4232.tar.xz OpenSMTPD-extras-a994c110e9e344bfb9914ffd5e262c0259fc4232.zip |
convert wip rspamd filter to new data buffering API
add API calls to deal with buffered data + headers remove/replace/add
-rw-r--r-- | api/filter_api.c | 281 | ||||
-rw-r--r-- | api/rfc2822.c | 22 | ||||
-rw-r--r-- | api/rfc2822.h | 8 | ||||
-rw-r--r-- | api/smtpd-api.h | 5 | ||||
-rw-r--r-- | api/util.c | 17 | ||||
-rw-r--r-- | extras/filters/filter-rspamd/filter_rspamd.c | 11 | ||||
-rw-r--r-- | extras/filters/filter-rspamd/rspamd.c | 107 | ||||
-rw-r--r-- | extras/filters/filter-rspamd/rspamd.h | 3 |
8 files changed, 295 insertions, 159 deletions
diff --git a/api/filter_api.c b/api/filter_api.c index 9de5378..ab818ec 100644 --- a/api/filter_api.c +++ b/api/filter_api.c @@ -68,13 +68,17 @@ struct filter_session { char *line; } response; - FILE *datahold; - void (*datahold_cb)(uint64_t, FILE *, void *); - void *datahold_arg; - void *session; void *transaction; void *udata; + + void *data_buffer; + void (*data_buffer_cb)(uint64_t, FILE *, void *); + + struct rfc2822_parser rfc2822_parser; + struct dict headers_replace; + struct dict headers_add; + }; struct filter_timer { @@ -112,11 +116,13 @@ static struct filter_internals { void (*rollback)(uint64_t); } cb; - void *(*session_allocator)(uint64_t); - void (*session_destructor)(void *); + void *(*session_allocator)(uint64_t); + void (*session_destructor)(void *); - void *(*transaction_allocator)(uint64_t); - void (*transaction_destructor)(void *); + void *(*transaction_allocator)(uint64_t); + void (*transaction_destructor)(void *); + + int data_buffered; } fi; static void filter_api_init(void); @@ -144,6 +150,10 @@ static const char *hook_to_str(int); static const char *query_to_str(int); static const char *event_to_str(int); +static void data_buffered_setup(struct filter_session *); +static void data_buffered_release(struct filter_session *); +static void data_buffered_stream_process(uint64_t, FILE *, void *); + static void filter_response(struct filter_session *s, int status, int code, const char *line) @@ -275,33 +285,39 @@ filter_dispatch(struct mproc *p, struct imsg *imsg) break; case EVENT_RESET: filter_dispatch_reset(id); + s = tree_xget(&sessions, id); if (fi.transaction_destructor) { - s = tree_xget(&sessions, id); if (s->transaction) { fi.transaction_destructor(s->transaction); s->transaction = NULL; } } + if (s->data_buffer) + data_buffered_release(s); break; case EVENT_COMMIT: filter_dispatch_commit(id); + s = tree_xget(&sessions, id); if (fi.transaction_destructor) { - s = tree_xget(&sessions, id); if (s->transaction) { fi.transaction_destructor(s->transaction); s->transaction = NULL; } } + if (s->data_buffer) + data_buffered_release(s); break; case EVENT_ROLLBACK: filter_dispatch_rollback(id); + s = tree_xget(&sessions, id); if (fi.transaction_destructor) { - s = tree_xget(&sessions, id); if (s->transaction) { fi.transaction_destructor(s->transaction); s->transaction = NULL; } } + if (s->data_buffer) + data_buffered_release(s); break; default: log_warnx("warn: filter-api:%s bad event %d", filter_name, type); @@ -349,6 +365,12 @@ filter_dispatch(struct mproc *p, struct imsg *imsg) break; case QUERY_DATA: m_end(&m); + + if (fi.data_buffered) { + s = tree_xget(&sessions, id); + data_buffered_setup(s); + } + filter_register_query(id, qid, type); filter_dispatch_data(id); break; @@ -505,8 +527,6 @@ filter_dispatch_commit(uint64_t id) io_clear(&s->pipe.iev); iobuf_clear(&s->pipe.ibuf); - filter_api_datahold_close(id); - if (fi.cb.commit) fi.cb.commit(id); } @@ -526,8 +546,6 @@ filter_dispatch_rollback(uint64_t id) io_clear(&s->pipe.iev); iobuf_clear(&s->pipe.ibuf); - filter_api_datahold_close(id); - if (fi.cb.rollback) fi.cb.rollback(id); } @@ -645,6 +663,10 @@ filter_io_in(struct io *io, int evt) s->pipe.idatalen += len + 1; /* XXX warning: do not clear io from this call! */ + if (s->data_buffer) { + /* XXX handle errors somehow */ + fprintf(s->data_buffer, "%s\n", line); + } filter_dispatch_dataline(s->id, line); goto nextline; @@ -701,8 +723,8 @@ filter_io_out(struct io *io, int evt) break; /* just wait for more data to send or feed through callback */ - if (s->datahold_cb) - s->datahold_cb(s->id, s->datahold, s->datahold_arg); + if (s->data_buffer_cb) + s->data_buffer_cb(s->id, s->data_buffer, s); return; default: @@ -1156,6 +1178,31 @@ filter_api_writeln(uint64_t id, const char *line) io_reload(&s->pipe.oev); } +void +filter_api_printf(uint64_t id, const char *fmt, ...) +{ + struct filter_session *s; + va_list ap; + int len; + + log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" filter_api_printf(%s)", + filter_name, id, fmt); + + s = tree_xget(&sessions, id); + + if (s->pipe.oev.sock == -1) { + log_warnx("warn: session %016"PRIx64": write out of sequence", id); + return; + } + + va_start(ap, fmt); + len = iobuf_vfqueue(&s->pipe.obuf, fmt, ap); + iobuf_fqueue(&s->pipe.obuf, "\n"); + va_end(ap); + s->pipe.odatalen += len + 1; + io_reload(&s->pipe.oev); +} + static void filter_api_timer_cb(int fd, short evt, void *arg) { @@ -1205,65 +1252,207 @@ filter_api_mailaddr_to_text(const struct mailaddr *maddr) return (buffer); } -FILE * -filter_api_datahold_open(uint64_t id, void (*callback)(uint64_t, FILE *, void *), void *arg) + +/* X X X */ +static void +data_buffered_stream_process(uint64_t id, FILE *fp, void *arg) { struct filter_session *s; - FILE *fp; - int fd; - char pathname[] = "/tmp/XXXXXXXXXX"; + size_t sz; + ssize_t len; + char *line = NULL; s = tree_xget(&sessions, id); - - if (s->datahold) { - log_warnx("warn: filter-api:%s filter_api_datahold_open: already opened !", - filter_name); - fatalx("filter-api: exiting"); + errno = 0; + if ((len = getline(&line, &sz, fp)) == -1) { + if (errno) { + filter_api_reject_code(id, FILTER_FAIL, 421, + "Internal Server Error"); + return; + } + filter_api_accept(id); + return; } + line[strcspn(line, "\n")] = '\0'; + //log_debug("##### FEEDING [%s]", line); + rfc2822_parser_feed(&s->rfc2822_parser, line); + /* XXX */ + data_buffered_stream_process(id, fp, s); +} - if (!s->tx) { - log_warnx("warn: filter-api:%s filter_api_datahold_open: not in transaction !", - filter_name); - fatalx("filter-api: exiting"); +static void +default_header_callback(const struct rfc2822_header *hdr, void *arg) +{ + struct filter_session *s = arg; + struct rfc2822_line *l; + int i = 0; + + TAILQ_FOREACH(l, &hdr->lines, next) { + if (i++ == 0) { + filter_api_printf(s->id, "%s: %s", hdr->name, l->buffer + 1); + continue; + } + filter_api_printf(s->id, "%s", l->buffer); } +} + +static void +default_body_callback(const char *line, void *arg) +{ + struct filter_session *s = arg; + + filter_api_writeln(s->id, line); +} + +static void +header_remove_callback(const struct rfc2822_header *hdr, void *arg) +{ +} + +static void +header_replace_callback(const struct rfc2822_header *hdr, void *arg) +{ + struct filter_session *s = arg; + char *value; + char *key; + + key = xstrdup(hdr->name, "header_replace_callback"); + lowercase(key, key, strlen(key)+1); + + value = dict_xget(&s->headers_replace, key); + filter_api_printf(s->id, "%s: %s", hdr->name, value); + free(key); +} + +static void +header_eoh_callback(void *arg) +{ + struct filter_session *s = arg; + void *iter; + const char *key; + void *data; + + iter = NULL; + while (dict_iter(&s->headers_add, &iter, &key, &data)) + filter_api_printf(s->id, "%s: %s", key, (char *)data); +} + +void +data_buffered_setup(struct filter_session *s) +{ + FILE *fp; + int fd; + char pathname[] = "/tmp/XXXXXXXXXX"; fd = mkstemp(pathname); if (fd == -1) - return (NULL); + return; fp = fdopen(fd, "w+b"); if (fp == NULL) { close(fd); - return (NULL); + return; + } + unlink(pathname); + + s->data_buffer = fp; + s->data_buffer_cb = data_buffered_stream_process; + + rfc2822_parser_init(&s->rfc2822_parser); + rfc2822_parser_reset(&s->rfc2822_parser); + rfc2822_header_default_callback(&s->rfc2822_parser, + default_header_callback, s); + rfc2822_body_callback(&s->rfc2822_parser, + default_body_callback, s); + rfc2822_eoh_callback(&s->rfc2822_parser, + header_eoh_callback, s); + + dict_init(&s->headers_replace); + dict_init(&s->headers_add); +} + +static void +data_buffered_release(struct filter_session *s) +{ + void *data; + + rfc2822_parser_release(&s->rfc2822_parser); + if (s->data_buffer) { + fclose(s->data_buffer); + s->data_buffer = NULL; } - s->datahold = fp; - s->datahold_cb = callback; - s->datahold_arg = arg; + while (dict_poproot(&s->headers_replace, &data)) + free(data); - return (fp); + while (dict_poproot(&s->headers_add, &data)) + free(data); } void -filter_api_datahold_start(uint64_t id) +filter_api_data_buffered(void) +{ + fi.data_buffered = 1; +} + +void +filter_api_data_buffered_stream(uint64_t id) { struct filter_session *s; s = tree_xget(&sessions, id); - if (s->datahold) - fseek(s->datahold, 0, 0); + if (s->data_buffer) + fseek(s->data_buffer, 0, 0); io_callback(&s->pipe.oev, IO_LOWAT); } void -filter_api_datahold_close(uint64_t id) +filter_api_header_remove(uint64_t id, const char *header) +{ + struct filter_session *s; + + s = tree_xget(&sessions, id); + rfc2822_header_callback(&s->rfc2822_parser, header, + header_remove_callback, s); +} + +void +filter_api_header_replace(uint64_t id, const char *header, const char *fmt, ...) +{ + struct filter_session *s; + char *key; + char *buffer = NULL; + va_list ap; + + s = tree_xget(&sessions, id); + va_start(ap, fmt); + vasprintf(&buffer, fmt, ap); + va_end(ap); + + key = xstrdup(header, "filter_api_header_replace"); + lowercase(key, key, strlen(key)+1); + dict_set(&s->headers_replace, header, buffer); + free(key); + + rfc2822_header_callback(&s->rfc2822_parser, header, + header_replace_callback, s); +} + +void +filter_api_header_add(uint64_t id, const char *header, const char *fmt, ...) { struct filter_session *s; + char *key; + char *buffer = NULL; + va_list ap; s = tree_xget(&sessions, id); - if (s->datahold) - fclose(s->datahold); - s->datahold = NULL; - s->datahold_cb = NULL; - s->datahold_arg = NULL; + va_start(ap, fmt); + vasprintf(&buffer, fmt, ap); + va_end(ap); + + key = xstrdup(header, "filter_api_header_replace"); + lowercase(key, key, strlen(key)+1); + dict_set(&s->headers_add, header, buffer); + free(key); } diff --git a/api/rfc2822.c b/api/rfc2822.c index 0b80f35..0fadf2e 100644 --- a/api/rfc2822.c +++ b/api/rfc2822.c @@ -82,6 +82,13 @@ missing_headers_callback(struct rfc2822_parser *rp) } static void +eoh_callback(struct rfc2822_parser *rp) +{ + if (rp->eoh_cb.func) + rp->eoh_cb.func(rp->eoh_cb.arg); +} + +static void body_callback(struct rfc2822_parser *rp, const char *line) { rp->body_line_cb.func(line, rp->body_line_cb.arg); @@ -184,8 +191,10 @@ rfc2822_parser_feed(struct rfc2822_parser *rp, const char *line) /* no longer in headers */ if (*line == '\0') { - if (rp->in_hdrs) + if (rp->in_hdrs) { missing_headers_callback(rp); + eoh_callback(rp); + } rp->in_hdrs = 0; } @@ -278,3 +287,14 @@ rfc2822_body_callback(struct rfc2822_parser *rp, cb->arg = arg; } +void +rfc2822_eoh_callback(struct rfc2822_parser *rp, + void (*func)(void *), void *arg) +{ + struct rfc2822_eoh_cb *cb; + + cb = &rp->eoh_cb; + cb->func = func; + cb->arg = arg; +} + diff --git a/api/rfc2822.h b/api/rfc2822.h index 48bdf6d..30b878b 100644 --- a/api/rfc2822.h +++ b/api/rfc2822.h @@ -52,6 +52,11 @@ struct rfc2822_line_cb { void *arg; }; +struct rfc2822_eoh_cb { + void (*func)(void *); + void *arg; +}; + struct rfc2822_parser { uint8_t in_hdrs; /* in headers */ @@ -62,6 +67,7 @@ struct rfc2822_parser { struct rfc2822_header header; struct rfc2822_hdr_cb hdr_dflt_cb; + struct rfc2822_eoh_cb eoh_cb; struct rfc2822_line_cb body_line_cb; }; @@ -79,5 +85,7 @@ void rfc2822_header_default_callback(struct rfc2822_parser *, void (*)(const struct rfc2822_header *, void *), void *); void rfc2822_body_callback(struct rfc2822_parser *, void (*)(const char *, void *), void *); +void rfc2822_eoh_callback(struct rfc2822_parser *, + void (*)(void *), void *); #endif diff --git a/api/smtpd-api.h b/api/smtpd-api.h index 696975f..8c6c7af 100644 --- a/api/smtpd-api.h +++ b/api/smtpd-api.h @@ -393,12 +393,16 @@ void filter_api_no_chroot(void); void filter_api_set_udata(uint64_t, void *); void *filter_api_get_udata(uint64_t); +void filter_api_data_buffered(void); +void filter_api_data_buffered_stream(uint64_t); + void filter_api_loop(void); int filter_api_accept(uint64_t); int filter_api_reject(uint64_t, enum filter_status); int filter_api_reject_code(uint64_t, enum filter_status, uint32_t, const char *); void filter_api_writeln(uint64_t, const char *); +void filter_api_printf(uint64_t id, const char *, ...); void filter_api_timer(uint64_t, uint32_t, void (*)(uint64_t, void *), void *); const char *filter_api_sockaddr_to_text(const struct sockaddr *); const char *filter_api_mailaddr_to_text(const struct mailaddr *); @@ -547,5 +551,6 @@ void iobuf_xfqueue(struct iobuf *, const char *, const char *, ...); char *strip(char *); int base64_encode(unsigned char const *, size_t, char *, size_t); int base64_decode(char const *, unsigned char *, size_t); +int lowercase(char *, const char *, size_t); #endif @@ -128,6 +128,23 @@ strip(char *s) } int +lowercase(char *buf, const char *s, size_t len) +{ + if (len == 0) + return 0; + + if (strlcpy(buf, s, len) >= len) + return 0; + + while (*buf != '\0') { + *buf = tolower((unsigned char)*buf); + buf++; + } + + return 1; +} + +int base64_encode(unsigned char const *src, size_t srclen, char *dest, size_t destsize) { diff --git a/extras/filters/filter-rspamd/filter_rspamd.c b/extras/filters/filter-rspamd/filter_rspamd.c index b43ec0e..fe1077d 100644 --- a/extras/filters/filter-rspamd/filter_rspamd.c +++ b/extras/filters/filter-rspamd/filter_rspamd.c @@ -87,10 +87,6 @@ on_data(uint64_t id) { struct transaction *tx = filter_api_transaction(id); - if (! rspamd_buffer(tx)) - return filter_api_reject_code(id, FILTER_FAIL, 421, - "temporary failure"); - if (! rspamd_connect(tx)) return filter_api_reject_code(id, FILTER_FAIL, 421, "temporary failure"); @@ -102,11 +98,6 @@ static void on_dataline(uint64_t id, const char *line) { struct transaction *tx = filter_api_transaction(id); - ssize_t sz; - - sz = fprintf(tx->fp, "%s\n", line); - if (sz == -1 || sz < (ssize_t)strlen(line) + 1) - tx->error = 1; rspamd_send_chunk(tx, line); } @@ -193,6 +184,8 @@ main(int argc, char **argv) filter_api_transaction_allocator(transaction_allocator); filter_api_transaction_destructor(transaction_destructor); + filter_api_data_buffered(); + /* if (c) filter_api_set_chroot(c); diff --git a/extras/filters/filter-rspamd/rspamd.c b/extras/filters/filter-rspamd/rspamd.c index 78065b3..2ae8109 100644 --- a/extras/filters/filter-rspamd/rspamd.c +++ b/extras/filters/filter-rspamd/rspamd.c @@ -36,14 +36,9 @@ struct sockaddr_storage ss; -static void datahold_stream(uint64_t, FILE *, void *); - - void * session_allocator(uint64_t id) { - struct session *rs; - return xcalloc(1, sizeof (struct session), "on_connect"); } @@ -77,8 +72,6 @@ transaction_destructor(void *ctx) iobuf_clear(&tx->iobuf); io_clear(&tx->io); - filter_api_datahold_close(tx->id); - if (tx->from) free(tx->from); if (tx->rcpt) @@ -93,8 +86,6 @@ transaction_destructor(void *ctx) - - /* XXX * this needs to be handled differently, but lets focus on the filter for now */ @@ -126,44 +117,6 @@ rspamd_resolve(const char *h, const char *p) fatalx("resolve: failed"); } -static void -headers_callback(const struct rfc2822_header *hdr, void *arg) -{ - struct transaction *tx = arg; - struct rfc2822_line *l; - char buffer[4096]; - int i = 0; - - log_debug("#######1"); - TAILQ_FOREACH(l, &hdr->lines, next) { - if (i++ == 0) { - snprintf(buffer, sizeof buffer, "%s:%s", hdr->name, l->buffer); - filter_api_writeln(tx->id, buffer); - continue; - } - filter_api_writeln(tx->id, l->buffer); - } -} - -static void -dataline_callback(const char *line, void *arg) -{ - struct transaction *tx = arg; - - log_debug("debug: STREAM BACK: [%s]", tx->line); - filter_api_writeln(tx->id, tx->line); -} - - -int -rspamd_buffer(struct transaction *tx) -{ - tx->fp = filter_api_datahold_open(tx->id, datahold_stream, tx); - if (tx->fp == NULL) - return 0; - return 1; -} - int rspamd_connect(struct transaction *tx) { @@ -339,36 +292,17 @@ fail: } void -rspamd_spam_header(const char *header, void *arg) +rspamd_spam_headers(struct transaction *tx) { - struct transaction *tx = arg; - char buffer[4096]; - - snprintf(buffer, sizeof buffer, "X-Spam-Flag: %s", + filter_api_header_add(tx->id, "X-Spam-Flag", "%s", tx->rspamd.is_spam ? "Yes" : "No"); - filter_api_writeln(tx->id, buffer); - - snprintf(buffer, sizeof buffer, "X-Spam-Score: %.2f", + filter_api_header_add(tx->id, "X-Spam-Score", "%.2f", tx->rspamd.score); - filter_api_writeln(tx->id, buffer); - - snprintf(buffer, sizeof buffer, "X-Spam-Status: %s, score=%.2f, required=%.2f", - tx->rspamd.is_spam ? "Yes" : "No", - tx->rspamd.score, - tx->rspamd.required_score); - filter_api_writeln(tx->id, buffer); } int rspamd_proceed(struct transaction *tx) { - rfc2822_parser_init(&tx->rfc2822_parser); - rfc2822_parser_reset(&tx->rfc2822_parser); - rfc2822_header_default_callback(&tx->rfc2822_parser, - headers_callback, tx); - rfc2822_body_callback(&tx->rfc2822_parser, - dataline_callback, tx); - switch (tx->rspamd.action) { case NO_ACTION: return 1; @@ -390,9 +324,7 @@ rspamd_proceed(struct transaction *tx) case ADD_HEADER: /* insert header */ - log_debug("ADDING X-SPAM"); - rfc2822_missing_header_callback(&tx->rfc2822_parser, - "x-spam", rspamd_spam_header, tx); + rspamd_spam_headers(tx); return 1; case REWRITE_SUBJECT: @@ -439,7 +371,7 @@ rspamd_io(struct io *io, int evt) rspamd_error(tx); break; } - /* process rspamd reply and start processing datahold */ + if (! rspamd_parse_response(tx)) { rspamd_error(tx); break; @@ -447,8 +379,8 @@ rspamd_io(struct io *io, int evt) if (! rspamd_proceed(tx)) break; - - filter_api_datahold_start(tx->id); + + filter_api_data_buffered_stream(tx->id); break; case IO_TIMEOUT: @@ -459,28 +391,3 @@ rspamd_io(struct io *io, int evt) } return; } - -static void -datahold_stream(uint64_t id, FILE *fp, void *arg) -{ - struct transaction *tx = arg; - size_t sz; - ssize_t len; - int ret; - - errno = 0; - if ((len = getline(&tx->line, &sz, fp)) == -1) { - if (errno) { - filter_api_reject_code(id, FILTER_FAIL, 421, - "temporary failure"); - return; - } - filter_api_accept(id); - return; - } - - tx->line[strcspn(tx->line, "\n")] = '\0'; - ret = rfc2822_parser_feed(&tx->rfc2822_parser, - tx->line); - datahold_stream(id, fp, arg); -} diff --git a/extras/filters/filter-rspamd/rspamd.h b/extras/filters/filter-rspamd/rspamd.h index a6161d1..8a36c2c 100644 --- a/extras/filters/filter-rspamd/rspamd.h +++ b/extras/filters/filter-rspamd/rspamd.h @@ -54,10 +54,8 @@ struct transaction { char *subject; } rspamd; - struct rfc2822_parser rfc2822_parser; int error; char *line; - FILE *fp; }; void *session_allocator(uint64_t); @@ -72,7 +70,6 @@ void rspamd_send_query(struct transaction *); void rspamd_send_chunk(struct transaction *, const char *); void rspamd_read_response(struct transaction *); int rspamd_parse_response(struct transaction *); -int rspamd_buffer(struct transaction *); void rspamd_error(struct transaction *); void rspamd_io(struct io *, int); |