aboutsummaryrefslogtreecommitdiffstats
path: root/broken/propagate/src/listener.c
diff options
context:
space:
mode:
Diffstat (limited to 'broken/propagate/src/listener.c')
-rw-r--r--broken/propagate/src/listener.c271
1 files changed, 271 insertions, 0 deletions
diff --git a/broken/propagate/src/listener.c b/broken/propagate/src/listener.c
new file mode 100644
index 0000000..782385d
--- /dev/null
+++ b/broken/propagate/src/listener.c
@@ -0,0 +1,271 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <signal.h>
+
+#include <poll.h>
+
+#include "pg.h"
+
+#define SOCK_BASENAME "/tmp/propagate_sock"
+#define SOCK_NAME_MAXLEN 255
+
+static int listener_start(struct listener *);
+static int unix_start(struct listener *);
+static int inet_start(struct listener *);
+
+int
+listener_add(int type, char *path) {
+ struct listener *l;
+
+ if (listeners_count >= LISTENERS_MAX) {
+ log_warn("listeners max reached");
+ return -1;
+ }
+ l = xcalloc(1, sizeof(struct listener));
+ if (!l)
+ return -1;
+
+ l->type = type;
+ switch (type) {
+ case LISTENER_UNIX:
+ l->sock_unix.path = path;
+ break;
+ case LISTENER_INET:
+ log_warn("listener_add: XXX inet not implemented yet");
+ break;
+ }
+ LIST_INIT(&(l->conns));
+ l->conns_count = 0;
+
+ LIST_INSERT_HEAD(&listeners, l, entry);
+ listeners_count++;
+
+ listener_start(l);
+
+ return 0;
+}
+
+struct listener *
+listener_find(int fd) {
+ struct listener *l;
+
+ LIST_FOREACH(l, &listeners, entry) {
+ if (l->sock == fd)
+ return l;
+ }
+ return NULL;
+}
+
+int
+listener_conn_add(struct listener *l, int fd) {
+ struct conn *c;
+
+ if (l->conns_count >= LISTENER_CONN_MAX) {
+ log_warn("conn max reached");
+ return -1;
+ }
+ c = xcalloc(1, sizeof(struct conn));
+ if (!c)
+ return -1;
+
+ c->listener = l;
+ c->fd = fd;
+ c->state = CONN_OPEN;
+
+ LIST_INSERT_HEAD(&(l->conns), c, entry);
+ l->conns_count++;
+
+ setnonblock(fd);
+
+ return 0;
+}
+
+void
+listener_conn_del(struct conn *c) {
+ close(c->fd);
+ if (c->exec.cmd)
+ listener_conn_exec_kill(c);
+ LIST_REMOVE(c, entry);
+ c->listener->conns_count--;
+ free(c);
+}
+
+void
+listener_conn_move(struct conn *oldc, struct conn *newc) {
+ memcpy(newc, oldc, sizeof(struct conn));
+ LIST_REMOVE(oldc, entry);
+ newc->listener->conns_count--;
+ free(oldc);
+}
+
+struct conn *
+listener_conn_find(int fd) {
+ struct listener *l;
+ struct conn *c;
+
+ LIST_FOREACH(l, &listeners, entry) {
+ LIST_FOREACH(c, &l->conns, entry) {
+ if (c->fd == fd)
+ return c;
+ }
+ }
+ return NULL;
+}
+
+struct conn *
+listener_conn_find_orig(char orig) {
+ struct listener *l;
+ struct conn *c;
+
+ LIST_FOREACH(l, &listeners, entry) {
+ LIST_FOREACH(c, &l->conns, entry) {
+ if (c->orig == orig)
+ return c;
+ }
+ }
+ return NULL;
+
+}
+
+/* XXX indicate that we are only looking in input pipe */
+struct conn *
+listener_conn_find_exec(int fd) {
+ struct listener *l;
+ struct conn *c;
+
+ LIST_FOREACH(l, &listeners, entry) {
+ LIST_FOREACH(c, &l->conns, entry) {
+ if (c->exec.fd[0] == fd)
+ return c;
+ }
+ }
+ return NULL;
+}
+
+int
+listener_conn_exec(struct conn *c, char *cmd, char **argv) {
+ int pid;
+
+ log_debug("listener_conn_exec %s", cmd);
+
+ if (c->exec.cmd) {
+ log_warn("%c is already executing a command", c->orig);
+ return -1;
+ }
+ c->exec.cmd = cmd;
+ c->exec.argv = argv;
+
+ pid = execpipe(cmd, argv, c->exec.fd);
+ if (pid < 0)
+ return -1;
+ c->exec.pid = pid;
+
+ // XXX if async then writebuf ?
+ // XXX if err then MSG_ERR
+
+ return 0;
+}
+
+void
+listener_conn_exec_kill(struct conn *c) {
+ int i;
+
+ if (c->exec.cmd) {
+ free(c->exec.cmd);
+ c->exec.cmd = NULL;
+ }
+ if (c->exec.argv) {
+ for (i=0; c->exec.argv[i] != NULL; i++)
+ free(c->exec.argv[i]);
+ free(c->exec.argv);
+ c->exec.argv = NULL;
+ }
+ kill(c->exec.pid, SIGTERM);
+ close(c->exec.fd[0]);
+ close(c->exec.fd[1]);
+ if (c->exec.async_writebuf) {
+ free(c->exec.async_writebuf);
+ c->exec.async_writebuf = NULL;
+ }
+}
+
+
+int
+listener_conn_exec_bufferize(struct conn *c, int fd) {
+ uint8_t buf[BUFMAX];
+
+ log_warn("XXX conn exec bufferize not implemented");
+ read(fd, buf, sizeof(buf));
+ return -1;
+}
+
+static int
+listener_start(struct listener *l) {
+ switch (l->type) {
+ case LISTENER_UNIX:
+ if (unix_start(l) < 0)
+ log_warn("listener socket failed to start");
+ break;
+ case LISTENER_INET:
+ if (inet_start(l) < 0)
+ log_warn("listener inet failed to start");
+ break;
+ }
+ return 0;
+}
+
+static int
+unix_start(struct listener *l) {
+ int s, len;
+ struct sockaddr_un local;
+ char *path;
+
+ path = xmalloc(sizeof(SOCK_NAME_MAXLEN));
+ if (!path)
+ return -1;
+ snprintf(path, SOCK_NAME_MAXLEN, "%s", l->sock_unix.path);
+
+ /* XXX monitor death of the socket, to restart it */
+
+ if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
+ log_warn("unix_start: socket %s: %s", path, strerror(errno));
+ free(path);
+ return -1;
+ }
+
+ memset(&local, 0, sizeof(struct sockaddr_un));
+ local.sun_family = AF_UNIX;
+ strncpy(local.sun_path, path, sizeof(local.sun_path));
+ unlink(local.sun_path);
+ len = strlen(local.sun_path) + sizeof(local.sun_family);
+ if (bind(s, (struct sockaddr *)&local, len) == -1) {
+ log_warn("unix_start: bind %s: %s", path, strerror(errno));
+ free(path);
+ return -1;
+ }
+
+ if (listen(s, 5) == -1) {
+ log_warn("unix_start: listen %s: %s", path, strerror(errno));
+ free(path);
+ return -1;
+ }
+
+ // XXX setnonblock(s);
+
+ l->sock = s;
+ l->sock_unix.path = path;
+
+ return 0;
+}
+
+static int
+inet_start(struct listener *l) {
+ log_warn("inet_start: XXX not implemented yet");
+ return -1;
+}