aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgilles <gilles@poolp.org>2016-06-23 00:32:48 +0200
committergilles <gilles@poolp.org>2016-06-23 00:32:48 +0200
commit5fbe5e9277767bb539168d0b90d34a562126bc52 (patch)
tree2a9fd966b61eccc5ecb7d2b51b45751f5e3ca56d
parentMerge branch 'master' into filter-rspamd (diff)
parentwip (diff)
downloadOpenSMTPD-extras-5fbe5e9277767bb539168d0b90d34a562126bc52.tar.xz
OpenSMTPD-extras-5fbe5e9277767bb539168d0b90d34a562126bc52.zip
Merge branch 'filter-rspamd' of ssh://ssh.poolp.org/git/opensmtpd-extras into filter-rspamd
-rw-r--r--extras/wip/filters/filter-rspamd/filter_rspamd.c190
1 files changed, 122 insertions, 68 deletions
diff --git a/extras/wip/filters/filter-rspamd/filter_rspamd.c b/extras/wip/filters/filter-rspamd/filter_rspamd.c
index 465fc08..aa99815 100644
--- a/extras/wip/filters/filter-rspamd/filter_rspamd.c
+++ b/extras/wip/filters/filter-rspamd/filter_rspamd.c
@@ -26,6 +26,7 @@
#include <errno.h>
#include <ctype.h>
#include <limits.h>
+#include <stdlib.h>
#include <unistd.h>
#include "smtpd-defines.h"
@@ -37,14 +38,6 @@
#define RSPAMD_HOST "127.0.0.1"
#define RSPAMD_PORT "11333"
-struct transaction {
- char *from;
- char *rcpt;
-
- int inbody;
- struct iobuf headers;
- struct iobuf body;
-};
struct session {
uint64_t id;
@@ -56,14 +49,22 @@ struct session {
char *hostname;
char *helo;
- struct transaction tx;
+
+ /* transaction specific */
+ FILE *fp;
+
+ int eom;
+ int reply;
+
+ char *from;
+ char *rcpt;
};
struct sockaddr_storage ss;
static struct session *rspamd_session_init(uint64_t);
+static void rspamd_session_reset(struct session *);
static void rspamd_session_free(struct session *);
-static void rspamd_transaction_clear(struct transaction *);
static void rspamd_io(struct io *, int);
static int
@@ -94,8 +95,7 @@ on_mail(uint64_t id, struct mailaddr *mail)
{
struct session *rs = filter_api_get_udata(id);
- rspamd_transaction_clear(&rs->tx);
- rs->tx.from = xstrdup("gilles@poolp.org", "on_mail");
+ rs->from = xstrdup("gilles@poolp.org", "on_mail");
return filter_api_accept(id);
}
@@ -105,45 +105,32 @@ on_rcpt(uint64_t id, struct mailaddr *rcpt)
struct session *rs = filter_api_get_udata(id);
log_debug("debug: on_rcpt");
- rs->tx.rcpt = xstrdup("gilles+rcpt@poolp.org", "on_rcpt");
+ rs->rcpt = xstrdup("gilles+rcpt@poolp.org", "on_rcpt");
return filter_api_accept(id);
}
static int
on_data(uint64_t id)
{
- log_debug("debug: on_data");
- return filter_api_accept(id);
-}
-
-static void
-on_dataline(uint64_t id, const char *line)
-{
- struct session *rs = filter_api_get_udata(id);
-
- if (!strlen(line))
- rs->tx.inbody = 1;
-
- if (! rs->tx.inbody)
- iobuf_xfqueue(&rs->tx.headers, "io", "%s\r\n", line);
- else
- iobuf_xfqueue(&rs->tx.body, "io", "%s\r\n", line);
-
- log_debug("%.*s [%s]", iobuf_len(&rs->tx.headers), iobuf_data(&rs->tx.headers), line);
-}
-
-static int
-on_eom(uint64_t id, size_t size)
-{
- struct session *rs = filter_api_get_udata(id);
+ struct session *rs = filter_api_get_udata(id);
+ char pathname[1024] = "/tmp/filter-rspamd.XXXXXX";
+ int fd;
iobuf_xinit(&rs->iobuf, LINE_MAX, LINE_MAX, "on_eom");
io_init(&rs->io, -1, rs, rspamd_io, &rs->iobuf);
if (io_connect(&rs->io, (struct sockaddr *)&ss, NULL) == -1)
return filter_api_accept(id);
-// iobuf_xfqueue(&rs->iobuf, "io",
- log_debug(
+ fd = mkstemp(pathname);
+ if (fd == -1) {
+ }
+ unlink(pathname);
+ rs->fp = fdopen(fd, "w+b");
+ if (rs->fp == NULL) {
+ close(fd);
+ }
+
+ iobuf_xfqueue(&rs->iobuf, "io",
"POST /check HTTP/1.0\r\n"
"IP: %s\r\n"
"Helo: %s\r\n"
@@ -151,27 +138,52 @@ on_eom(uint64_t id, size_t size)
"From: %s\r\n"
"Rcpt: %s\r\n"
"Pass: all\r\n"
- "Content-Length: %d\r\n\r\n%.*s%.*s",
+ "Transfer-Encoding: chunked\r\n\r\n",
rs->ip,
rs->helo,
rs->hostname,
- rs->tx.from,
- rs->tx.rcpt,
- iobuf_len(&rs->tx.headers) + iobuf_len(&rs->tx.body),
- iobuf_len(&rs->tx.headers), iobuf_data(&rs->tx.headers),
- iobuf_len(&rs->tx.body), iobuf_data(&rs->tx.body));
+ rs->from,
+ rs->rcpt);
+
+ io_reload(&rs->io);
+ return filter_api_accept(id);
+}
+
+static void
+on_dataline(uint64_t id, const char *line)
+{
+ struct session *rs = filter_api_get_udata(id);
+
+ fprintf(rs->fp, "%s\n", line);
+ iobuf_xfqueue(&rs->iobuf, "io", "%x\r\n%s\r\n\r\n",
+ strlen(line)+2, line);
+ io_reload(&rs->io);
+}
+
+static int
+on_eom(uint64_t id, size_t size)
+{
+ struct session *rs = filter_api_get_udata(id);
+
+ iobuf_xfqueue(&rs->iobuf, "io", "0\r\n\r\n");
+ rs->eom = 1;
+ io_reload(&rs->io);
}
static void
on_commit(uint64_t id)
{
- log_debug("debug: on_commit");
+ struct session *rs = filter_api_get_udata(id);
+
+ rspamd_session_reset(rs);
}
static void
on_rollback(uint64_t id)
{
- log_debug("debug: on_rollback");
+ struct session *rs = filter_api_get_udata(id);
+
+ rspamd_session_reset(rs);
}
static void
@@ -188,17 +200,21 @@ rspamd_session_init(uint64_t id)
rs = xcalloc(1, sizeof *rs, "on_connect");
rs->id = id;
- iobuf_xinit(&rs->tx.headers, LINE_MAX, LINE_MAX, "on_eom");
- iobuf_xinit(&rs->tx.body, LINE_MAX, LINE_MAX, "on_eom");
-
return rs;
}
-static void
-rspamd_transaction_clear(struct transaction *tx)
+static struct session *
+rspamd_session_reset(uint64_t id)
{
- free(tx->from);
- free(tx->rcpt);
+ free(rs->from);
+ free(rs->rcpt);
+
+ if (rs->fp) {
+ fclose(rs->fp);
+ rs->fp = NULL;
+ }
+ rs->eom = 0;
+ rs->reply = 0;
}
static void
@@ -207,7 +223,7 @@ rspamd_session_free(struct session *rs)
iobuf_clear(&rs->iobuf);
io_clear(&rs->io);
- rspamd_transaction_clear(&rs->tx);
+ rspamd_session_reset(rs);
free(rs->ip);
free(rs->hostname);
@@ -216,34 +232,69 @@ rspamd_session_free(struct session *rs)
}
static void
+rspamd_response(struct session *rs)
+{
+ char *line;
+
+ while ((line = iobuf_getline(&rs->iobuf, NULL)))
+ log_debug("debug: DATAIN: [%s]", line);
+ if (iobuf_len(&rs->iobuf) != 0) {
+ log_debug("debug: DATAIN: [%.*s]",
+ (int)iobuf_len(&rs->iobuf),
+ iobuf_data(&rs->iobuf));
+
+ }
+ iobuf_normalize(&rs->iobuf);
+
+ rs->reply = 1;
+}
+
+static void
+rspamd_streamback(struct session *rs)
+{
+ char *line;
+ size_t sz;
+ ssize_t len;
+
+ log_debug("debug: STREAM BACK");
+ fseek(rs->fp, 0, 0);
+ line = NULL;
+ while ((len = getline(&line, &sz, rs->fp)) != -1) {
+ line[len-1] = 0;
+ log_debug("debug: STREAM BACK: [%s]", line);
+ filter_api_writeln(rs->id, line);
+ }
+ filter_api_accept(rs->id);
+}
+
+static void
rspamd_io(struct io *io, int evt)
{
- struct session *r = io->arg;
- char *l;
+ struct session *rs = io->arg;
switch (evt) {
case IO_CONNECTED:
- log_debug("debug: CONNECTED");
io_set_write(io);
break;
+
case IO_LOWAT:
- log_debug("debug: LOWAT");
- io_set_read(io);
+ if (rs->eom)
+ io_set_read(io);
break;
+
case IO_DATAIN:
log_debug("debug: DATAIN");
- while ((l = iobuf_getline(&r->iobuf, NULL)) != NULL)
- log_debug("debug: DATAIN: [%s]", l);
- if (iobuf_len(&r->iobuf) != 0) {
- log_debug("debug: DATAIN: [%.*s]",
- iobuf_len(&r->iobuf),
- iobuf_data(&r->iobuf));
+ rspamd_response(rs);
+ if (rs->reply) {
+ log_debug("debug: REPLY DONE");
+ io_set_write(io);
+ rspamd_streamback(rs);
}
- iobuf_normalize(&r->iobuf);
break;
+
case IO_DISCONNECTED:
log_debug("debug: DISCONNECT");
- exit(1);
+ rspamd_session_free(rs);
break;
case IO_TIMEOUT:
log_debug("debug: TIMEOUT");
@@ -356,10 +407,13 @@ main(int argc, char **argv)
filter_api_on_rollback(on_rollback);
filter_api_on_disconnect(on_disconnect);
+ /*
if (c)
filter_api_set_chroot(c);
if (C)
filter_api_no_chroot();
+ */
+ filter_api_no_chroot();
filter_api_loop();
log_debug("debug: exiting");