aboutsummaryrefslogtreecommitdiffstats
path: root/broken/propagate/src/route.c
blob: 99f52ec46cfb106b7d4a7062310e991587b52f68 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>

#include <poll.h>

#include "pg.h"

/* Forward declarations for the file-local helpers defined at the end of this file. */
static int route_start(struct route *r);
static int frontend_start(struct route *r);

/*
 * Create a route for destination `dest`, insert it at the head of the
 * global `routes` list and start it.
 *
 * dest:  destination identifier the route answers to.
 * type:  ROUTE_PROC (frontend process described by `cmd`) or ROUTE_GW
 *        (redirection to destination `gw`).
 * cmd:   command line, split on " " into an argv (ROUTE_PROC only).
 * async: stored in the route's proc.async field (ROUTE_PROC only).
 * path:  currently unused.
 * gw:    destination the gateway redirects to (ROUTE_GW only).
 *
 * Returns 0 once the route has been inserted, -1 if the route table is
 * full, the allocation failed, or the command line could not be split.
 * A failure to start the route is reported by route_start() itself and
 * does not change the return value; the route stays registered.
 *
 * NOTE(review): routes_count is compared against ROUTES_MAX here but is
 * not incremented in this function -- confirm it is maintained elsewhere.
 */
int
route_add(char dest, int type, char *cmd, int async, char *path, char gw) {
    struct route *r;
    char **argv;
    int argc;

    (void)path;

    if (routes_count >= ROUTES_MAX) {
        log_warn("routes max reached");
        return -1;
    }

    r = xcalloc(1, sizeof(*r));
    if (!r)
        return -1;

    r->dest = dest;
    r->type = type;
    switch (type) {
    case ROUTE_PROC:
        /* strlen(NULL) is undefined behaviour: reject a missing command. */
        if (!cmd)
            goto fail;
        argc = 0;
        argv = explode(cmd, strlen(cmd), " ", &argc);
        /* Check the result before touching it (the log used to come first). */
        if (!argv)
            goto fail;
        /*
         * Only argv[0] is logged: a command with fewer than four words
         * has no argv[1..3] to read.
         */
        log_tmp("route_add: explode argv 0 %s argc %d",
                (argc > 0 && argv[0]) ? argv[0] : "(none)", argc);
        r->proc.cmd = argv[0];
        r->proc.argv = argv;
        r->proc.async = async;
        break;
    case ROUTE_GW:
        r->gw.dest = gw;
        break;
    default:
        /* Other types carry no extra data; they are inserted as-is. */
        break;
    }

    LIST_INSERT_HEAD(&routes, r, entry);

    route_start(r);

    return 0;

fail:
    /* r is not linked into the list yet, so it must be released here.
     * Assumes xcalloc() memory is free()-compatible -- TODO confirm. */
    free(r);
    return -1;
}

/*
 * Forward one message to the route serving hdr->dest.
 *
 * fd:   descriptor the message payload is read from.
 * hdr:  decoded message header; `orig` and `dest` are logged, `dest`
 *       selects the route and `datalen` is the payload length to copy.
 * hbuf: encoded header, MSG_HEADER_SIZE_ENCODED bytes, written first.
 *
 * The encoded header is written to the route's proc.fd[1], then
 * hdr->datalen bytes are copied from `fd` to the same descriptor.
 *
 * Returns 0 on success, -1 if no route matches or a transfer fails.
 */
int
route_fw(int fd, struct msg_header *hdr, uint8_t *hbuf) {
    struct route *r;
    int len;

    log_debug("route %c to %c", hdr->orig, hdr->dest);

    r = route_find(hdr->dest);
    /* A NULL result means no route; the test used to be inverted, which
     * rejected every valid route and dereferenced NULL for missing ones. */
    if (!r) {
        log_debug("route_fw: no route to %c", hdr->dest);
        goto err;
    }

    // XXX HERE async bufferize

    len = writebuf(hbuf, r->proc.fd[1], MSG_HEADER_SIZE_ENCODED);
    if (len < 0)
        goto err;
    len = readwrite(fd, r->proc.fd[1], hdr->datalen);
    if (len < 0)
        goto err;

    return 0;

err:
    log_warn("route_fw: err");
    return -1;
}

/*
 * Placeholder for buffering a message destined to route `r`.
 *
 * Not implemented yet: it logs a warning, issues a single read() of up
 * to BUFMAX bytes from `fd` into a scratch buffer that is thrown away,
 * and always returns -1.
 */
int
route_bufferize(struct route *r, int fd) {
    uint8_t scratch[BUFMAX];

    (void)r; /* not used until buffering is implemented */

    log_warn("XXX route bufferize not implemented");

    /* The data and the read result are deliberately discarded. */
    (void)read(fd, scratch, sizeof(scratch));

    return -1;
}

struct route *
route_find(char dest) {
    struct route *r;

    LIST_FOREACH(r, &routes, entry) {
        if (r->dest == dest) {
            if (r->type == ROUTE_GW) {
                dest = r->gw.dest;
                continue;
            }
            return r;
        }
    }

    return NULL;
}

/*
 * Bring a freshly added route into service.
 *
 * For a ROUTE_PROC route the frontend process is started first; other
 * route types need no setup. A MSG_INIT command is then sent to the
 * route's destination.
 *
 * Returns 0 on success, -1 if the frontend failed to start. In that
 * case MSG_INIT is not sent, since the route's pipe descriptors cannot
 * be assumed usable.
 */
static int
route_start(struct route *r) {
    switch (r->type) {
    case ROUTE_PROC:
        if (frontend_start(r) < 0) {
            log_warn("frontend for dest %c failed to start", r->dest);
            return -1;
        }
        break;
    default:
        break;
    }

    send_cmd(r->dest, MSG_INIT, 0, NULL, 0);

    return 0;
}

/*
 * Launch the frontend process of a ROUTE_PROC route.
 *
 * execpipe() is called with the route's command, argv and fd pair. A
 * positive result is stored in r->proc.pid, and r->proc.fd[0] is then
 * switched to non-blocking mode.
 *
 * Returns the value returned by execpipe(); a negative value means the
 * frontend was not started, and r->proc.fd is left untouched by this
 * function in that case.
 */
static int
frontend_start(struct route *r) {
    int pid;

    /* XXX monitor death of fe / broken pipe, to restart it */
    pid = execpipe(r->proc.cmd, r->proc.argv, r->proc.fd);
    if (pid < 0) {
        /*
         * Stop before setnonblock(): the descriptors may never have
         * been set up by the failed execpipe().
         */
        log_debug("frontend_start: execpipe failed for dest %c", r->dest);
        return pid;
    }
    if (pid > 0)
        r->proc.pid = pid;
    log_debug("frontend_start: forked pid %d, fd %d %d", pid, r->proc.fd[0], r->proc.fd[1]);
    setnonblock(r->proc.fd[0]);

    return pid;
}