aboutsummaryrefslogtreecommitdiffstats
path: root/broken/propagate/src/route.c
diff options
context:
space:
mode:
Diffstat (limited to 'broken/propagate/src/route.c')
-rw-r--r--broken/propagate/src/route.c138
1 files changed, 138 insertions, 0 deletions
diff --git a/broken/propagate/src/route.c b/broken/propagate/src/route.c
new file mode 100644
index 0000000..99f52ec
--- /dev/null
+++ b/broken/propagate/src/route.c
@@ -0,0 +1,138 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <poll.h>
+
+#include "pg.h"
+
+static int route_start(struct route *r);
+static int frontend_start(struct route *);
+
+int
+route_add(char dest, int type, char *cmd, int async, char *path, char gw) {
+ struct route *r;
+ char **argv;
+ int argc;
+
+ if (routes_count >= ROUTES_MAX) {
+ log_warn("routes max reached");
+ return -1;
+ }
+
+ r = xcalloc(1, sizeof(struct route));
+ if (!r)
+ return -1;
+
+ r->dest = dest;
+ r->type = type;
+ switch (type) {
+ case ROUTE_PROC:
+ argv = explode(cmd, strlen(cmd), " ", &argc);
+ log_tmp("route_add: explode argv 0 %s 1 %s 2 %s 3 %s argc %d",
+ argv[0], argv[1], argv[2], argv[3], argc);
+ if (!argv)
+ return -1;
+ r->proc.cmd = argv[0];
+ r->proc.argv = argv;
+ r->proc.async = async;
+ break;
+ case ROUTE_GW:
+ r->gw.dest = gw;
+ }
+
+ LIST_INSERT_HEAD(&routes, r, entry);
+
+ route_start(r);
+
+ return 0;
+}
+
+int
+route_fw(int fd, struct msg_header *hdr, uint8_t *hbuf) {
+ struct route *r;
+ int len;
+
+ log_debug("route %c to %c", hdr->orig, hdr->dest);
+
+ r = route_find(hdr->dest);
+ if (r) {
+ log_debug("route_fw: no route to %c", hdr->dest);
+ goto err;
+ }
+
+ // XXX HERE async bufferize
+
+ len = writebuf(hbuf, r->proc.fd[1], MSG_HEADER_SIZE_ENCODED);
+ if (len < 0)
+ goto err;
+ len = readwrite(fd, r->proc.fd[1], hdr->datalen);
+ if (len < 0)
+ goto err;
+
+ return 0;
+
+err:
+ log_warn("route_fw: err");
+ return -1;
+}
+
+int
+route_bufferize(struct route *r, int fd) {
+ uint8_t buf[BUFMAX];
+
+ log_warn("XXX route bufferize not implemented");
+ read(fd, buf, sizeof(buf));
+ return -1;
+}
+
+struct route *
+route_find(char dest) {
+ struct route *r;
+
+ LIST_FOREACH(r, &routes, entry) {
+ if (r->dest == dest) {
+ if (r->type == ROUTE_GW) {
+ dest = r->gw.dest;
+ continue;
+ }
+ return r;
+ }
+ }
+
+ return NULL;
+}
+
+static int
+route_start(struct route *r) {
+ switch (r->type) {
+ case ROUTE_PROC:
+ if (frontend_start(r) < 0)
+ log_warn("frontend for dest %c failed to start", r->dest);
+ break;
+ }
+
+ send_cmd(r->dest, MSG_INIT, 0, NULL, 0);
+
+ return 0;
+}
+
+static int
+frontend_start(struct route *r) {
+ int pid;
+
+ /* XXX monitor death of fe / broken pipe, to restart it */
+ pid = execpipe(r->proc.cmd, r->proc.argv, r->proc.fd);
+ if (pid > 0)
+ r->proc.pid = pid;
+ log_debug("frontend_start: forked pid %d, fd %d %d", pid, r->proc.fd[0], r->proc.fd[1]);
+ setnonblock(r->proc.fd[0]);
+
+ return pid;
+}
+