#include #include #include #include #include #include #include #include #include #include #include #include #include #include "libglouglou.h" #include "sendbuf.h" #define error(fmt, ...) \ if (_verbosity >= 0) \ printf("libgg: %s: ERROR: " fmt "\n", __func__, ##__VA_ARGS__) #define verbose(fmt, ...) \ if (_verbosity >= 1) \ printf("libgg: %s: " fmt "\n", __func__, ##__VA_ARGS__) #define debug(fmt, ...) \ if (_verbosity >= 2) \ printf("libgg: %s: XXX: " fmt "\n", __func__, ##__VA_ARGS__) void cb_srv_receive(evutil_socket_t, short, void *); struct gg_user *user_add(struct gg_server *, struct sockaddr_in *); void user_del(struct gg_server *, struct gg_user *); struct gg_user * user_find(struct gg_server *, struct sockaddr_in *); int user_send(struct gg_user *, void *, int); int cb_usr_send(void *, int, void *); int client_send(struct gg_client *, void *, int); void cb_cli_receive(evutil_socket_t, short, void *); void cb_cli_timer(evutil_socket_t, short, void *); int cb_cli_send(void *, int, void *); struct gg_packet *pkt_decode(char **, int *); int pkt_getsize(struct gg_packet *); int pkt_encode(struct gg_packet *, struct gg_packet *); int _verbosity = 0; gg_packet_props_t gg_packet_props[] = { [PACKET_NEWCONN] = \ { (PACKET_HEADER_SIZE + sizeof((struct gg_packet *)0)->pdat.newconn) }, [PACKET_DELCONN] = \ { (PACKET_HEADER_SIZE + sizeof((struct gg_packet *)0)->pdat.delconn) }, [PACKET_DATA] = \ { (PACKET_HEADER_SIZE + sizeof((struct gg_packet *)0)->pdat.data) }, [PACKET_NAME] = \ { ((PACKET_HEADER_SIZE + sizeof((struct gg_packet *)0)->pdat.name) - GG_PKTARG_MAX) }, [PACKET_FORK] = \ { (PACKET_HEADER_SIZE + sizeof((struct gg_packet *)0)->pdat.fork) }, [PACKET_EXEC] = \ { ((PACKET_HEADER_SIZE + sizeof((struct gg_packet *)0)->pdat.exec) - GG_PKTARG_MAX) }, [PACKET_EXIT] = \ { (PACKET_HEADER_SIZE + sizeof((struct gg_packet *)0)->pdat.exit) }, }; /* * Server */ /* * start a server * For security reasons, do not set handle_packet if you don't need it. * Also note that the packet passed by handle_packet is static, so you should * not modify it or free it. */ struct gg_server * gg_server_start(struct event_base *ev_base, char *ip, int port, int (*handle_conn)(struct gg_server *, struct gg_user *), int (*handle_packet)(struct gg_server *, struct gg_user *, struct gg_packet *), void *usrdata) { struct gg_server *srv; struct sockaddr_in sock_addr; struct event *ev; int s; int sock_on = 1; srv = xcalloc(1, sizeof(struct gg_server)); srv->ev_base = ev_base; srv->ip = ip; srv->port = port; srv->handle_conn = handle_conn; srv->handle_packet = handle_packet; srv->usrdata = usrdata; s = socket(AF_INET, SOCK_DGRAM, 0); if (s < 0) goto err; srv->sock = s; setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &sock_on, sizeof(sock_on)); fd_nonblock(s); bzero(&sock_addr, sizeof(sock_addr)); sock_addr.sin_family = AF_INET; sock_addr.sin_addr.s_addr = inet_addr(ip); sock_addr.sin_port = htons(port); addrcpy(&srv->addr, &sock_addr); if (bind(s, (struct sockaddr *)&sock_addr, sizeof(sock_addr)) != 0) goto err; ev = event_new(ev_base, s, EV_READ|EV_PERSIST, cb_srv_receive, srv); event_add(ev, NULL); srv->ev = ev; return srv; err: error("%s", strerror(errno)); gg_server_stop(srv); return NULL; } void gg_server_stop(struct gg_server *srv) { struct gg_user *usr; while ((usr = LIST_FIRST(&srv->user_list))) { user_del(srv, usr); } if (srv->sock) close(srv->sock); if (srv->ev) event_del(srv->ev); free(srv); } int gg_server_send(struct gg_server *srv, struct gg_packet *pkt, struct gg_user *usr) { static struct gg_packet *newpkt; static struct gg_packet pktbuf; struct gg_user *u; int size; int res = 0; /* optimisation to use sendbuf_gettoken */ if (!usr && srv->user_count == 1) usr = LIST_FIRST(&srv->user_list); if (usr) { size = pkt_getsize(pkt); newpkt = sendbuf_gettoken(usr->sbuf, size); if (!newpkt) return -1; return pkt_encode(pkt, newpkt); } else { size = pkt_encode(pkt, &pktbuf); LIST_FOREACH(u, &srv->user_list, entry) { res = res + sendbuf_append(u->sbuf, &pktbuf, size); } } return res; } void gg_server_send_flush(struct gg_server *srv, struct gg_user *usr) { struct gg_user *u; if (usr) sendbuf_flush(usr->sbuf); else LIST_FOREACH(u, &srv->user_list, entry) sendbuf_flush(u->sbuf); } /* * Server - private */ void cb_srv_receive(evutil_socket_t fd, short what, void *arg) { struct gg_server *srv; struct gg_user *usr; struct gg_packet *pkt; struct sockaddr_in remote; socklen_t remote_len; char buf[PACKET_BUFFER_SIZE]; char *buf_p; int buf_len; int len; srv = arg; remote_len = sizeof(struct sockaddr_in); len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)&remote, &remote_len); if (len < 0) { error("recvfrom failed"); return; } usr = user_find(srv, &remote); if (!usr) { usr = user_add(srv, &remote); if (srv->handle_conn) srv->handle_conn(srv, usr); user_send(usr, "", 0); } else { debug("Incoming data from existing user !"); if (len == 0) { user_del(srv, usr); return; } if (srv->handle_packet) { buf_p = buf; buf_len = len; while (buf_len > 0 && (pkt = pkt_decode(&buf_p, &buf_len))) srv->handle_packet(srv, usr, pkt); if (buf_len > 0) { /* XXX store incomplete packet for next recv */ error("incomplete packet, dropped %d bytes !", buf_len); } } } } struct gg_user * user_add(struct gg_server *srv, struct sockaddr_in *remote) { struct gg_user *usr; struct sendbuf *sbuf; usr = xcalloc(1, sizeof(struct gg_user)); usr->id = srv->user_id_count; srv->user_id_count++; srv->user_count++; usr->sock = srv->sock; addrcpy(&usr->addr, remote); sbuf = sendbuf_new(srv->ev_base, PACKET_SNDBUF_MAX, 200, cb_usr_send, usr); if (!sbuf) goto err; usr->sbuf = sbuf; LIST_INSERT_HEAD(&srv->user_list, usr, entry); verbose("Add user %d !", usr->id); return usr; err: user_del(srv, usr); return NULL; } void user_del(struct gg_server *srv, struct gg_user *usr) { verbose("Del user %d !", usr->id); if (usr->sbuf) sendbuf_free(usr->sbuf); user_send(usr, "", 0); LIST_REMOVE(usr, entry); srv->user_count--; free(usr); } struct gg_user * user_find(struct gg_server *srv, struct sockaddr_in *remote) { struct gg_user *usr; struct sockaddr_in *u; LIST_FOREACH(usr, &srv->user_list, entry) { u = &usr->addr; if (u->sin_addr.s_addr == remote->sin_addr.s_addr && u->sin_port == remote->sin_port) return usr; } return NULL; } int user_send(struct gg_user *usr, void *data, int size) { int sent; sent = sendto(usr->sock, data, size, 0, (struct sockaddr *)&usr->addr, sizeof(struct sockaddr_in)); if (sent == -1) error("failed: %s", strerror(errno)); return sent; } int cb_usr_send(void *data, int size, void *usrdata) { struct gg_user *usr; usr = usrdata; return user_send(usr, data, size); } /* * Client */ /* * connect to a server * For security reasons, do not set handle_packet if you don't need it. * Also note that the packet passed by handle_packet is static, so you should * not modify it or free it. */ struct gg_client * gg_client_connect(struct event_base *ev_base, char *ip, int port, int (*handle_conn)(struct gg_client *), int (*handle_packet)(struct gg_client *, struct gg_packet *), void *usrdata) { struct gg_client *cli; struct sockaddr_in sock_addr; struct sendbuf *sbuf; struct event *ev; struct timeval tv; int s; int sock_on = 1; cli = xcalloc(1, sizeof(struct gg_client)); cli->ev_base = ev_base; cli->ip = ip; cli->port = port; cli->handle_conn = handle_conn; cli->handle_packet = handle_packet; cli->usrdata = usrdata; cli->status = GG_CLIENT_STATUS_CONNECTING; s = socket(AF_INET, SOCK_DGRAM, 0); if (s < 0) goto err; cli->sock = s; setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &sock_on, sizeof(sock_on)); fd_nonblock(s); bzero(&sock_addr, sizeof(sock_addr)); sock_addr.sin_family = AF_INET; sock_addr.sin_addr.s_addr=inet_addr(ip); sock_addr.sin_port=htons(port); addrcpy(&cli->addr, &sock_addr); ev = event_new(ev_base, s, EV_READ|EV_PERSIST, cb_cli_receive, cli); event_add(ev, NULL); cli->ev = ev; ev = evtimer_new(ev_base, cb_cli_timer, cli); cli->ev_timer = ev; bzero(&tv, sizeof(struct timeval)); tv.tv_sec = 0; evtimer_add(ev, &tv); sbuf = sendbuf_new(cli->ev_base, PACKET_SNDBUF_MAX, 200, cb_cli_send, cli); if (!sbuf) goto err; cli->sbuf = sbuf; return cli; err: error("Error ! %s", strerror(errno)); gg_client_disconnect(cli); return NULL; } void gg_client_disconnect(struct gg_client *cli) { if (cli->sbuf) sendbuf_free(cli->sbuf); if (cli->sock) { client_send(cli, "", 0); close(cli->sock); } if (cli->ev) event_del(cli->ev); if (cli->ev_timer) event_del(cli->ev_timer); free(cli); } int gg_client_send(struct gg_client *cli, struct gg_packet *pkt) { static struct gg_packet *newpkt; int size; size = pkt_getsize(pkt); newpkt = sendbuf_gettoken(cli->sbuf, size); if (!newpkt) return -1; return pkt_encode(pkt, newpkt); } void gg_client_send_flush(struct gg_client *cli) { sendbuf_flush(cli->sbuf); } /* * Client - private */ int client_send(struct gg_client *cli, void *data, int size) { int sent; sent = sendto(cli->sock, data, size, 0, (struct sockaddr *)&cli->addr, sizeof(struct sockaddr_in)); if (sent == -1) error("failed: %s", strerror(errno)); return sent; } void cb_cli_receive(evutil_socket_t fd, short what, void *arg) { struct gg_client *cli; struct gg_packet *pkt; struct sockaddr_in remote; socklen_t remote_len; char buf[PACKET_BUFFER_SIZE]; char *buf_p; int buf_len; int len; cli = arg; remote_len = sizeof(struct sockaddr_in); len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)&remote, &remote_len); if (addrcmp(&cli->addr, &remote)) { error("receiving from stranger !"); return; } if (len < 0) { error("recvfrom failed"); return; } switch (cli->status) { case GG_CLIENT_STATUS_CONNECTING: verbose("Connected !"); cli->status = GG_CLIENT_STATUS_CONNECTED; if (cli->handle_conn) cli->handle_conn(cli); break; case GG_CLIENT_STATUS_CONNECTED: if (len == 0) { verbose("libglouglou: cb_cli_receive: recvfrom = 0"); cli->status = GG_CLIENT_STATUS_CONNECTING; return; } debug("Incoming data !"); if (cli->handle_packet) { buf_p = buf; buf_len = len; while (buf_len > 0 && (pkt = pkt_decode(&buf_p, &buf_len))) cli->handle_packet(cli, pkt); if (buf_len > 0) { /* XXX store incomplete packet for next recv */ error("incomplete packet, dropped %d bytes !", buf_len); } } break; } } void cb_cli_timer(evutil_socket_t fd, short what, void *arg) { struct timeval tv; struct gg_client *cli; cli = arg; switch (cli->status) { case GG_CLIENT_STATUS_CONNECTING: client_send(cli, "", 0); break; case GG_CLIENT_STATUS_CONNECTED: // XXX send keepalive break; } bzero(&tv, sizeof(struct timeval)); tv.tv_sec = 2; event_add(cli->ev_timer, &tv); } int cb_cli_send(void *data, int size, void *usrdata) { struct gg_client *cli; cli = usrdata; return client_send(cli, data, size); } /* * Packets - private */ struct gg_packet * pkt_decode(char **buf, int *buf_len) { static struct gg_packet newpkt; struct gg_packet *pkt; int len; int packet_len; len = *buf_len; #define invalid(msg) \ { verbose(msg); goto invalid; } #define incomplete(msg) \ { verbose(msg); goto incomplete; } if (len < PACKET_HEADER_SIZE) invalid("len"); if (len > PACKET_BUFFER_SIZE) invalid("len"); pkt = (struct gg_packet *)*buf; if (pkt->ver != PACKET_VERSION) invalid("ver"); if (pkt->type < PACKET_TYPE_MIN || pkt->type > PACKET_TYPE_MAX) invalid("type"); packet_len = gg_packet_props[pkt->type].size; // XXX never overflow ? debug("type %d: %d %d", pkt->type, len, packet_len); if (len < packet_len) invalid("type len"); newpkt.ver = pkt->ver; newpkt.type = pkt->type; switch(pkt->type) { case PACKET_NEWCONN: newpkt.newconn_id = ntohs(pkt->newconn_id); newpkt.newconn_src = ntohl(pkt->newconn_src); newpkt.newconn_dst = ntohl(pkt->newconn_dst); newpkt.newconn_proto = pkt->newconn_proto; newpkt.newconn_size = pkt->newconn_size; break; case PACKET_DELCONN: newpkt.delconn_id = ntohs(pkt->delconn_id); break; case PACKET_DATA: newpkt.data_connid = ntohs(pkt->data_connid); newpkt.data_size = pkt->data_size; break; case PACKET_NAME: newpkt.name_addr = ntohl(pkt->name_addr); newpkt.name_len = pkt->name_len; if (newpkt.name_len > GG_PKTARG_MAX) invalid("type name name_len"); if (len < packet_len + newpkt.name_len) goto incomplete; packet_len = packet_len + newpkt.name_len; strncpy((char *)newpkt.name_fqdn, (char *)pkt->name_fqdn, newpkt.name_len); break; case PACKET_FORK: newpkt.fork_pid = ntohl(pkt->fork_pid); newpkt.fork_ppid = ntohl(pkt->fork_ppid); newpkt.fork_cpid = ntohl(pkt->fork_cpid); newpkt.fork_tgid = ntohl(pkt->fork_tgid); break; case PACKET_EXEC: newpkt.exec_pid = ntohl(pkt->exec_pid); newpkt.exec_cmdlen = pkt->exec_cmdlen; if (newpkt.exec_cmdlen > GG_PKTARG_MAX) invalid("type exec cmdlen"); if (len < packet_len + newpkt.exec_cmdlen) goto incomplete; packet_len = packet_len + newpkt.exec_cmdlen; strncpy((char *)newpkt.exec_cmd, (char *)pkt->exec_cmd, newpkt.exec_cmdlen); break; case PACKET_EXIT: newpkt.exit_pid = ntohl(pkt->exit_pid); newpkt.exit_tgid = ntohl(pkt->exit_tgid); newpkt.exit_ecode = pkt->exit_ecode; break; default: goto invalid; invalid("type switch"); } *buf = *buf + packet_len; *buf_len = len - packet_len; return &newpkt; incomplete: error("incomplete packet"); *buf_len = len; return NULL; invalid: error("invalid packet"); *buf_len = 0; return NULL; } /* * encodes a packet and returns the size before sending it on the wire. * it assumes that the packet contains correct content. */ int pkt_encode(struct gg_packet *pkt, struct gg_packet *newpkt) { if (pkt->type < PACKET_TYPE_MIN || pkt->type > PACKET_TYPE_MAX) invalid("type"); newpkt->ver = pkt->ver; newpkt->type = pkt->type; switch(pkt->type) { case PACKET_NEWCONN: newpkt->newconn_id = htons(pkt->newconn_id); newpkt->newconn_src = htonl(pkt->newconn_src); newpkt->newconn_dst = htonl(pkt->newconn_dst); newpkt->newconn_proto = pkt->newconn_proto; newpkt->newconn_size = pkt->newconn_size; break; case PACKET_DELCONN: newpkt->delconn_id = htons(pkt->delconn_id); break; case PACKET_DATA: newpkt->data_connid = htons(pkt->data_connid); newpkt->data_size = pkt->data_size; break; case PACKET_NAME: if (pkt->name_len > GG_PKTARG_MAX) goto invalid; newpkt->name_addr = htonl(pkt->name_addr); newpkt->name_len = pkt->name_len; strncpy((char *)newpkt->name_fqdn, (char *)pkt->name_fqdn, pkt->name_len); break; case PACKET_FORK: newpkt->fork_pid = htonl(pkt->fork_pid); newpkt->fork_ppid = htonl(pkt->fork_ppid); newpkt->fork_cpid = htonl(pkt->fork_cpid); newpkt->fork_tgid = htonl(pkt->fork_tgid); break; case PACKET_EXEC: if (pkt->exec_cmdlen > GG_PKTARG_MAX) goto invalid; newpkt->exec_pid = htonl(pkt->exec_pid); newpkt->exec_cmdlen = pkt->exec_cmdlen; strncpy((char *)newpkt->exec_cmd, (char *)pkt->exec_cmd, pkt->exec_cmdlen); break; case PACKET_EXIT: newpkt->exit_pid = htonl(pkt->exit_pid); newpkt->exit_tgid = htonl(pkt->exit_tgid); newpkt->exit_ecode = pkt->exit_ecode; break; default: error("Unsupported packet type"); return -1; } return pkt_getsize(newpkt); invalid: error("invalid packet"); return -1; } int pkt_getsize(struct gg_packet *pkt) { int packet_len; packet_len = gg_packet_props[pkt->type].size; // XXX never overflow ? switch(pkt->type) { case PACKET_NEWCONN: case PACKET_DELCONN: case PACKET_DATA: case PACKET_FORK: case PACKET_EXIT: break; case PACKET_NAME: if (pkt->name_len > GG_PKTARG_MAX) goto invalid; packet_len = packet_len + pkt->name_len; break; case PACKET_EXEC: if (pkt->exec_cmdlen > GG_PKTARG_MAX) goto invalid; packet_len = packet_len + pkt->exec_cmdlen; break; default: error("Unsupported packet type"); return -1; } return packet_len; invalid: error("invalid packet"); return -1; } /* * Verbosity */ int gg_verbosity_get(void) { return _verbosity; } void gg_verbosity_set(int verb) { _verbosity = verb; }