diff options
Diffstat (limited to 'propagate/src/pg.c')
-rw-r--r-- | propagate/src/pg.c | 448 |
1 files changed, 448 insertions, 0 deletions
diff --git a/propagate/src/pg.c b/propagate/src/pg.c new file mode 100644 index 0000000..281363e --- /dev/null +++ b/propagate/src/pg.c @@ -0,0 +1,448 @@ +/* XXX + * we should inspire from netcat + * http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/nc/ + * see readwrite(), atomicio() + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include <sys/socket.h> +#include <poll.h> + +#include "pg.h" + +extern char *__progname; + +static int usage(); +static void process(void); +static int pfds_create(struct pollfd *); +static int cmd_init(struct conn *, int, struct msg_header *); +static int cmd_exec(struct conn *, int, struct msg_header *); +static int cmd_kill(struct conn *, int, struct msg_header *); +static int cmd_read(struct conn *, int, struct msg_header *); +static int cmd_write(struct conn *, int, struct msg_header *); +static int cmd_data_conn(struct conn *, int, struct msg_header *); +static int cmd_data_route(struct route *, int, struct msg_header *); +static int cmd_ok(struct route *, int, struct msg_header *); +static int cmd_err(struct route *, int, struct msg_header *); + +int +main(int argc, char *argv[]) { + int opt; + int loglevel = 1, logstdout = 0; + int deamonize = 0; + int server = 0; + char dest = 0; + char *cmd = NULL; + + while ((opt = getopt(argc, argv, "dhltv")) != -1) { + switch (opt) { + case 'd': + logstdout = 1; + deamonize = 0; + break; + case 'h': + usage(); + break; + case 'l': + server = 1; + break; + case 't': + dest = optarg[0]; + break; + case 'v': + loglevel++; + break; + default: + usage(); + break; + } + } + + argc -= optind; + argv += optind; + + if ((server && argc != 0) || + (!server && argc != 1) || + (server && conf.client_dest)) + usage(); + if (!server) { + cmd = argv[0]; + if (dest == 0) + dest = 'B'; + conf.client_dest = dest; + } + + log_init(loglevel, logstdout); + log_warn("** Starting propagate v%.1f", VERSION); + log_warn("** using message version %d", MSG_VERSION); + log_warn("** loglevel %d", loglevel); + + conf.server = server; + LIST_INIT(&routes); + routes_count = 0; + LIST_INIT(&listeners); + listeners_count = 0; + + /* XXX load conf */ + if (server) { + conf.me = 'B'; + listener_add(LISTENER_UNIX, "/tmp/propagate_sock"); + } else { + conf.me = 'A'; + route_add('B', ROUTE_PROC, "netcat 127.0.0.1 3333", 0, NULL, 0); + } + + if (server) + log_info("starting server %c", conf.me); + else + log_info("running client %c, dest %c, command %s", conf.me, conf.client_dest, cmd); + + if (deamonize) + log_info("XXX deamonize not implemented yet"); + + if (!server) + send_cmd(conf.client_dest, MSG_EXEC, 0, (uint8_t *)cmd, strlen(cmd)); + + process(); + + return 0; +} + +static int +usage() { + printf("Usage: %s [-dhv] [-t destination] command\n", __progname); + printf(" %s -l [-dhv]\n", __progname); + + exit(1); +} + +static void +process(void) { + struct listener *l; + struct conn *c; + struct route *r; + uint8_t *buf; + struct msg_header *hdr; + struct sockaddr_storage cliaddr; + socklen_t slen = sizeof(cliaddr); + int len, n, fd, cfd; + struct pollfd pfds[POLLER_MAX]; + int pfds_count; + /* keep in sync with pg.h MSG enum */ + int (*cmd_conn[MSG_MAX+1]) + (struct conn *, int, struct msg_header *) = { + &cmd_init, + &cmd_init, + &cmd_kill, + &cmd_exec, + &cmd_read, + &cmd_write, + &cmd_data_conn, + NULL, + NULL + }; + int (*cmd_route[MSG_MAX+1]) + (struct route *, int, struct msg_header *) = { + NULL, + NULL, + NULL, + NULL, + NULL, + NULL, + &cmd_data_route, + &cmd_ok, + &cmd_err + }; + + + /* OLD TODO + * fork frontend and open pipe_fe + * poll() + * - pipe_fe input -> write stdout + * - stdin -> append send_buf + * sigalarm() every second + * - MSG_READ and write on stdout + * - if size(send_buf) > 0: MSG_WRITE send_buf + * handle network errors / retransmission in fe + * MSG_[READ|WRITE] do not expect MSG_OK, but warns on MSG_ERROR + * MSG_OK only for MSG_INIT and MSG_EXEC + */ + + for (;;) { + pfds_count = pfds_create(pfds); + n = poll(pfds, pfds_count, POLL_TIMEOUT); + log_tmp("end of poll, %d fds on %d", n, pfds_count); + if (n < 0) { + log_warn("polling error"); // XXX fatal ? + continue; + } + + for (n=0; n<pfds_count; n++) { + if ((pfds[n].revents & POLLIN) == 0) + continue; + fd = pfds[n].fd; + log_tmp(" fd %d got something", fd); + + if ((l = listener_find(fd))) { + log_debug("got listener new connection"); + cfd = accept(fd, (struct sockaddr *)&cliaddr, &slen); + listener_conn_add(l, cfd); + } else if ((c = listener_conn_find_exec(fd))) { + log_debug("got exec data"); + if (c->async) + listener_conn_exec_bufferize(c, fd); + else + msg_send_from_fd(c->fd, MSG_DATA, c->orig, 0, fd); + } else if (fd == fileno(stdin)) { + log_debug("got stdin data"); + r = route_find(conf.client_dest); + if (r->proc.async) + route_bufferize(r, fd); + else + msg_send_from_fd(r->proc.fd[1], MSG_DATA, r->dest, 0, fd); + } else { + log_debug("got command"); + len = readbuf(fd, &buf, sizeof(MSG_MAGIC)); + if (len < sizeof(MSG_MAGIC)) { + log_info("Magic number too short"); + continue; + } + if (memcmp(buf, MSG_MAGIC, sizeof(MSG_MAGIC))) { + log_info("Invalid magic number %4x", buf); + continue; + } + len = readbuf(fd, &buf, MSG_HEADER_SIZE_ENCODED); + if (len < MSG_HEADER_SIZE_ENCODED) { + log_warn("Message header too short"); + continue; + } + hdr = msg_unpack_header(buf); + if (!hdr) { + log_warn("Invalid message header"); + continue; + } + + if (hdr->dest != conf.me) { + route_fw(fd, hdr, buf); + free(hdr); + continue; + } + + if ((c = listener_conn_find(fd))) { + cmd_conn[hdr->type](c, fd, hdr); + break; + } else if ((r = route_find(hdr->orig))) { + cmd_route[hdr->type](r, fd, hdr); + break; + } else + log_warn("host %c has no reference ! ignoring command...", + hdr->orig); + free(hdr); + } + } + } +} + +static int +pfds_create(struct pollfd *pfds) { + struct route *r; + struct listener *l; + struct conn *c; + int count = 0; + +#define ADDFD(myfd) { \ +pfds[count].fd = myfd; \ +pfds[count].events = POLLIN; \ +count++; \ +} + + if (!conf.server) + ADDFD(fileno(stdin)) + + LIST_FOREACH(r, &routes, entry) { + switch (r->type) { + case ROUTE_PROC: + ADDFD(r->proc.fd[0]) + } + } + + LIST_FOREACH(l, &listeners, entry) { + ADDFD(l->sock) + LIST_FOREACH(c, &l->conns, entry) { + ADDFD(c->fd) + if (c->exec.cmd) + ADDFD(c->exec.fd[0]) + } + } + + return count; +} + +static int +cmd_init(struct conn *c, int fd, struct msg_header *hdr) { + struct conn *oldc; + + log_debug("received INIT from %c :)", hdr->orig); + if (c->state == CONN_READY) { + log_warn("received INIT on an already open connection !"); + send_cmd(hdr->orig, MSG_ERR, 0, NULL, 0); + return -1; + } + + /* checking if this client already has a connection opened on + * another fd */ + oldc = listener_conn_find_orig(hdr->orig); + if (oldc) + listener_conn_move(oldc, c); + + c->orig = hdr->orig; + c->state = CONN_READY; + + switch (hdr->type) { + case MSG_INIT: + c->async = 0; + break; + case MSG_INIT_ASYNC: + c->async = 1; + break; + } + + send_cmd(hdr->orig, MSG_OK, 0, NULL, 0); + + return 0; +} + +static int +cmd_exec(struct conn *c, int fd, struct msg_header *hdr) { + char *cmd = NULL; + int cmdlen; + char **argv = NULL; + int argc; + + cmdlen = readbuf(fd, (uint8_t **)&cmd, hdr->datalen); + if (cmdlen <= 0 || !cmd) + goto err; + argv = explode(cmd, cmdlen, " ", &argc); + if (!argv) + goto err; + if (listener_conn_exec(c, argv[0], argv) < 0) + goto err; + + send_cmd(hdr->orig, MSG_OK, 0, NULL, 0); + + return 0; + +err: + log_debug("exec failed"); + if (cmd) + free(cmd); + if (argv) + free(argv); + send_cmd(hdr->orig, MSG_ERR, 0, NULL, 0); + + return -1; +} + +static int +cmd_kill(struct conn *c, int fd, struct msg_header *hdr) { + listener_conn_exec_kill(c); + send_cmd(hdr->orig, MSG_OK, 0, NULL, 0); + + return 0; +} + +static int +cmd_read(struct conn *c, int fd, struct msg_header *hdr) { + int len; + + if (!c->exec.cmd) { + log_warn("cmd_read: no exec in progress !"); + goto err; + } + if (c->async == 0) { + log_warn("cmd_read: not in async mode !"); + goto err; + } + + len = send_cmd(c->orig, MSG_DATA, 0, + c->exec.async_writebuf, c->exec.async_writebuf_size); + if (len < 0) + goto err; + c->exec.async_writebuf_size -= len; + c->exec.async_writebuf = realloc(c->exec.async_writebuf, + c->exec.async_writebuf_size); + return 0; + +err: + send_cmd(c->orig, MSG_ERR, 0, NULL, 0); + return -1; +} + +static int +cmd_write(struct conn *c, int fd, struct msg_header *hdr) { + if (!c->exec.cmd) { + log_warn("cmd_write: no exec in progress !"); + goto err; + } + if (c->async == 0) { + log_warn("cmd_write: not in async mode !"); + goto err; + } + + if (msg_read_data_to_fd(fd, c->exec.fd[1], hdr->datalen) < 0) { + goto err; + } + + return 0; + +err: + send_cmd(c->orig, MSG_ERR, 0, NULL, 0); + return -1; +} + +static int +cmd_data_conn(struct conn *c, int fd, struct msg_header *hdr) { + log_debug("received DATA from conn %c !", hdr->orig); + + if (msg_read_data_to_fd(fd, c->exec.fd[1], hdr->datalen) < 0) { + log_warn("cmd_data: recvwrite failed"); + return -1; + } + + return 0; +} + +static int +cmd_data_route(struct route *r, int fd, struct msg_header *hdr) { + log_debug("received DATA from route %c !", hdr->orig); + + if (msg_read_data_to_fd(fd, fileno(stdout), hdr->datalen) < 0) { + log_warn("cmd_data: recvwrite failed"); + return -1; + } + + return 0; +} + +static int +cmd_ok(struct route *r, int fd, struct msg_header *hdr) { + log_info("received OK from %c !", hdr->orig); + + return 0; +} + +static int +cmd_err(struct route *r, int fd, struct msg_header *hdr) { + log_info("received ERROR from %c !", hdr->orig); + + return 0; +} + |