From 5e67a4962e242e05776d731a7cb6f5cdd5cfd9da Mon Sep 17 00:00:00 2001 From: Laurent Ghigonis Date: Sun, 2 Dec 2012 05:54:31 +0100 Subject: work in progress for sendbuf support. now it compiles :) --- libglouglou/libglouglou.c | 206 ++++++++++++++++++++++++++++++---------------- libglouglou/libglouglou.h | 4 +- libglouglou/sendbuf.c | 95 ++++++++++++--------- libglouglou/sendbuf.h | 32 +++---- 4 files changed, 210 insertions(+), 127 deletions(-) diff --git a/libglouglou/libglouglou.c b/libglouglou/libglouglou.c index 85993ce..01355d8 100644 --- a/libglouglou/libglouglou.c +++ b/libglouglou/libglouglou.c @@ -36,8 +36,10 @@ int cb_usr_send(void *, int, void *); int client_send(struct gg_client *, void *, int); void cb_cli_receive(evutil_socket_t, short, void *); void cb_cli_timer(evutil_socket_t, short, void *); +int cb_cli_send(void *, int, void *); struct gg_packet *pkt_decode(char **, int *); -struct gg_packet *pkt_encode(struct gg_packet *, int *); +int pkt_getsize(struct gg_packet *); +int pkt_encode(struct gg_packet *, struct gg_packet *); int _verbosity = 0; @@ -93,12 +95,6 @@ gg_server_start(struct event_base *ev_base, char *ip, int port, event_add(ev, NULL); srv->ev = ev; - sbuf = sendbuf_init(srv->ev_base, PACKET_SNDBUF_MAX * 2, PACKET_SNDBUF_MAX, - 200); - if (!sbuf) - goto err; - srv->sbuf = sbuf; - return srv; err: @@ -111,26 +107,40 @@ int gg_server_send(struct gg_server *srv, struct gg_packet *pkt, struct gg_user *usr) { static struct gg_packet *newpkt; + static struct gg_packet pktbuf; + struct gg_user *u; int size; + int res = 0; + + /* optimisation to use sendbuf_gettoken */ + if (!usr && srv->user_count == 1) + usr = LIST_FIRST(&srv->user_list); + + if (usr) { + size = pkt_getsize(pkt); + newpkt = sendbuf_gettoken(usr->sbuf, size); + if (!newpkt) + return -1; + return pkt_encode(pkt, newpkt); + } else { + size = pkt_encode(pkt, &pktbuf); + LIST_FOREACH(u, &srv->user_list, entry) { + res = res + sendbuf_append(u->sbuf, &pktbuf, size); + } + } - // XXX IN PROGRESS get size before - size = pkt_getsize(pkt); - newpkt = sendbuf_token_get(srv->sndbuf, size); - pkt_encode(pkt, newpkt); - sendbuf_token_ready(srv->sndbuf, newpkt); - - if (!newpkt) - return -1; - return server_send(srv, usr, newpkt, size); + return res; } void gg_server_stop(struct gg_server *srv) { - if (srv->sock) { - server_send(srv, NULL, "", 0); + struct gg_user *usr; + + LIST_FOREACH(usr, &srv->user_list, entry) + user_del(srv, usr); + if (srv->sock) close(srv->sock); - } if (srv->ev) event_del(srv->ev); free(srv); @@ -208,22 +218,34 @@ user_add(struct gg_server *srv, struct sockaddr_in *remote) usr = xcalloc(1, sizeof(struct gg_user)); usr->id = srv->user_id_count; srv->user_id_count++; + srv->user_count++; usr->sock = srv->sock; addrcpy(&usr->addr, remote); - sendbuf_queue_add(srv->sbuf, cb_usr_send, usr); + sbuf = sendbuf_new(srv->ev_base, PACKET_SNDBUF_MAX, 200, cb_usr_send, usr); + if (!sbuf) + goto err; + usr->sbuf = sbuf; LIST_INSERT_HEAD(&srv->user_list, usr, entry); verbose("Add user %d !", usr->id); return usr; + +err: + user_del(srv, usr); + return NULL; } void user_del(struct gg_server *srv, struct gg_user *usr) { verbose("Del user %d !", usr->id); + if (usr->sbuf) + sendbuf_free(usr->sbuf); + user_send(usr, "", 0); LIST_REMOVE(usr, entry); + srv->user_count--; free(usr); } @@ -245,12 +267,13 @@ user_find(struct gg_server *srv, struct sockaddr_in *remote) int user_send(struct gg_user *usr, void *data, int size) { - if (sendto(usr->sock, data, size, 0, - (struct sockaddr *)&usr->addr, sizeof(struct sockaddr_in)) == -1) { + int sent; + + sent = sendto(usr->sock, data, size, 0, (struct sockaddr *)&usr->addr, + sizeof(struct sockaddr_in)); + if (sent == -1) error("failed: %s", strerror(errno)); - return -1; - } - return 0; + return sent; } int @@ -259,10 +282,9 @@ cb_usr_send(void *data, int size, void *usrdata) struct gg_user *usr; usr = usrdata; - user_send(usr, NULL, data, size); + return user_send(usr, data, size); } - /* * Client */ @@ -281,6 +303,7 @@ gg_client_connect(struct event_base *ev_base, char *ip, int port, { struct gg_client *cli; struct sockaddr_in sock_addr; + struct sendbuf *sbuf; struct event *ev; struct timeval tv; int s; @@ -318,6 +341,11 @@ gg_client_connect(struct event_base *ev_base, char *ip, int port, tv.tv_sec = 0; evtimer_add(ev, &tv); + sbuf = sendbuf_new(cli->ev_base, PACKET_SNDBUF_MAX, 200, cb_cli_send, cli); + if (!sbuf) + goto err; + cli->sbuf = sbuf; + return cli; err: @@ -332,15 +360,18 @@ gg_client_send(struct gg_client *cli, struct gg_packet *pkt) static struct gg_packet *newpkt; int size; - newpkt = pkt_encode(pkt, &size); + size = pkt_getsize(pkt); + newpkt = sendbuf_gettoken(cli->sbuf, size); if (!newpkt) return -1; - return client_send(cli, newpkt, size); + return pkt_encode(pkt, newpkt); } void gg_client_disconnect(struct gg_client *cli) { + if (cli->sbuf) + sendbuf_free(cli->sbuf); if (cli->sock) { client_send(cli, "", 0); close(cli->sock); @@ -359,12 +390,13 @@ gg_client_disconnect(struct gg_client *cli) int client_send(struct gg_client *cli, void *data, int size) { - if (sendto(cli->sock, data, size, 0, - (struct sockaddr *)&cli->addr, sizeof(struct sockaddr_in)) == -1) { + int sent; + + sent = sendto(cli->sock, data, size, 0, (struct sockaddr *)&cli->addr, + sizeof(struct sockaddr_in)); + if (sent == -1) error("failed: %s", strerror(errno)); - return -1; - } - return 0; + return sent; } void cb_cli_receive(evutil_socket_t fd, short what, void *arg) @@ -440,6 +472,15 @@ void cb_cli_timer(evutil_socket_t fd, short what, void *arg) event_add(cli->ev_timer, &tv); } +int +cb_cli_send(void *data, int size, void *usrdata) +{ + struct gg_client *cli; + + cli = usrdata; + return client_send(cli, data, size); +} + /* * Packets - private */ @@ -547,72 +588,99 @@ invalid: * encodes a packet and returns the size before sending it on the wire. * it assumes that the packet contains correct content. */ -struct gg_packet * -pkt_encode(struct gg_packet *pkt, int *len) +int +pkt_encode(struct gg_packet *pkt, struct gg_packet *newpkt) { - static struct gg_packet newpkt; - int packet_len; - if (pkt->type < PACKET_TYPE_MIN || pkt->type > PACKET_TYPE_MAX) invalid("type"); - packet_len = gg_packet_props[pkt->type].size; // XXX never overflow ? - newpkt.ver = pkt->ver; - newpkt.type = pkt->type; + newpkt->ver = pkt->ver; + newpkt->type = pkt->type; switch(pkt->type) { case PACKET_NEWCONN: - newpkt.newconn_id = htons(pkt->newconn_id); - newpkt.newconn_src = htonl(pkt->newconn_src); - newpkt.newconn_dst = htonl(pkt->newconn_dst); - newpkt.newconn_proto = pkt->newconn_proto; - newpkt.newconn_size = pkt->newconn_size; + newpkt->newconn_id = htons(pkt->newconn_id); + newpkt->newconn_src = htonl(pkt->newconn_src); + newpkt->newconn_dst = htonl(pkt->newconn_dst); + newpkt->newconn_proto = pkt->newconn_proto; + newpkt->newconn_size = pkt->newconn_size; break; case PACKET_DELCONN: - newpkt.delconn_id = htons(pkt->delconn_id); + newpkt->delconn_id = htons(pkt->delconn_id); break; case PACKET_DATA: - newpkt.data_connid = htons(pkt->data_connid); - newpkt.data_size = pkt->data_size; + newpkt->data_connid = htons(pkt->data_connid); + newpkt->data_size = pkt->data_size; break; case PACKET_NAME: if (pkt->name_len > GG_PKTARG_MAX) goto invalid; - packet_len = packet_len + pkt->name_len; - newpkt.name_addr = htonl(pkt->name_addr); - newpkt.name_len = pkt->name_len; - strncpy((char *)newpkt.name_fqdn, (char *)pkt->name_fqdn, + newpkt->name_addr = htonl(pkt->name_addr); + newpkt->name_len = pkt->name_len; + strncpy((char *)newpkt->name_fqdn, (char *)pkt->name_fqdn, pkt->name_len); break; case PACKET_FORK: - newpkt.fork_pid = htonl(pkt->fork_pid); - newpkt.fork_ppid = htonl(pkt->fork_ppid); - newpkt.fork_cpid = htonl(pkt->fork_cpid); - newpkt.fork_tgid = htonl(pkt->fork_tgid); + newpkt->fork_pid = htonl(pkt->fork_pid); + newpkt->fork_ppid = htonl(pkt->fork_ppid); + newpkt->fork_cpid = htonl(pkt->fork_cpid); + newpkt->fork_tgid = htonl(pkt->fork_tgid); break; case PACKET_EXEC: if (pkt->exec_cmdlen > GG_PKTARG_MAX) goto invalid; - packet_len = packet_len + pkt->exec_cmdlen; - newpkt.exec_pid = htonl(pkt->exec_pid); - newpkt.exec_cmdlen = pkt->exec_cmdlen; - strncpy((char *)newpkt.exec_cmd, (char *)pkt->exec_cmd, + newpkt->exec_pid = htonl(pkt->exec_pid); + newpkt->exec_cmdlen = pkt->exec_cmdlen; + strncpy((char *)newpkt->exec_cmd, (char *)pkt->exec_cmd, pkt->exec_cmdlen); break; case PACKET_EXIT: - newpkt.exit_pid = htonl(pkt->exit_pid); - newpkt.exit_tgid = htonl(pkt->exit_tgid); - newpkt.exit_ecode = pkt->exit_ecode; + newpkt->exit_pid = htonl(pkt->exit_pid); + newpkt->exit_tgid = htonl(pkt->exit_tgid); + newpkt->exit_ecode = pkt->exit_ecode; break; default: error("Unsupported packet type"); - return NULL; + return -1; } - *len = packet_len; - return &newpkt; + return pkt_getsize(newpkt); invalid: error("invalid packet"); - return NULL; + return -1; +} + +int +pkt_getsize(struct gg_packet *pkt) +{ + int packet_len; + + packet_len = gg_packet_props[pkt->type].size; // XXX never overflow ? + switch(pkt->type) { + case PACKET_NEWCONN: + case PACKET_DELCONN: + case PACKET_DATA: + case PACKET_FORK: + case PACKET_EXIT: + break; + case PACKET_NAME: + if (pkt->name_len > GG_PKTARG_MAX) + goto invalid; + packet_len = packet_len + pkt->name_len; + break; + case PACKET_EXEC: + if (pkt->exec_cmdlen > GG_PKTARG_MAX) + goto invalid; + packet_len = packet_len + pkt->exec_cmdlen; + break; + default: + error("Unsupported packet type"); + return -1; + } + return packet_len; + +invalid: + error("invalid packet"); + return -1; } /* diff --git a/libglouglou/libglouglou.h b/libglouglou/libglouglou.h index 4cd17f2..989bdff 100644 --- a/libglouglou/libglouglou.h +++ b/libglouglou/libglouglou.h @@ -131,7 +131,8 @@ struct gg_server { int (*handle_conn)(struct gg_server *, struct gg_user *); int (*handle_packet)(struct gg_server *, struct gg_user *, struct gg_packet *); void *usrdata; - LIST_HEAD(, gg_user) user_list; + LIST_HEAD(, gg_user) user_list; + int user_count; int user_id_count; }; @@ -152,6 +153,7 @@ struct gg_client { int (*handle_conn)(struct gg_client *); int (*handle_packet)(struct gg_client *, struct gg_packet *); void *usrdata; + struct sendbuf *sbuf; }; struct gg_server *gg_server_start(struct event_base *, char *, int, diff --git a/libglouglou/sendbuf.c b/libglouglou/sendbuf.c index 078d6a6..7571cc4 100644 --- a/libglouglou/sendbuf.c +++ b/libglouglou/sendbuf.c @@ -1,32 +1,33 @@ +#include +#include + #include "sendbuf.h" +static int flushbuf(struct sendbuf *); +static void cb_timer(evutil_socket_t, short, void *); + /* * Public */ /* * Create a sendbuf - * send_func should return the number of successfuly sent bytes */ struct sendbuf * -sendbuf_init(struct event_base *ev_base, int size, int sndbuf_max, int msec_max, - int (*send_func)(void *, int, void *), void *usrdata) +sendbuf_new(struct event_base *ev_base, int buffer_size, int msec_max, + int (*send_func)(void *, int, void *), void *usrdata) { struct sendbuf *sbuf = NULL; - struct event ev_timer; - struct timeval *tv; - void *buffer; - void *data; + struct event *ev_timer; sbuf = calloc(1, sizeof(struct sendbuf)); if (!sbuf) return NULL; sbuf->ev_base = ev_base; - sbuf->sndbuf_max = sndbuf_max; sbuf->msec_max = msec_max; - sbuf->usrdata = usrdata; + sbuf->buffer_size = buffer_size; sbuf->send_func = send_func; - sbuf->buffer_size = size; + sbuf->usrdata = usrdata; sbuf->buffer = malloc(sbuf->buffer_size); if (!sbuf->buffer) goto err; @@ -34,20 +35,20 @@ sendbuf_init(struct event_base *ev_base, int size, int sndbuf_max, int msec_max, ev_timer = evtimer_new(ev_base, cb_timer, sbuf); sbuf->ev_timer = ev_timer; sbuf->ev_timer_tv.tv_usec = msec_max * 1000; - evtimer_add(ev, &sbuf->ev_timer_tv); + evtimer_add(ev_timer, &sbuf->ev_timer_tv); return sbuf; err: - sendbuf_shutdown(sbuf); + sendbuf_free(sbuf); return NULL; } void -sendbuf_shutdown(struct sendbuf *sbuf) +sendbuf_free(struct sendbuf *sbuf) { - if (sbuf->send_timer) - event_del(sbuf->send_timer); + if (sbuf->ev_timer) + event_del(sbuf->ev_timer); if (sbuf->buffer && sbuf->send_func) flushbuf(sbuf); if (sbuf->buffer) @@ -56,17 +57,41 @@ sendbuf_shutdown(struct sendbuf *sbuf) } /* - * Returns a buffer to write data to be sent + * Append to the token buffer data to be sent + * uses a memcpy, in contrary to sendbuf_gettoken(). + * return size on success, -1 on error + */ +int +sendbuf_append(struct sendbuf *sbuf, void *token, int size) +{ + if (sbuf->buffer_pos + size >= sbuf->buffer_size) + if (flushbuf(sbuf) == -1) + return -1; + + memcpy(sbuf->buffer + sbuf->buffer_pos, token, size); + sbuf->buffer_pos = sbuf->buffer_pos + size; + + return size; +} + +/* + * Returns a token buffer to write data to be sent + * avoids memcpy, in contrary to sendbuf_append(). * might return NULL if the sendbuf is temporary full */ void * -sendbuf_getoken(struct sendbuf *sbuf, int size) +sendbuf_gettoken(struct sendbuf *sbuf, int size) { + void *token; + if (sbuf->buffer_pos + size >= sbuf->buffer_size) if (flushbuf(sbuf) == -1) return NULL; - return sbuf->buffer_pos; + token = sbuf->buffer + sbuf->buffer_pos; + sbuf->buffer_pos = sbuf->buffer_pos + size; + + return token; } /* @@ -77,36 +102,30 @@ sendbuf_getoken(struct sendbuf *sbuf, int size) * Note that you can still add data to the buffer even if flushing is in * progress */ -int +static int flushbuf(struct sendbuf *sbuf) { - int sent; - int sent_total; - int len; + int tosend, sent; sbuf->flushing = 1; - do { - pos = sbuf->buffer + sbuf->flushed_len; - if (sbuf->buffer_pos > sbuf->sndbuf_max) - tosend = sbuf->sndbuf_max; - else - tosend = sbuf->buffer_pos; - sent = sbuf->send_func(sbuf->buffer + pos, tosend, sbuf->usrdata); - sbuf->flushed_len = sbuf->flushed_len + sent; - if (sent == -1) { - // XXX handle erorr - } else if (sent < tosend) { - return -1; - } - } while (sbuf->flushed_len < sbuf->buffer_pos) + tosend = sbuf->buffer_size - sbuf->flushing_pos; + sent = sbuf->send_func(sbuf->buffer + sbuf->flushing_pos, + tosend, sbuf->usrdata); + if (sent == -1) { + // XXX handle error + return -1; + } else if (sent < tosend) { + sbuf->flushing_pos = sbuf->flushing_pos + sent; + return -1; + } sbuf->flushing = 0; - sbuf->flushed_len = 0; + sbuf->flushing_pos = 0; return 0; } -void +static void cb_timer(evutil_socket_t fd, short what, void *arg) { struct sendbuf *sbuf; diff --git a/libglouglou/sendbuf.h b/libglouglou/sendbuf.h index 340e1c7..0290202 100644 --- a/libglouglou/sendbuf.h +++ b/libglouglou/sendbuf.h @@ -3,27 +3,21 @@ struct sendbuf { struct event_base *ev_base; - struct event *ev_timer; - struct timeval ev_timer_tv; - int sndbuf_max; + struct event *ev_timer; + struct timeval ev_timer_tv; int msec_max; -}; - -struct sendbuf_queue { - void *buffer; int buffer_size; - int buffer_pos; + void *buffer; + int buffer_pos; /* next to use in buffer */ + int flushing; + int flushing_pos; /* next to send in buffer */ int (*send_func)(void *, int, void *); void *usrdata; - int flushing; - int flushed_len; -} - -struct sendbuf *sendbuf_init(struct event_base *, int, int, int); -void sendbuf_shutdown(struct sendbuf *); -void sendbuf_queue_add(struct sendbuf *, - int (*send_func)(void *, int, void *), - void *); -void *sendbuf_token_get(struct sendbuf *, int); -void sendbuf_token_ready(struct sendbuf *, void *); +}; +struct sendbuf *sendbuf_new(struct event_base *, int, int, + int (*send_func)(void *, int, void *), + void *); +void sendbuf_free(struct sendbuf *); +int sendbuf_append(struct sendbuf *, void *, int); +void *sendbuf_gettoken(struct sendbuf *, int); -- cgit v1.2.3-59-g8ed1b