diff options
Diffstat (limited to 'broken/propagate/src/listener.c')
-rw-r--r-- | broken/propagate/src/listener.c | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/broken/propagate/src/listener.c b/broken/propagate/src/listener.c new file mode 100644 index 0000000..782385d --- /dev/null +++ b/broken/propagate/src/listener.c @@ -0,0 +1,271 @@ +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <errno.h> + +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <signal.h> + +#include <poll.h> + +#include "pg.h" + +#define SOCK_BASENAME "/tmp/propagate_sock" +#define SOCK_NAME_MAXLEN 255 + +static int listener_start(struct listener *); +static int unix_start(struct listener *); +static int inet_start(struct listener *); + +int +listener_add(int type, char *path) { + struct listener *l; + + if (listeners_count >= LISTENERS_MAX) { + log_warn("listeners max reached"); + return -1; + } + l = xcalloc(1, sizeof(struct listener)); + if (!l) + return -1; + + l->type = type; + switch (type) { + case LISTENER_UNIX: + l->sock_unix.path = path; + break; + case LISTENER_INET: + log_warn("listener_add: XXX inet not implemented yet"); + break; + } + LIST_INIT(&(l->conns)); + l->conns_count = 0; + + LIST_INSERT_HEAD(&listeners, l, entry); + listeners_count++; + + listener_start(l); + + return 0; +} + +struct listener * +listener_find(int fd) { + struct listener *l; + + LIST_FOREACH(l, &listeners, entry) { + if (l->sock == fd) + return l; + } + return NULL; +} + +int +listener_conn_add(struct listener *l, int fd) { + struct conn *c; + + if (l->conns_count >= LISTENER_CONN_MAX) { + log_warn("conn max reached"); + return -1; + } + c = xcalloc(1, sizeof(struct conn)); + if (!c) + return -1; + + c->listener = l; + c->fd = fd; + c->state = CONN_OPEN; + + LIST_INSERT_HEAD(&(l->conns), c, entry); + l->conns_count++; + + setnonblock(fd); + + return 0; +} + +void +listener_conn_del(struct conn *c) { + close(c->fd); + if (c->exec.cmd) + listener_conn_exec_kill(c); + LIST_REMOVE(c, entry); + c->listener->conns_count--; + free(c); +} + +void +listener_conn_move(struct conn *oldc, struct conn *newc) { + memcpy(newc, oldc, sizeof(struct conn)); + LIST_REMOVE(oldc, entry); + newc->listener->conns_count--; + free(oldc); +} + +struct conn * +listener_conn_find(int fd) { + struct listener *l; + struct conn *c; + + LIST_FOREACH(l, &listeners, entry) { + LIST_FOREACH(c, &l->conns, entry) { + if (c->fd == fd) + return c; + } + } + return NULL; +} + +struct conn * +listener_conn_find_orig(char orig) { + struct listener *l; + struct conn *c; + + LIST_FOREACH(l, &listeners, entry) { + LIST_FOREACH(c, &l->conns, entry) { + if (c->orig == orig) + return c; + } + } + return NULL; + +} + +/* XXX indicate that we are only looking in input pipe */ +struct conn * +listener_conn_find_exec(int fd) { + struct listener *l; + struct conn *c; + + LIST_FOREACH(l, &listeners, entry) { + LIST_FOREACH(c, &l->conns, entry) { + if (c->exec.fd[0] == fd) + return c; + } + } + return NULL; +} + +int +listener_conn_exec(struct conn *c, char *cmd, char **argv) { + int pid; + + log_debug("listener_conn_exec %s", cmd); + + if (c->exec.cmd) { + log_warn("%c is already executing a command", c->orig); + return -1; + } + c->exec.cmd = cmd; + c->exec.argv = argv; + + pid = execpipe(cmd, argv, c->exec.fd); + if (pid < 0) + return -1; + c->exec.pid = pid; + + // XXX if async then writebuf ? + // XXX if err then MSG_ERR + + return 0; +} + +void +listener_conn_exec_kill(struct conn *c) { + int i; + + if (c->exec.cmd) { + free(c->exec.cmd); + c->exec.cmd = NULL; + } + if (c->exec.argv) { + for (i=0; c->exec.argv[i] != NULL; i++) + free(c->exec.argv[i]); + free(c->exec.argv); + c->exec.argv = NULL; + } + kill(c->exec.pid, SIGTERM); + close(c->exec.fd[0]); + close(c->exec.fd[1]); + if (c->exec.async_writebuf) { + free(c->exec.async_writebuf); + c->exec.async_writebuf = NULL; + } +} + + +int +listener_conn_exec_bufferize(struct conn *c, int fd) { + uint8_t buf[BUFMAX]; + + log_warn("XXX conn exec bufferize not implemented"); + read(fd, buf, sizeof(buf)); + return -1; +} + +static int +listener_start(struct listener *l) { + switch (l->type) { + case LISTENER_UNIX: + if (unix_start(l) < 0) + log_warn("listener socket failed to start"); + break; + case LISTENER_INET: + if (inet_start(l) < 0) + log_warn("listener inet failed to start"); + break; + } + return 0; +} + +static int +unix_start(struct listener *l) { + int s, len; + struct sockaddr_un local; + char *path; + + path = xmalloc(sizeof(SOCK_NAME_MAXLEN)); + if (!path) + return -1; + snprintf(path, SOCK_NAME_MAXLEN, "%s", l->sock_unix.path); + + /* XXX monitor death of the socket, to restart it */ + + if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { + log_warn("unix_start: socket %s: %s", path, strerror(errno)); + free(path); + return -1; + } + + memset(&local, 0, sizeof(struct sockaddr_un)); + local.sun_family = AF_UNIX; + strncpy(local.sun_path, path, sizeof(local.sun_path)); + unlink(local.sun_path); + len = strlen(local.sun_path) + sizeof(local.sun_family); + if (bind(s, (struct sockaddr *)&local, len) == -1) { + log_warn("unix_start: bind %s: %s", path, strerror(errno)); + free(path); + return -1; + } + + if (listen(s, 5) == -1) { + log_warn("unix_start: listen %s: %s", path, strerror(errno)); + free(path); + return -1; + } + + // XXX setnonblock(s); + + l->sock = s; + l->sock_unix.path = path; + + return 0; +} + +static int +inet_start(struct listener *l) { + log_warn("inet_start: XXX not implemented yet"); + return -1; +} |