aboutsummaryrefslogtreecommitdiffstats
path: root/smtpd/mproc.c
diff options
context:
space:
mode:
Diffstat (limited to 'smtpd/mproc.c')
-rw-r--r--smtpd/mproc.c676
1 files changed, 676 insertions, 0 deletions
diff --git a/smtpd/mproc.c b/smtpd/mproc.c
new file mode 100644
index 00000000..bde229e1
--- /dev/null
+++ b/smtpd/mproc.c
@@ -0,0 +1,676 @@
+/* $OpenBSD: mproc.c,v 1.36 2020/03/17 09:01:53 tobhe Exp $ */
+
+/*
+ * Copyright (c) 2012 Eric Faurot <eric@faurot.net>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include "includes.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/tree.h>
+#include <sys/queue.h>
+#include <sys/uio.h>
+
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <arpa/nameser.h>
+
+#include <err.h>
+#include <errno.h>
+#include <event.h>
+#include <imsg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <limits.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "smtpd.h"
+#include "log.h"
+
+static void mproc_dispatch(int, short, void *);
+
+static ssize_t imsg_read_nofd(struct imsgbuf *);
+
+int
+mproc_fork(struct mproc *p, const char *path, char *argv[])
+{
+ int sp[2];
+
+ if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, sp) == -1)
+ return (-1);
+
+ io_set_nonblocking(sp[0]);
+ io_set_nonblocking(sp[1]);
+
+ if ((p->pid = fork()) == -1)
+ goto err;
+
+ if (p->pid == 0) {
+ /* child process */
+ dup2(sp[0], STDIN_FILENO);
+ closefrom(STDERR_FILENO + 1);
+ execv(path, argv);
+ err(1, "execv: %s", path);
+ }
+
+ /* parent process */
+ close(sp[0]);
+ mproc_init(p, sp[1]);
+ return (0);
+
+err:
+ log_warn("warn: Failed to start process %s, instance of %s", argv[0], path);
+ close(sp[0]);
+ close(sp[1]);
+ return (-1);
+}
+
+void
+mproc_init(struct mproc *p, int fd)
+{
+ imsg_init(&p->imsgbuf, fd);
+}
+
+void
+mproc_clear(struct mproc *p)
+{
+ log_debug("debug: clearing p=%s, fd=%d, pid=%d", p->name, p->imsgbuf.fd, p->pid);
+
+ event_del(&p->ev);
+ close(p->imsgbuf.fd);
+ imsg_clear(&p->imsgbuf);
+}
+
+void
+mproc_enable(struct mproc *p)
+{
+ if (p->enable == 0) {
+ log_trace(TRACE_MPROC, "mproc: %s -> %s: enabled",
+ proc_name(smtpd_process),
+ proc_name(p->proc));
+ p->enable = 1;
+ }
+ mproc_event_add(p);
+}
+
+void
+mproc_disable(struct mproc *p)
+{
+ if (p->enable == 1) {
+ log_trace(TRACE_MPROC, "mproc: %s -> %s: disabled",
+ proc_name(smtpd_process),
+ proc_name(p->proc));
+ p->enable = 0;
+ }
+ mproc_event_add(p);
+}
+
+void
+mproc_event_add(struct mproc *p)
+{
+ short events;
+
+ if (p->enable)
+ events = EV_READ;
+ else
+ events = 0;
+
+ if (p->imsgbuf.w.queued)
+ events |= EV_WRITE;
+
+ if (p->events)
+ event_del(&p->ev);
+
+ p->events = events;
+ if (events) {
+ event_set(&p->ev, p->imsgbuf.fd, events, mproc_dispatch, p);
+ event_add(&p->ev, NULL);
+ }
+}
+
+static void
+mproc_dispatch(int fd, short event, void *arg)
+{
+ struct mproc *p = arg;
+ struct imsg imsg;
+ ssize_t n;
+
+ p->events = 0;
+
+ if (event & EV_READ) {
+
+ if (p->proc == PROC_CLIENT)
+ n = imsg_read_nofd(&p->imsgbuf);
+ else
+ n = imsg_read(&p->imsgbuf);
+
+ switch (n) {
+ case -1:
+ if (errno == EAGAIN)
+ break;
+ log_warn("warn: %s -> %s: imsg_read",
+ proc_name(smtpd_process), p->name);
+ fatal("exiting");
+ /* NOTREACHED */
+ case 0:
+ /* this pipe is dead, so remove the event handler */
+ log_debug("debug: %s -> %s: pipe closed",
+ proc_name(smtpd_process), p->name);
+ p->handler(p, NULL);
+ return;
+ default:
+ break;
+ }
+ }
+
+ if (event & EV_WRITE) {
+ n = msgbuf_write(&p->imsgbuf.w);
+ if (n == 0 || (n == -1 && errno != EAGAIN)) {
+ /* this pipe is dead, so remove the event handler */
+ log_debug("debug: %s -> %s: pipe closed",
+ proc_name(smtpd_process), p->name);
+ p->handler(p, NULL);
+ return;
+ }
+ }
+
+ for (;;) {
+ if ((n = imsg_get(&p->imsgbuf, &imsg)) == -1) {
+
+ if (smtpd_process == PROC_CONTROL &&
+ p->proc == PROC_CLIENT) {
+ log_warnx("warn: client sent invalid imsg "
+ "over control socket");
+ p->handler(p, NULL);
+ return;
+ }
+ log_warn("fatal: %s: error in imsg_get for %s",
+ proc_name(smtpd_process), p->name);
+ fatalx(NULL);
+ }
+ if (n == 0)
+ break;
+
+ p->handler(p, &imsg);
+
+ imsg_free(&imsg);
+ }
+
+ mproc_event_add(p);
+}
+
+/* This should go into libutil */
+static ssize_t
+imsg_read_nofd(struct imsgbuf *ibuf)
+{
+ ssize_t n;
+ char *buf;
+ size_t len;
+
+ buf = ibuf->r.buf + ibuf->r.wpos;
+ len = sizeof(ibuf->r.buf) - ibuf->r.wpos;
+
+ while ((n = recv(ibuf->fd, buf, len, 0)) == -1) {
+ if (errno != EINTR)
+ return (n);
+ }
+
+ ibuf->r.wpos += n;
+ return (n);
+}
+
+void
+m_forward(struct mproc *p, struct imsg *imsg)
+{
+ imsg_compose(&p->imsgbuf, imsg->hdr.type, imsg->hdr.peerid,
+ imsg->hdr.pid, imsg->fd, imsg->data,
+ imsg->hdr.len - sizeof(imsg->hdr));
+
+ if (imsg->hdr.type != IMSG_STAT_DECREMENT &&
+ imsg->hdr.type != IMSG_STAT_INCREMENT)
+ log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s (forward)",
+ proc_name(smtpd_process),
+ proc_name(p->proc),
+ imsg->hdr.len - sizeof(imsg->hdr),
+ imsg_to_str(imsg->hdr.type));
+
+ mproc_event_add(p);
+}
+
+void
+m_compose(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid, int fd,
+ void *data, size_t len)
+{
+ imsg_compose(&p->imsgbuf, type, peerid, pid, fd, data, len);
+
+ if (type != IMSG_STAT_DECREMENT &&
+ type != IMSG_STAT_INCREMENT)
+ log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s",
+ proc_name(smtpd_process),
+ proc_name(p->proc),
+ len,
+ imsg_to_str(type));
+
+ mproc_event_add(p);
+}
+
+void
+m_composev(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid,
+ int fd, const struct iovec *iov, int n)
+{
+ size_t len;
+ int i;
+
+ imsg_composev(&p->imsgbuf, type, peerid, pid, fd, iov, n);
+
+ len = 0;
+ for (i = 0; i < n; i++)
+ len += iov[i].iov_len;
+
+ if (type != IMSG_STAT_DECREMENT &&
+ type != IMSG_STAT_INCREMENT)
+ log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s",
+ proc_name(smtpd_process),
+ proc_name(p->proc),
+ len,
+ imsg_to_str(type));
+
+ mproc_event_add(p);
+}
+
+void
+m_create(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid, int fd)
+{
+ p->m_pos = 0;
+ p->m_type = type;
+ p->m_peerid = peerid;
+ p->m_pid = pid;
+ p->m_fd = fd;
+}
+
+void
+m_add(struct mproc *p, const void *data, size_t len)
+{
+ size_t alloc;
+ void *tmp;
+
+ if (p->m_pos + len + IMSG_HEADER_SIZE > MAX_IMSGSIZE) {
+ log_warnx("warn: message too large");
+ fatal(NULL);
+ }
+
+ alloc = p->m_alloc ? p->m_alloc : 128;
+ while (p->m_pos + len > alloc)
+ alloc *= 2;
+ if (alloc != p->m_alloc) {
+ log_trace(TRACE_MPROC, "mproc: %s -> %s: realloc %zu -> %zu",
+ proc_name(smtpd_process),
+ proc_name(p->proc),
+ p->m_alloc,
+ alloc);
+
+ tmp = recallocarray(p->m_buf, p->m_alloc, alloc, 1);
+ if (tmp == NULL)
+ fatal("realloc");
+ p->m_alloc = alloc;
+ p->m_buf = tmp;
+ }
+
+ memmove(p->m_buf + p->m_pos, data, len);
+ p->m_pos += len;
+}
+
+void
+m_close(struct mproc *p)
+{
+ if (imsg_compose(&p->imsgbuf, p->m_type, p->m_peerid, p->m_pid, p->m_fd,
+ p->m_buf, p->m_pos) == -1)
+ fatal("imsg_compose");
+
+ log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s",
+ proc_name(smtpd_process),
+ proc_name(p->proc),
+ p->m_pos,
+ imsg_to_str(p->m_type));
+
+ mproc_event_add(p);
+}
+
+void
+m_flush(struct mproc *p)
+{
+ if (imsg_compose(&p->imsgbuf, p->m_type, p->m_peerid, p->m_pid, p->m_fd,
+ p->m_buf, p->m_pos) == -1)
+ fatal("imsg_compose");
+
+ log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s (flush)",
+ proc_name(smtpd_process),
+ proc_name(p->proc),
+ p->m_pos,
+ imsg_to_str(p->m_type));
+
+ p->m_pos = 0;
+
+ if (imsg_flush(&p->imsgbuf) == -1)
+ fatal("imsg_flush");
+}
+
+static struct imsg * current;
+
+static void
+m_error(const char *error)
+{
+ char buf[512];
+
+ (void)snprintf(buf, sizeof buf, "%s: %s: %s",
+ proc_name(smtpd_process),
+ imsg_to_str(current->hdr.type),
+ error);
+ fatalx("%s", buf);
+}
+
+void
+m_msg(struct msg *m, struct imsg *imsg)
+{
+ current = imsg;
+ m->pos = imsg->data;
+ m->end = m->pos + (imsg->hdr.len - sizeof(imsg->hdr));
+}
+
+void
+m_end(struct msg *m)
+{
+ if (m->pos != m->end)
+ m_error("not at msg end");
+}
+
+int
+m_is_eom(struct msg *m)
+{
+ return (m->pos == m->end);
+}
+
+static inline void
+m_get(struct msg *m, void *dst, size_t sz)
+{
+ if (sz > MAX_IMSGSIZE ||
+ m->end - m->pos < (ssize_t)sz)
+ fatalx("msg too short");
+
+ memmove(dst, m->pos, sz);
+ m->pos += sz;
+}
+
+void
+m_add_int(struct mproc *m, int v)
+{
+ m_add(m, &v, sizeof(v));
+};
+
+void
+m_add_u32(struct mproc *m, uint32_t u32)
+{
+ m_add(m, &u32, sizeof(u32));
+};
+
+void
+m_add_size(struct mproc *m, size_t sz)
+{
+ m_add(m, &sz, sizeof(sz));
+};
+
+void
+m_add_time(struct mproc *m, time_t v)
+{
+ m_add(m, &v, sizeof(v));
+};
+
+void
+m_add_timeval(struct mproc *m, struct timeval *tv)
+{
+ m_add(m, tv, sizeof(*tv));
+}
+
+
+void
+m_add_string(struct mproc *m, const char *v)
+{
+ if (v) {
+ m_add(m, "s", 1);
+ m_add(m, v, strlen(v) + 1);
+ }
+ else
+ m_add(m, "\0", 1);
+};
+
+void
+m_add_data(struct mproc *m, const void *v, size_t len)
+{
+ m_add_size(m, len);
+ m_add(m, v, len);
+};
+
+void
+m_add_id(struct mproc *m, uint64_t v)
+{
+ m_add(m, &v, sizeof(v));
+}
+
+void
+m_add_evpid(struct mproc *m, uint64_t v)
+{
+ m_add(m, &v, sizeof(v));
+}
+
+void
+m_add_msgid(struct mproc *m, uint32_t v)
+{
+ m_add(m, &v, sizeof(v));
+}
+
+void
+m_add_sockaddr(struct mproc *m, const struct sockaddr *sa)
+{
+ m_add_size(m, SA_LEN(sa));
+ m_add(m, sa, SA_LEN(sa));
+}
+
+void
+m_add_mailaddr(struct mproc *m, const struct mailaddr *maddr)
+{
+ m_add(m, maddr, sizeof(*maddr));
+}
+
+void
+m_add_envelope(struct mproc *m, const struct envelope *evp)
+{
+ char buf[sizeof(*evp)];
+
+ envelope_dump_buffer(evp, buf, sizeof(buf));
+ m_add_evpid(m, evp->id);
+ m_add_string(m, buf);
+}
+
+void
+m_add_params(struct mproc *m, struct dict *d)
+{
+ const char *key;
+ char *value;
+ void *iter;
+
+ if (d == NULL) {
+ m_add_size(m, 0);
+ return;
+ }
+ m_add_size(m, dict_count(d));
+ iter = NULL;
+ while (dict_iter(d, &iter, &key, (void **)&value)) {
+ m_add_string(m, key);
+ m_add_string(m, value);
+ }
+}
+
+void
+m_get_int(struct msg *m, int *i)
+{
+ m_get(m, i, sizeof(*i));
+}
+
+void
+m_get_u32(struct msg *m, uint32_t *u32)
+{
+ m_get(m, u32, sizeof(*u32));
+}
+
+void
+m_get_size(struct msg *m, size_t *sz)
+{
+ m_get(m, sz, sizeof(*sz));
+}
+
+void
+m_get_time(struct msg *m, time_t *t)
+{
+ m_get(m, t, sizeof(*t));
+}
+
+void
+m_get_timeval(struct msg *m, struct timeval *tv)
+{
+ m_get(m, tv, sizeof(*tv));
+}
+
+void
+m_get_string(struct msg *m, const char **s)
+{
+ uint8_t *end;
+ char c;
+
+ if (m->pos >= m->end)
+ m_error("msg too short");
+
+ c = *m->pos++;
+ if (c == '\0') {
+ *s = NULL;
+ return;
+ }
+
+ if (m->pos >= m->end)
+ m_error("msg too short");
+ end = memchr(m->pos, 0, m->end - m->pos);
+ if (end == NULL)
+ m_error("unterminated string");
+
+ *s = m->pos;
+ m->pos = end + 1;
+}
+
+void
+m_get_data(struct msg *m, const void **data, size_t *sz)
+{
+ m_get_size(m, sz);
+
+ if (*sz == 0) {
+ *data = NULL;
+ return;
+ }
+
+ if (m->pos + *sz > m->end)
+ m_error("msg too short");
+
+ *data = m->pos;
+ m->pos += *sz;
+}
+
+void
+m_get_evpid(struct msg *m, uint64_t *evpid)
+{
+ m_get(m, evpid, sizeof(*evpid));
+}
+
+void
+m_get_msgid(struct msg *m, uint32_t *msgid)
+{
+ m_get(m, msgid, sizeof(*msgid));
+}
+
+void
+m_get_id(struct msg *m, uint64_t *id)
+{
+ m_get(m, id, sizeof(*id));
+}
+
+void
+m_get_sockaddr(struct msg *m, struct sockaddr *sa)
+{
+ size_t len;
+
+ m_get_size(m, &len);
+ m_get(m, sa, len);
+}
+
+void
+m_get_mailaddr(struct msg *m, struct mailaddr *maddr)
+{
+ m_get(m, maddr, sizeof(*maddr));
+}
+
+void
+m_get_envelope(struct msg *m, struct envelope *evp)
+{
+ uint64_t evpid;
+ const char *buf;
+
+ m_get_evpid(m, &evpid);
+ m_get_string(m, &buf);
+ if (buf == NULL)
+ fatalx("empty envelope buffer");
+
+ if (!envelope_load_buffer(evp, buf, strlen(buf)))
+ fatalx("failed to retrieve envelope");
+ evp->id = evpid;
+}
+
+void
+m_get_params(struct msg *m, struct dict *d)
+{
+ size_t c;
+ const char *key;
+ const char *value;
+ char *tmp;
+
+ dict_init(d);
+
+ m_get_size(m, &c);
+
+ for (; c; c--) {
+ m_get_string(m, &key);
+ m_get_string(m, &value);
+ if ((tmp = strdup(value)) == NULL)
+ fatal("m_get_params");
+ dict_set(d, key, tmp);
+ }
+}
+
+void
+m_clear_params(struct dict *d)
+{
+ char *value;
+
+ while (dict_poproot(d, (void **)&value))
+ free(value);
+}