aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoerg Jung <mail@umaxx.net>2016-06-19 00:42:33 +0200
committerJoerg Jung <mail@umaxx.net>2016-06-19 00:42:33 +0200
commitc40687b76532d6be00fceb2a27d56542ae5ca3cf (patch)
tree405bc06cc7d0c19b5c816c47b01dbf7dc4028fa9
parentsync with smtpd version (diff)
downloadOpenSMTPD-extras-c40687b76532d6be00fceb2a27d56542ae5ca3cf.tar.xz
OpenSMTPD-extras-c40687b76532d6be00fceb2a27d56542ae5ca3cf.zip
convert filter spamassassin to non-blocking
-rw-r--r--extras/wip/filters/filter-spamassassin/filter_spamassassin.c301
1 files changed, 147 insertions, 154 deletions
diff --git a/extras/wip/filters/filter-spamassassin/filter_spamassassin.c b/extras/wip/filters/filter-spamassassin/filter_spamassassin.c
index c76bf09..d7435e7 100644
--- a/extras/wip/filters/filter-spamassassin/filter_spamassassin.c
+++ b/extras/wip/filters/filter-spamassassin/filter_spamassassin.c
@@ -32,94 +32,51 @@
#include "smtpd-api.h"
#include "log.h"
#include "iobuf.h"
+#include "ioev.h"
+
+#define SPAMASSASSIN_HOST "127.0.0.1"
+#define SPAMASSASSIN_PORT "783"
struct spamassassin {
- int fd, r;
+ uint64_t id;
struct iobuf iobuf;
+ struct io io;
size_t l;
+ int r;
+ enum { SA_DATA, SA_EOM, SA_STA, SA_HDR, SA_MSG } s;
};
-static const char *spamassassin_host = "127.0.0.1", *spamassassin_port = "783";
+static struct sockaddr_storage spamassassin_ss;
static size_t spamassassin_limit;
static enum { SPAMASSASSIN_ACCEPT, SPAMASSASSIN_REJECT } spamassassin_strategy;
static void
-spamassassin_init(struct spamassassin *sa)
-{
- sa->fd = sa->r = -1;
- iobuf_xinit(&sa->iobuf, LINE_MAX, LINE_MAX, "init");
-}
-
-static int
-spamassassin_open(struct spamassassin *sa)
+spamassassin_resolve(const char *h, const char *p)
{
struct addrinfo hints, *addresses, *ai;
- int r;
+ int fd, r;
memset(&hints, 0, sizeof(hints));
hints.ai_family = PF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
- hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV; /* avoid failing name resolution in chroot() */
- if ((r = getaddrinfo(spamassassin_host, spamassassin_port, &hints, &addresses))) {
- log_warnx("warn: open: getaddrinfo %s", gai_strerror(r));
- return -1;
- }
+ if ((r = getaddrinfo(h, p, &hints, &addresses)))
+ fatalx("resolve: getaddrinfo %s", gai_strerror(r));
for (ai = addresses; ai; ai = ai->ai_next) {
- if ((sa->fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
+ if ((fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
continue;
- if (connect(sa->fd, ai->ai_addr, ai->ai_addrlen) == -1) {
- close(sa->fd);
- sa->fd = -1;
+ if (connect(fd, ai->ai_addr, ai->ai_addrlen) == -1) {
+ close(fd);
continue;
}
- break; /* connected */
+ write(fd, "PING SPAMC/1.5\r\n", 16); /* avoid warning in log */
+ close(fd);
+ memmove(&spamassassin_ss, ai->ai_addr, ai->ai_addrlen);
+ break;
}
freeaddrinfo(addresses);
- if (!ai) {
- log_warnx("warn: open: failed");
- return -1;
- }
- return 0;
-}
-
-static int
-spamassassin_write(struct spamassassin *sa, const char *l) {
- size_t len = strlen(l) + 1;
-
- if (iobuf_fqueue(&sa->iobuf, "%s\n", l) != (int)len) {
- log_warn("warn: write: iobuf_fqueue");
- return -1;
- }
- if (iobuf_flush(&sa->iobuf, sa->fd) < 0) {
- log_warn("warn: write: iobuf_flush");
- return -1;
- }
- return 0;
-}
-
-static int
-spamassassin_request(struct spamassassin *sa) {
- return spamassassin_write(sa, "PROCESS SPAMC/1.5\r\n\r"); /* spamd.raw source: content length header is optional */
-}
-
-static int
-spamassassin_read(struct spamassassin *sa, char **l) {
- int r;
-
- while ((*l = iobuf_getline(&sa->iobuf, NULL)) == NULL) {
- if (iobuf_len(&sa->iobuf) >= LINE_MAX) {
- log_warnx("warn: read: iobuf_getline");
- return -1;
- }
- iobuf_normalize(&sa->iobuf);
- if ((r = iobuf_read(&sa->iobuf, sa->fd)) < 0) {
- if (r != IOBUF_CLOSED)
- log_warn("warn: read: iobuf_read r=%d", r);
- return r;
- }
- }
- return 0;
+ if (!ai)
+ fatalx("resolve: failed");
}
#define SPAMASSASSIN_EXPAND(tok) #tok
@@ -127,7 +84,8 @@ spamassassin_read(struct spamassassin *sa, char **l) {
#define SPAMASSASSIN_EX_MAX 16 /* longest spamd response e.g. strlen("EX_UNAVAILABLE") */
static int
-spamassassin_status(struct spamassassin *sa, const char *l) {
+spamassassin_status(struct spamassassin *sa, const char *l)
+{
char s[SPAMASSASSIN_EX_MAX + 1];
int r;
@@ -139,86 +97,66 @@ spamassassin_status(struct spamassassin *sa, const char *l) {
log_warnx("warn: status: r=%d, s=%s", r, s);
return -1;
}
- return 0;
+ return sa->s++;
}
static int
-spamassassin_result(struct spamassassin *sa, const char *l) {
+spamassassin_header(struct spamassassin *sa, const char *l)
+{
char s[SPAMASSASSIN_EX_MAX + 1];
- if (sscanf(l, "Spam: %"SPAMASSASSIN_QUOTE(SPAMASSASSIN_EX_MAX)"s ; %*f / %*f", s) != 1) {
- (errno ? log_warn : log_warnx)("warn: result: sscanf");
- return -1;
+ if (strlen(l) == 0)
+ return sa->s++; /* end of spamd response headers */
+ if (strncmp(l, "Spam: ", 6) == 0) {
+ if (sscanf(l, "Spam: %"SPAMASSASSIN_QUOTE(SPAMASSASSIN_EX_MAX)"s ; %*f / %*f", s) != 1) {
+ (errno ? log_warn : log_warnx)("warn: result: sscanf");
+ return -1;
+ }
+ log_info("info: result: %s", l);
+ sa->r = (strcmp(s, "True") == 0);
}
- log_info("info: result: %s", l);
- sa->r = (strcmp(s, "True") == 0);
return 0;
}
static int
-spamassassin_header(struct spamassassin *sa) {
- char *l = NULL;
-
- if (spamassassin_read(sa, &l) != 0)
- return -1;
- if (spamassassin_status(sa, l) == -1)
- return -1;
- while (1) {
- if (spamassassin_read(sa, &l) != 0)
+spamassassin_response(struct spamassassin *sa, const char *l)
+{
+ switch (sa->s) {
+ case SA_STA:
+ if (spamassassin_status(sa, l) == -1)
return -1;
- if (strncmp(l, "Spam: ", 6) == 0 &&
- spamassassin_result(sa, l) == -1)
+ break;
+ case SA_HDR:
+ if (spamassassin_header(sa, l) == -1)
return -1;
- if (strlen(l) == 0)
- break; /* end of spamd response headers */
- }
- if (sa->r == -1) {
- log_warnx("warn: header: result failed");
- return -1;
+ break;
+ case SA_MSG:
+ filter_api_writeln(sa->id, l);
+ break;
+ default:
+ fatalx("response: bad state");
}
return 0;
}
static int
-spamassassin_message(struct spamassassin *sa, uint64_t id) {
- char *l = NULL;
- int r;
-
- while (1) {
- if ((r = spamassassin_read(sa, &l)) != 0) {
- if (r == IOBUF_CLOSED)
- break;
- return -1;
- }
- if (l)
- filter_api_writeln(id, l);
- }
- if (iobuf_len(&sa->iobuf)) {
- log_warnx("warn: message: incomplete");
+spamassassin_result(struct spamassassin *sa)
+{
+ if (sa->r == INT_MIN) {
+ log_warnx("warn: result: failed");
return -1;
}
- return 0;
-}
-
-static int
-spamassassin_response(struct spamassassin *sa, uint64_t id) {
- if (shutdown(sa->fd, SHUT_WR) == -1) {
- log_warn("warn: response: shutdown");
- return -1;
+ if (sa->r) {
+ if (spamassassin_strategy == SPAMASSASSIN_ACCEPT) {
+ log_warnx("warn: session %016"PRIx64": result: ACCEPT spam", sa->id);
+ filter_api_accept(sa->id);
+ }
+ if (spamassassin_strategy == SPAMASSASSIN_REJECT) {
+ log_warnx("warn: session %016"PRIx64": result: REJECT spam", sa->id);
+ filter_api_reject_code(sa->id, FILTER_CLOSE, 554, "5.7.1 Message considered spam");
+ }
}
- if (spamassassin_header(sa) == -1)
- return -1;
- if (spamassassin_message(sa, id) == -1)
- return -1;
- return 0;
-}
-
-static void
-spamassassin_close(struct spamassassin *sa)
-{
- if (sa->fd >= 0)
- close(sa->fd);
- sa->fd = -1;
+ return filter_api_accept(sa->id);
}
static void
@@ -226,22 +164,86 @@ spamassassin_clear(struct spamassassin *sa)
{
if (sa == NULL)
return;
+ io_clear(&sa->io);
iobuf_clear(&sa->iobuf);
- spamassassin_close(sa);
free(sa);
}
+static void
+spamassassin_io(struct io *io, int evt)
+{
+ struct spamassassin *sa = io->arg;
+ char *l;
+
+ switch (evt) {
+ case IO_CONNECTED:
+ io_set_write(io);
+ iobuf_xfqueue(&sa->iobuf, "io", "PROCESS SPAMC/1.5\r\n\r\n"); /* spamd.raw source: content length header is optional */
+ break;
+ case IO_LOWAT:
+ if (sa->s == SA_EOM) {
+ if (shutdown(sa->io.sock, SHUT_WR) == -1) {
+ log_warn("warn: io: shutdown");
+ goto fail;
+ }
+ sa->s++;
+ io_set_read(io);
+ }
+ break;
+ case IO_DATAIN:
+ while ((l = iobuf_getline(&sa->iobuf, NULL))) {
+ if (iobuf_len(&sa->iobuf) >= LINE_MAX) {
+ log_warnx("warn: io: iobuf_getline");
+ goto fail;
+ }
+ if (spamassassin_response(sa, l) == -1)
+ goto fail;
+ }
+ iobuf_normalize(&sa->iobuf);
+ break;
+ case IO_DISCONNECTED:
+ if (sa->s == SA_MSG) {
+ if (iobuf_len(&sa->iobuf)) {
+ log_warnx("warn: io: incomplete");
+ goto fail;
+ }
+ if (spamassassin_result(sa) == -1)
+ goto fail;
+ io_clear(io);
+ break;
+ }
+ /* FALLTHROUGH */
+ case IO_TIMEOUT:
+ case IO_ERROR:
+ log_warnx("warn: io: %s %s", io_strevent(evt), sa->io.error);
+ goto fail;
+ default:
+ fatalx("io: bad event");
+ }
+ return;
+fail:
+ filter_api_reject_code(sa->id, FILTER_FAIL, 451, "4.7.1 Spam filter failed");
+ filter_api_set_udata(sa->id, NULL);
+ spamassassin_clear(sa);
+}
+
+static void
+spamassassin_init(struct spamassassin *sa, uint64_t id)
+{
+ iobuf_xinit(&sa->iobuf, LINE_MAX, LINE_MAX, "init");
+ io_init(&sa->io, -1, sa, spamassassin_io, &sa->iobuf);
+ sa->id = id;
+ sa->r = INT_MIN;
+}
+
static int
spamassassin_on_data(uint64_t id)
{
struct spamassassin *sa;
- spamassassin_init((sa = xcalloc(1, sizeof(struct spamassassin), "on_data")));
- if (spamassassin_open(sa) == -1) {
- spamassassin_clear(sa);
- return filter_api_accept(id);
- }
- if (spamassassin_request(sa) == -1) {
+ spamassassin_init((sa = xcalloc(1, sizeof(struct spamassassin), "on_data")), id);
+ if (io_connect(&sa->io, (struct sockaddr *)&spamassassin_ss, NULL) == -1) {
+ log_warnx("warn: on_data: io_connect %s", sa->io.error);
spamassassin_clear(sa);
return filter_api_accept(id);
}
@@ -261,14 +263,16 @@ spamassassin_on_dataline(uint64_t id, const char *l)
sa->l += strlen(l);
if (spamassassin_limit && sa->l >= spamassassin_limit) {
log_info("info: dataline: limit reached");
+#if 0 /* todo */
spamassassin_response(sa, id);
filter_api_writeln(id, l);
spamassassin_clear(sa);
filter_api_set_udata(id, NULL);
return;
+#endif
}
- if (sa->fd >= 0 && spamassassin_write(sa, l) == -1)
- spamassassin_close(sa);
+ iobuf_xfqueue(&sa->iobuf, "on_dataline", "%s\n", l);
+ io_reload(&sa->io);
}
static int
@@ -278,22 +282,10 @@ spamassassin_on_eom(uint64_t id, size_t size)
if ((sa = filter_api_get_udata(id)) == NULL)
return filter_api_accept(id);
- if (spamassassin_response(sa, id) == -1) {
- spamassassin_clear(sa);
- filter_api_set_udata(id, NULL);
- return filter_api_reject_code(id, FILTER_FAIL, 451, "4.7.1 Spam filter failed");
- }
- if (sa->r) {
- if (spamassassin_strategy == SPAMASSASSIN_ACCEPT) {
- log_warnx("warn: session %016"PRIx64": on_eom: ACCEPT spam", id);
- return filter_api_accept(id);
- }
- if (spamassassin_strategy == SPAMASSASSIN_REJECT) {
- log_warnx("warn: session %016"PRIx64": on_eom: REJECT spam", id);
- return filter_api_reject_code(id, FILTER_CLOSE, 554, "5.7.1 Message considered spam");
- }
- }
- return filter_api_accept(id);
+ sa->s++;
+ if (iobuf_queued(&sa->iobuf) == 0)
+ spamassassin_io(&sa->io, IO_LOWAT);
+ return 1; /* defer accept or reject */
}
static void
@@ -315,7 +307,7 @@ main(int argc, char **argv)
{
int ch, C = 0, d = 0, v = 0;
const char *errstr, *l = NULL;
- char *c = NULL, *h = NULL, *p = NULL, *s = NULL;
+ char *c = NULL, *h = SPAMASSASSIN_HOST, *p = SPAMASSASSIN_PORT, *s = NULL;
log_init(1);
@@ -357,9 +349,9 @@ main(int argc, char **argv)
if (c)
c = strip(c);
if (h)
- spamassassin_host = strip(h);
+ h = strip(h);
if (p)
- spamassassin_port = strip(p);
+ p = strip(p);
if (l) {
spamassassin_limit = strtonum(l, 1, SIZE_MAX, &errstr);
if (errstr)
@@ -379,6 +371,7 @@ main(int argc, char **argv)
log_verbose(v);
log_debug("debug: starting...");
+ spamassassin_resolve(h, p);
filter_api_on_data(spamassassin_on_data);
filter_api_on_dataline(spamassassin_on_dataline);