aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgilles <gilles@poolp.org>2016-06-23 18:55:03 +0200
committergilles <gilles@poolp.org>2016-06-23 18:55:03 +0200
commitc3ad5ede788ef450318e39b72e499a67d032429f (patch)
tree30c691bcc0bc6e1dd67f606ba71793acf7d156ba
parentMerge branch 'master' into filter-rspamd (diff)
downloadOpenSMTPD-extras-c3ad5ede788ef450318e39b72e499a67d032429f.tar.xz
OpenSMTPD-extras-c3ad5ede788ef450318e39b72e499a67d032429f.zip
fix state machine bugs
-rw-r--r--extras/filters/filter-rspamd/filter_rspamd.c157
1 files changed, 89 insertions, 68 deletions
diff --git a/extras/filters/filter-rspamd/filter_rspamd.c b/extras/filters/filter-rspamd/filter_rspamd.c
index ac09f3e..fccd019 100644
--- a/extras/filters/filter-rspamd/filter_rspamd.c
+++ b/extras/filters/filter-rspamd/filter_rspamd.c
@@ -51,24 +51,32 @@ struct session {
struct tx {
FILE *fp;
- char *line;
- int eom;
- int rspamd_done;
+ char *line;
+ int error;
+ int eom;
- char *from;
- char *rcpt;
+ char *from;
+ char *rcpt;
} tx;
};
struct sockaddr_storage ss;
-static struct session *rspamd_session_init(uint64_t);
-static void rspamd_session_reset(struct session *);
-static void rspamd_session_free(struct session *);
+static struct session *session_init(uint64_t);
+static void session_reset(struct session *);
+static void session_free(struct session *);
+
+static int rspamd_connect(struct session *);
+static void rspamd_connected(struct session *);
+static void rspamd_error(struct session *);
static void rspamd_io(struct io *, int);
static void rspamd_send_query(struct session *);
-
+static void rspamd_send_chunk(struct session *, const char *);
+static void rspamd_read_response(struct session *);
+
+static void datahold_stream(uint64_t, FILE *, void *);
+
/* XXX
* this needs to be handled differently, but lets focus on the filter for now
*/
@@ -101,7 +109,7 @@ rspamd_resolve(const char *h, const char *p)
}
static struct session *
-rspamd_session_init(uint64_t id)
+session_init(uint64_t id)
{
struct session *rs;
@@ -111,24 +119,26 @@ rspamd_session_init(uint64_t id)
}
static void
-rspamd_session_reset(struct session *rs)
+session_reset(struct session *rs)
{
- free(rs->tx.from);
- free(rs->tx.rcpt);
+ iobuf_clear(&rs->iobuf);
+ io_clear(&rs->io);
filter_api_datahold_close(rs->id);
+ free(rs->tx.from);
+ free(rs->tx.rcpt);
+
rs->tx.eom = 0;
- rs->tx.rspamd_done = 0;
+ rs->tx.error = 0;
+ rs->tx.from = NULL;
+ rs->tx.rcpt = NULL;
}
static void
-rspamd_session_free(struct session *rs)
+session_free(struct session *rs)
{
- iobuf_clear(&rs->iobuf);
- io_clear(&rs->io);
-
- rspamd_session_reset(rs);
+ session_reset(rs);
free(rs->ip);
free(rs->hostname);
@@ -147,6 +157,13 @@ rspamd_connect(struct session *rs)
}
static void
+rspamd_disconnect(struct session *rs)
+{
+ iobuf_clear(&rs->iobuf);
+ io_clear(&rs->io);
+}
+
+static void
rspamd_connected(struct session *rs)
{
filter_api_accept(rs->id);
@@ -155,8 +172,8 @@ rspamd_connected(struct session *rs)
static void
rspamd_error(struct session *rs)
{
- /* XXX */
- filter_api_accept(rs->id);
+ filter_api_reject_code(rs->id, FILTER_FAIL, 421, "temporary failure");
+ session_reset(rs);
}
static void
@@ -164,13 +181,14 @@ rspamd_send_query(struct session *rs)
{
iobuf_xfqueue(&rs->iobuf, "io",
"POST /check HTTP/1.0\r\n"
+ "Transfer-Encoding: chunked\r\n"
+ "Pass: all\r\n"
"IP: %s\r\n"
"Helo: %s\r\n"
"Hostname: %s\r\n"
"From: %s\r\n"
"Rcpt: %s\r\n"
- "Pass: all\r\n"
- "Transfer-Encoding: chunked\r\n\r\n",
+ "\r\n",
rs->ip,
rs->helo,
rs->hostname,
@@ -182,21 +200,19 @@ rspamd_send_query(struct session *rs)
static void
rspamd_send_chunk(struct session *rs, const char *line)
{
- iobuf_xfqueue(&rs->iobuf, "io", "%x\r\n%s\r\n\r\n",
- strlen(line)+2, line);
- io_reload(&rs->io);
-}
-
-static void
-rspamd_send_eom(struct session *rs)
-{
- iobuf_xfqueue(&rs->iobuf, "io", "0\r\n\r\n");
- rs->tx.eom = 1;
+ if (line)
+ iobuf_xfqueue(&rs->iobuf, "io", "%x\r\n%s\r\n\r\n",
+ strlen(line)+2, line);
+ else {
+ iobuf_xfqueue(&rs->iobuf, "io", "0\r\n\r\n");
+ rs->tx.eom = 1;
+ }
+
io_reload(&rs->io);
}
static void
-rspamd_response(struct session *rs)
+rspamd_read_response(struct session *rs)
{
char *line;
@@ -208,29 +224,27 @@ rspamd_response(struct session *rs)
iobuf_data(&rs->iobuf));
}
iobuf_normalize(&rs->iobuf);
-
- rs->tx.rspamd_done = 1;
}
static void
-smtpd_stream_back(uint64_t id, FILE *fp, void *arg)
+datahold_stream(uint64_t id, FILE *fp, void *arg)
{
struct session *rs = arg;
size_t sz;
ssize_t len;
- len = getline(&rs->tx.line, &sz, fp);
-
- /* XXX */
- if (len == -1) {
+ errno = 0;
+ if ((len = getline(&rs->tx.line, &sz, fp)) == -1) {
+ if (errno) {
+ filter_api_reject_code(rs->id, FILTER_FAIL, 421,
+ "temporary failure");
+ return;
+ }
filter_api_accept(rs->id);
return;
}
- if (strcmp(rs->tx.line, "\n") == 0)
- filter_api_writeln(rs->id, "X-MangeDes: Bites");
-
- rs->tx.line[len-1] = 0;
+ rs->tx.line[strcspn(rs->tx.line, "\n")] = '\0';
log_debug("debug: STREAM BACK: [%s]", rs->tx.line);
filter_api_writeln(rs->id, rs->tx.line);
}
@@ -254,26 +268,29 @@ rspamd_io(struct io *io, int evt)
break;
case IO_DATAIN:
- rspamd_response(rs);
- if (rs->tx.rspamd_done) {
- log_debug("debug: ####### WILL STREAM BACK");
- filter_api_datahold_start(rs->id);
- io_set_write(io);
- }
+ /* accumulate reply */
+ rspamd_read_response(rs);
break;
case IO_DISCONNECTED:
+ rspamd_disconnect(rs);
+
+ /* we're done with rspamd, if there was a local error
+ * during transaction, reject now, else move forward.
+ */
+ if (rs->tx.error) {
+ rspamd_error(rs);
+ break;
+ }
log_debug("debug: DISCONNECT");
- rspamd_session_free(rs);
+ /* process rspamd reply and start processing datahold */
+ filter_api_datahold_start(rs->id);
break;
+
case IO_TIMEOUT:
- log_debug("debug: TIMEOUT");
- break;
case IO_ERROR:
- log_debug("debug: ERROR");
- break;
default:
- log_debug("debug: WTF");
+ //rspamd_error(rs);
break;
}
return;
@@ -291,7 +308,7 @@ on_connect(uint64_t id, struct filter_connect *conn)
struct session *rs;
const char *ip;
- rs = rspamd_session_init(id);
+ rs = session_init(id);
//ip = filter_api_sockaddr_to_text((struct sockaddr *)&conn->local);
ip = "127.0.0.1";
@@ -337,12 +354,14 @@ on_data(uint64_t id)
{
struct session *rs = filter_api_get_udata(id);
- rs->tx.fp = filter_api_datahold_open(id, smtpd_stream_back, rs);
+ rs->tx.fp = filter_api_datahold_open(id, datahold_stream, rs);
if (rs->tx.fp == NULL)
- return filter_api_accept(id); /* XXX */
+ return filter_api_reject_code(rs->id, FILTER_FAIL, 421,
+ "temporary failure");
if (! rspamd_connect(rs))
- return filter_api_accept(id); /* XXX */
+ return filter_api_reject_code(rs->id, FILTER_FAIL, 421,
+ "temporary failure");
return 1;
}
@@ -350,10 +369,12 @@ on_data(uint64_t id)
static void
on_dataline(uint64_t id, const char *line)
{
- struct session *rs = filter_api_get_udata(id);
+ struct session *rs = filter_api_get_udata(id);
+ ssize_t sz;
- /* XXX - tempfail here */
- fprintf(rs->tx.fp, "%s\n", line);
+ sz = fprintf(rs->tx.fp, "%s\n", line);
+ if (sz == -1 || sz < (ssize_t)strlen(line) + 1)
+ rs->tx.error = 1;
rspamd_send_chunk(rs, line);
}
@@ -362,7 +383,7 @@ on_eom(uint64_t id, size_t size)
{
struct session *rs = filter_api_get_udata(id);
- rspamd_send_eom(rs);
+ rspamd_send_chunk(rs, NULL);
}
static void
@@ -370,7 +391,7 @@ on_commit(uint64_t id)
{
struct session *rs = filter_api_get_udata(id);
- rspamd_session_reset(rs);
+ session_reset(rs);
}
static void
@@ -378,13 +399,13 @@ on_rollback(uint64_t id)
{
struct session *rs = filter_api_get_udata(id);
- rspamd_session_reset(rs);
+ session_reset(rs);
}
static void
on_disconnect(uint64_t id)
{
- rspamd_session_free((struct session *)filter_api_get_udata(id));
+ session_free(filter_api_get_udata(id));
}
int