diff options
author | Joerg Jung <mail@umaxx.net> | 2016-06-19 00:42:33 +0200 |
---|---|---|
committer | Joerg Jung <mail@umaxx.net> | 2016-06-19 00:42:33 +0200 |
commit | c40687b76532d6be00fceb2a27d56542ae5ca3cf (patch) | |
tree | 405bc06cc7d0c19b5c816c47b01dbf7dc4028fa9 | |
parent | sync with smtpd version (diff) | |
download | OpenSMTPD-extras-c40687b76532d6be00fceb2a27d56542ae5ca3cf.tar.xz OpenSMTPD-extras-c40687b76532d6be00fceb2a27d56542ae5ca3cf.zip |
convert filter spamassassin to non-blocking
-rw-r--r-- | extras/wip/filters/filter-spamassassin/filter_spamassassin.c | 301 |
1 files changed, 147 insertions, 154 deletions
diff --git a/extras/wip/filters/filter-spamassassin/filter_spamassassin.c b/extras/wip/filters/filter-spamassassin/filter_spamassassin.c index c76bf09..d7435e7 100644 --- a/extras/wip/filters/filter-spamassassin/filter_spamassassin.c +++ b/extras/wip/filters/filter-spamassassin/filter_spamassassin.c @@ -32,94 +32,51 @@ #include "smtpd-api.h" #include "log.h" #include "iobuf.h" +#include "ioev.h" + +#define SPAMASSASSIN_HOST "127.0.0.1" +#define SPAMASSASSIN_PORT "783" struct spamassassin { - int fd, r; + uint64_t id; struct iobuf iobuf; + struct io io; size_t l; + int r; + enum { SA_DATA, SA_EOM, SA_STA, SA_HDR, SA_MSG } s; }; -static const char *spamassassin_host = "127.0.0.1", *spamassassin_port = "783"; +static struct sockaddr_storage spamassassin_ss; static size_t spamassassin_limit; static enum { SPAMASSASSIN_ACCEPT, SPAMASSASSIN_REJECT } spamassassin_strategy; static void -spamassassin_init(struct spamassassin *sa) -{ - sa->fd = sa->r = -1; - iobuf_xinit(&sa->iobuf, LINE_MAX, LINE_MAX, "init"); -} - -static int -spamassassin_open(struct spamassassin *sa) +spamassassin_resolve(const char *h, const char *p) { struct addrinfo hints, *addresses, *ai; - int r; + int fd, r; memset(&hints, 0, sizeof(hints)); hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; - hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV; /* avoid failing name resolution in chroot() */ - if ((r = getaddrinfo(spamassassin_host, spamassassin_port, &hints, &addresses))) { - log_warnx("warn: open: getaddrinfo %s", gai_strerror(r)); - return -1; - } + if ((r = getaddrinfo(h, p, &hints, &addresses))) + fatalx("resolve: getaddrinfo %s", gai_strerror(r)); for (ai = addresses; ai; ai = ai->ai_next) { - if ((sa->fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) + if ((fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) continue; - if (connect(sa->fd, ai->ai_addr, ai->ai_addrlen) == -1) { - close(sa->fd); - sa->fd = -1; + if (connect(fd, ai->ai_addr, ai->ai_addrlen) == -1) { + close(fd); continue; } - break; /* connected */ + write(fd, "PING SPAMC/1.5\r\n", 16); /* avoid warning in log */ + close(fd); + memmove(&spamassassin_ss, ai->ai_addr, ai->ai_addrlen); + break; } freeaddrinfo(addresses); - if (!ai) { - log_warnx("warn: open: failed"); - return -1; - } - return 0; -} - -static int -spamassassin_write(struct spamassassin *sa, const char *l) { - size_t len = strlen(l) + 1; - - if (iobuf_fqueue(&sa->iobuf, "%s\n", l) != (int)len) { - log_warn("warn: write: iobuf_fqueue"); - return -1; - } - if (iobuf_flush(&sa->iobuf, sa->fd) < 0) { - log_warn("warn: write: iobuf_flush"); - return -1; - } - return 0; -} - -static int -spamassassin_request(struct spamassassin *sa) { - return spamassassin_write(sa, "PROCESS SPAMC/1.5\r\n\r"); /* spamd.raw source: content length header is optional */ -} - -static int -spamassassin_read(struct spamassassin *sa, char **l) { - int r; - - while ((*l = iobuf_getline(&sa->iobuf, NULL)) == NULL) { - if (iobuf_len(&sa->iobuf) >= LINE_MAX) { - log_warnx("warn: read: iobuf_getline"); - return -1; - } - iobuf_normalize(&sa->iobuf); - if ((r = iobuf_read(&sa->iobuf, sa->fd)) < 0) { - if (r != IOBUF_CLOSED) - log_warn("warn: read: iobuf_read r=%d", r); - return r; - } - } - return 0; + if (!ai) + fatalx("resolve: failed"); } #define SPAMASSASSIN_EXPAND(tok) #tok @@ -127,7 +84,8 @@ spamassassin_read(struct spamassassin *sa, char **l) { #define SPAMASSASSIN_EX_MAX 16 /* longest spamd response e.g. strlen("EX_UNAVAILABLE") */ static int -spamassassin_status(struct spamassassin *sa, const char *l) { +spamassassin_status(struct spamassassin *sa, const char *l) +{ char s[SPAMASSASSIN_EX_MAX + 1]; int r; @@ -139,86 +97,66 @@ spamassassin_status(struct spamassassin *sa, const char *l) { log_warnx("warn: status: r=%d, s=%s", r, s); return -1; } - return 0; + return sa->s++; } static int -spamassassin_result(struct spamassassin *sa, const char *l) { +spamassassin_header(struct spamassassin *sa, const char *l) +{ char s[SPAMASSASSIN_EX_MAX + 1]; - if (sscanf(l, "Spam: %"SPAMASSASSIN_QUOTE(SPAMASSASSIN_EX_MAX)"s ; %*f / %*f", s) != 1) { - (errno ? log_warn : log_warnx)("warn: result: sscanf"); - return -1; + if (strlen(l) == 0) + return sa->s++; /* end of spamd response headers */ + if (strncmp(l, "Spam: ", 6) == 0) { + if (sscanf(l, "Spam: %"SPAMASSASSIN_QUOTE(SPAMASSASSIN_EX_MAX)"s ; %*f / %*f", s) != 1) { + (errno ? log_warn : log_warnx)("warn: result: sscanf"); + return -1; + } + log_info("info: result: %s", l); + sa->r = (strcmp(s, "True") == 0); } - log_info("info: result: %s", l); - sa->r = (strcmp(s, "True") == 0); return 0; } static int -spamassassin_header(struct spamassassin *sa) { - char *l = NULL; - - if (spamassassin_read(sa, &l) != 0) - return -1; - if (spamassassin_status(sa, l) == -1) - return -1; - while (1) { - if (spamassassin_read(sa, &l) != 0) +spamassassin_response(struct spamassassin *sa, const char *l) +{ + switch (sa->s) { + case SA_STA: + if (spamassassin_status(sa, l) == -1) return -1; - if (strncmp(l, "Spam: ", 6) == 0 && - spamassassin_result(sa, l) == -1) + break; + case SA_HDR: + if (spamassassin_header(sa, l) == -1) return -1; - if (strlen(l) == 0) - break; /* end of spamd response headers */ - } - if (sa->r == -1) { - log_warnx("warn: header: result failed"); - return -1; + break; + case SA_MSG: + filter_api_writeln(sa->id, l); + break; + default: + fatalx("response: bad state"); } return 0; } static int -spamassassin_message(struct spamassassin *sa, uint64_t id) { - char *l = NULL; - int r; - - while (1) { - if ((r = spamassassin_read(sa, &l)) != 0) { - if (r == IOBUF_CLOSED) - break; - return -1; - } - if (l) - filter_api_writeln(id, l); - } - if (iobuf_len(&sa->iobuf)) { - log_warnx("warn: message: incomplete"); +spamassassin_result(struct spamassassin *sa) +{ + if (sa->r == INT_MIN) { + log_warnx("warn: result: failed"); return -1; } - return 0; -} - -static int -spamassassin_response(struct spamassassin *sa, uint64_t id) { - if (shutdown(sa->fd, SHUT_WR) == -1) { - log_warn("warn: response: shutdown"); - return -1; + if (sa->r) { + if (spamassassin_strategy == SPAMASSASSIN_ACCEPT) { + log_warnx("warn: session %016"PRIx64": result: ACCEPT spam", sa->id); + filter_api_accept(sa->id); + } + if (spamassassin_strategy == SPAMASSASSIN_REJECT) { + log_warnx("warn: session %016"PRIx64": result: REJECT spam", sa->id); + filter_api_reject_code(sa->id, FILTER_CLOSE, 554, "5.7.1 Message considered spam"); + } } - if (spamassassin_header(sa) == -1) - return -1; - if (spamassassin_message(sa, id) == -1) - return -1; - return 0; -} - -static void -spamassassin_close(struct spamassassin *sa) -{ - if (sa->fd >= 0) - close(sa->fd); - sa->fd = -1; + return filter_api_accept(sa->id); } static void @@ -226,22 +164,86 @@ spamassassin_clear(struct spamassassin *sa) { if (sa == NULL) return; + io_clear(&sa->io); iobuf_clear(&sa->iobuf); - spamassassin_close(sa); free(sa); } +static void +spamassassin_io(struct io *io, int evt) +{ + struct spamassassin *sa = io->arg; + char *l; + + switch (evt) { + case IO_CONNECTED: + io_set_write(io); + iobuf_xfqueue(&sa->iobuf, "io", "PROCESS SPAMC/1.5\r\n\r\n"); /* spamd.raw source: content length header is optional */ + break; + case IO_LOWAT: + if (sa->s == SA_EOM) { + if (shutdown(sa->io.sock, SHUT_WR) == -1) { + log_warn("warn: io: shutdown"); + goto fail; + } + sa->s++; + io_set_read(io); + } + break; + case IO_DATAIN: + while ((l = iobuf_getline(&sa->iobuf, NULL))) { + if (iobuf_len(&sa->iobuf) >= LINE_MAX) { + log_warnx("warn: io: iobuf_getline"); + goto fail; + } + if (spamassassin_response(sa, l) == -1) + goto fail; + } + iobuf_normalize(&sa->iobuf); + break; + case IO_DISCONNECTED: + if (sa->s == SA_MSG) { + if (iobuf_len(&sa->iobuf)) { + log_warnx("warn: io: incomplete"); + goto fail; + } + if (spamassassin_result(sa) == -1) + goto fail; + io_clear(io); + break; + } + /* FALLTHROUGH */ + case IO_TIMEOUT: + case IO_ERROR: + log_warnx("warn: io: %s %s", io_strevent(evt), sa->io.error); + goto fail; + default: + fatalx("io: bad event"); + } + return; +fail: + filter_api_reject_code(sa->id, FILTER_FAIL, 451, "4.7.1 Spam filter failed"); + filter_api_set_udata(sa->id, NULL); + spamassassin_clear(sa); +} + +static void +spamassassin_init(struct spamassassin *sa, uint64_t id) +{ + iobuf_xinit(&sa->iobuf, LINE_MAX, LINE_MAX, "init"); + io_init(&sa->io, -1, sa, spamassassin_io, &sa->iobuf); + sa->id = id; + sa->r = INT_MIN; +} + static int spamassassin_on_data(uint64_t id) { struct spamassassin *sa; - spamassassin_init((sa = xcalloc(1, sizeof(struct spamassassin), "on_data"))); - if (spamassassin_open(sa) == -1) { - spamassassin_clear(sa); - return filter_api_accept(id); - } - if (spamassassin_request(sa) == -1) { + spamassassin_init((sa = xcalloc(1, sizeof(struct spamassassin), "on_data")), id); + if (io_connect(&sa->io, (struct sockaddr *)&spamassassin_ss, NULL) == -1) { + log_warnx("warn: on_data: io_connect %s", sa->io.error); spamassassin_clear(sa); return filter_api_accept(id); } @@ -261,14 +263,16 @@ spamassassin_on_dataline(uint64_t id, const char *l) sa->l += strlen(l); if (spamassassin_limit && sa->l >= spamassassin_limit) { log_info("info: dataline: limit reached"); +#if 0 /* todo */ spamassassin_response(sa, id); filter_api_writeln(id, l); spamassassin_clear(sa); filter_api_set_udata(id, NULL); return; +#endif } - if (sa->fd >= 0 && spamassassin_write(sa, l) == -1) - spamassassin_close(sa); + iobuf_xfqueue(&sa->iobuf, "on_dataline", "%s\n", l); + io_reload(&sa->io); } static int @@ -278,22 +282,10 @@ spamassassin_on_eom(uint64_t id, size_t size) if ((sa = filter_api_get_udata(id)) == NULL) return filter_api_accept(id); - if (spamassassin_response(sa, id) == -1) { - spamassassin_clear(sa); - filter_api_set_udata(id, NULL); - return filter_api_reject_code(id, FILTER_FAIL, 451, "4.7.1 Spam filter failed"); - } - if (sa->r) { - if (spamassassin_strategy == SPAMASSASSIN_ACCEPT) { - log_warnx("warn: session %016"PRIx64": on_eom: ACCEPT spam", id); - return filter_api_accept(id); - } - if (spamassassin_strategy == SPAMASSASSIN_REJECT) { - log_warnx("warn: session %016"PRIx64": on_eom: REJECT spam", id); - return filter_api_reject_code(id, FILTER_CLOSE, 554, "5.7.1 Message considered spam"); - } - } - return filter_api_accept(id); + sa->s++; + if (iobuf_queued(&sa->iobuf) == 0) + spamassassin_io(&sa->io, IO_LOWAT); + return 1; /* defer accept or reject */ } static void @@ -315,7 +307,7 @@ main(int argc, char **argv) { int ch, C = 0, d = 0, v = 0; const char *errstr, *l = NULL; - char *c = NULL, *h = NULL, *p = NULL, *s = NULL; + char *c = NULL, *h = SPAMASSASSIN_HOST, *p = SPAMASSASSIN_PORT, *s = NULL; log_init(1); @@ -357,9 +349,9 @@ main(int argc, char **argv) if (c) c = strip(c); if (h) - spamassassin_host = strip(h); + h = strip(h); if (p) - spamassassin_port = strip(p); + p = strip(p); if (l) { spamassassin_limit = strtonum(l, 1, SIZE_MAX, &errstr); if (errstr) @@ -379,6 +371,7 @@ main(int argc, char **argv) log_verbose(v); log_debug("debug: starting..."); + spamassassin_resolve(h, p); filter_api_on_data(spamassassin_on_data); filter_api_on_dataline(spamassassin_on_dataline); |