diff options
author | gilles <gilles@poolp.org> | 2016-06-23 18:55:03 +0200 |
---|---|---|
committer | gilles <gilles@poolp.org> | 2016-06-23 18:55:03 +0200 |
commit | c3ad5ede788ef450318e39b72e499a67d032429f (patch) | |
tree | 30c691bcc0bc6e1dd67f606ba71793acf7d156ba | |
parent | Merge branch 'master' into filter-rspamd (diff) | |
download | OpenSMTPD-extras-c3ad5ede788ef450318e39b72e499a67d032429f.tar.xz OpenSMTPD-extras-c3ad5ede788ef450318e39b72e499a67d032429f.zip |
fix state machine bugs
-rw-r--r-- | extras/filters/filter-rspamd/filter_rspamd.c | 157 |
1 files changed, 89 insertions, 68 deletions
diff --git a/extras/filters/filter-rspamd/filter_rspamd.c b/extras/filters/filter-rspamd/filter_rspamd.c index ac09f3e..fccd019 100644 --- a/extras/filters/filter-rspamd/filter_rspamd.c +++ b/extras/filters/filter-rspamd/filter_rspamd.c @@ -51,24 +51,32 @@ struct session { struct tx { FILE *fp; - char *line; - int eom; - int rspamd_done; + char *line; + int error; + int eom; - char *from; - char *rcpt; + char *from; + char *rcpt; } tx; }; struct sockaddr_storage ss; -static struct session *rspamd_session_init(uint64_t); -static void rspamd_session_reset(struct session *); -static void rspamd_session_free(struct session *); +static struct session *session_init(uint64_t); +static void session_reset(struct session *); +static void session_free(struct session *); + +static int rspamd_connect(struct session *); +static void rspamd_connected(struct session *); +static void rspamd_error(struct session *); static void rspamd_io(struct io *, int); static void rspamd_send_query(struct session *); - +static void rspamd_send_chunk(struct session *, const char *); +static void rspamd_read_response(struct session *); + +static void datahold_stream(uint64_t, FILE *, void *); + /* XXX * this needs to be handled differently, but lets focus on the filter for now */ @@ -101,7 +109,7 @@ rspamd_resolve(const char *h, const char *p) } static struct session * -rspamd_session_init(uint64_t id) +session_init(uint64_t id) { struct session *rs; @@ -111,24 +119,26 @@ rspamd_session_init(uint64_t id) } static void -rspamd_session_reset(struct session *rs) +session_reset(struct session *rs) { - free(rs->tx.from); - free(rs->tx.rcpt); + iobuf_clear(&rs->iobuf); + io_clear(&rs->io); filter_api_datahold_close(rs->id); + free(rs->tx.from); + free(rs->tx.rcpt); + rs->tx.eom = 0; - rs->tx.rspamd_done = 0; + rs->tx.error = 0; + rs->tx.from = NULL; + rs->tx.rcpt = NULL; } static void -rspamd_session_free(struct session *rs) +session_free(struct session *rs) { - iobuf_clear(&rs->iobuf); - io_clear(&rs->io); - - rspamd_session_reset(rs); + session_reset(rs); free(rs->ip); free(rs->hostname); @@ -147,6 +157,13 @@ rspamd_connect(struct session *rs) } static void +rspamd_disconnect(struct session *rs) +{ + iobuf_clear(&rs->iobuf); + io_clear(&rs->io); +} + +static void rspamd_connected(struct session *rs) { filter_api_accept(rs->id); @@ -155,8 +172,8 @@ rspamd_connected(struct session *rs) static void rspamd_error(struct session *rs) { - /* XXX */ - filter_api_accept(rs->id); + filter_api_reject_code(rs->id, FILTER_FAIL, 421, "temporary failure"); + session_reset(rs); } static void @@ -164,13 +181,14 @@ rspamd_send_query(struct session *rs) { iobuf_xfqueue(&rs->iobuf, "io", "POST /check HTTP/1.0\r\n" + "Transfer-Encoding: chunked\r\n" + "Pass: all\r\n" "IP: %s\r\n" "Helo: %s\r\n" "Hostname: %s\r\n" "From: %s\r\n" "Rcpt: %s\r\n" - "Pass: all\r\n" - "Transfer-Encoding: chunked\r\n\r\n", + "\r\n", rs->ip, rs->helo, rs->hostname, @@ -182,21 +200,19 @@ rspamd_send_query(struct session *rs) static void rspamd_send_chunk(struct session *rs, const char *line) { - iobuf_xfqueue(&rs->iobuf, "io", "%x\r\n%s\r\n\r\n", - strlen(line)+2, line); - io_reload(&rs->io); -} - -static void -rspamd_send_eom(struct session *rs) -{ - iobuf_xfqueue(&rs->iobuf, "io", "0\r\n\r\n"); - rs->tx.eom = 1; + if (line) + iobuf_xfqueue(&rs->iobuf, "io", "%x\r\n%s\r\n\r\n", + strlen(line)+2, line); + else { + iobuf_xfqueue(&rs->iobuf, "io", "0\r\n\r\n"); + rs->tx.eom = 1; + } + io_reload(&rs->io); } static void -rspamd_response(struct session *rs) +rspamd_read_response(struct session *rs) { char *line; @@ -208,29 +224,27 @@ rspamd_response(struct session *rs) iobuf_data(&rs->iobuf)); } iobuf_normalize(&rs->iobuf); - - rs->tx.rspamd_done = 1; } static void -smtpd_stream_back(uint64_t id, FILE *fp, void *arg) +datahold_stream(uint64_t id, FILE *fp, void *arg) { struct session *rs = arg; size_t sz; ssize_t len; - len = getline(&rs->tx.line, &sz, fp); - - /* XXX */ - if (len == -1) { + errno = 0; + if ((len = getline(&rs->tx.line, &sz, fp)) == -1) { + if (errno) { + filter_api_reject_code(rs->id, FILTER_FAIL, 421, + "temporary failure"); + return; + } filter_api_accept(rs->id); return; } - if (strcmp(rs->tx.line, "\n") == 0) - filter_api_writeln(rs->id, "X-MangeDes: Bites"); - - rs->tx.line[len-1] = 0; + rs->tx.line[strcspn(rs->tx.line, "\n")] = '\0'; log_debug("debug: STREAM BACK: [%s]", rs->tx.line); filter_api_writeln(rs->id, rs->tx.line); } @@ -254,26 +268,29 @@ rspamd_io(struct io *io, int evt) break; case IO_DATAIN: - rspamd_response(rs); - if (rs->tx.rspamd_done) { - log_debug("debug: ####### WILL STREAM BACK"); - filter_api_datahold_start(rs->id); - io_set_write(io); - } + /* accumulate reply */ + rspamd_read_response(rs); break; case IO_DISCONNECTED: + rspamd_disconnect(rs); + + /* we're done with rspamd, if there was a local error + * during transaction, reject now, else move forward. + */ + if (rs->tx.error) { + rspamd_error(rs); + break; + } log_debug("debug: DISCONNECT"); - rspamd_session_free(rs); + /* process rspamd reply and start processing datahold */ + filter_api_datahold_start(rs->id); break; + case IO_TIMEOUT: - log_debug("debug: TIMEOUT"); - break; case IO_ERROR: - log_debug("debug: ERROR"); - break; default: - log_debug("debug: WTF"); + //rspamd_error(rs); break; } return; @@ -291,7 +308,7 @@ on_connect(uint64_t id, struct filter_connect *conn) struct session *rs; const char *ip; - rs = rspamd_session_init(id); + rs = session_init(id); //ip = filter_api_sockaddr_to_text((struct sockaddr *)&conn->local); ip = "127.0.0.1"; @@ -337,12 +354,14 @@ on_data(uint64_t id) { struct session *rs = filter_api_get_udata(id); - rs->tx.fp = filter_api_datahold_open(id, smtpd_stream_back, rs); + rs->tx.fp = filter_api_datahold_open(id, datahold_stream, rs); if (rs->tx.fp == NULL) - return filter_api_accept(id); /* XXX */ + return filter_api_reject_code(rs->id, FILTER_FAIL, 421, + "temporary failure"); if (! rspamd_connect(rs)) - return filter_api_accept(id); /* XXX */ + return filter_api_reject_code(rs->id, FILTER_FAIL, 421, + "temporary failure"); return 1; } @@ -350,10 +369,12 @@ on_data(uint64_t id) static void on_dataline(uint64_t id, const char *line) { - struct session *rs = filter_api_get_udata(id); + struct session *rs = filter_api_get_udata(id); + ssize_t sz; - /* XXX - tempfail here */ - fprintf(rs->tx.fp, "%s\n", line); + sz = fprintf(rs->tx.fp, "%s\n", line); + if (sz == -1 || sz < (ssize_t)strlen(line) + 1) + rs->tx.error = 1; rspamd_send_chunk(rs, line); } @@ -362,7 +383,7 @@ on_eom(uint64_t id, size_t size) { struct session *rs = filter_api_get_udata(id); - rspamd_send_eom(rs); + rspamd_send_chunk(rs, NULL); } static void @@ -370,7 +391,7 @@ on_commit(uint64_t id) { struct session *rs = filter_api_get_udata(id); - rspamd_session_reset(rs); + session_reset(rs); } static void @@ -378,13 +399,13 @@ on_rollback(uint64_t id) { struct session *rs = filter_api_get_udata(id); - rspamd_session_reset(rs); + session_reset(rs); } static void on_disconnect(uint64_t id) { - rspamd_session_free((struct session *)filter_api_get_udata(id)); + session_free(filter_api_get_udata(id)); } int |