aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLaurent Ghigonis <laurent@p1sec.com>2012-12-02 05:54:31 +0100
committerLaurent Ghigonis <laurent@p1sec.com>2012-12-02 05:54:31 +0100
commit5e67a4962e242e05776d731a7cb6f5cdd5cfd9da (patch)
treefcd38b9334320b977eca2cfbda5c050f6a87050a
parentwork in progress on sendbuf (diff)
downloadglouglou-5e67a4962e242e05776d731a7cb6f5cdd5cfd9da.tar.xz
glouglou-5e67a4962e242e05776d731a7cb6f5cdd5cfd9da.zip
work in progress for sendbuf support.
now it compiles :)
-rw-r--r--libglouglou/libglouglou.c206
-rw-r--r--libglouglou/libglouglou.h4
-rw-r--r--libglouglou/sendbuf.c95
-rw-r--r--libglouglou/sendbuf.h32
4 files changed, 210 insertions, 127 deletions
diff --git a/libglouglou/libglouglou.c b/libglouglou/libglouglou.c
index 85993ce..01355d8 100644
--- a/libglouglou/libglouglou.c
+++ b/libglouglou/libglouglou.c
@@ -36,8 +36,10 @@ int cb_usr_send(void *, int, void *);
int client_send(struct gg_client *, void *, int);
void cb_cli_receive(evutil_socket_t, short, void *);
void cb_cli_timer(evutil_socket_t, short, void *);
+int cb_cli_send(void *, int, void *);
struct gg_packet *pkt_decode(char **, int *);
-struct gg_packet *pkt_encode(struct gg_packet *, int *);
+int pkt_getsize(struct gg_packet *);
+int pkt_encode(struct gg_packet *, struct gg_packet *);
int _verbosity = 0;
@@ -93,12 +95,6 @@ gg_server_start(struct event_base *ev_base, char *ip, int port,
event_add(ev, NULL);
srv->ev = ev;
- sbuf = sendbuf_init(srv->ev_base, PACKET_SNDBUF_MAX * 2, PACKET_SNDBUF_MAX,
- 200);
- if (!sbuf)
- goto err;
- srv->sbuf = sbuf;
-
return srv;
err:
@@ -111,26 +107,40 @@ int
gg_server_send(struct gg_server *srv, struct gg_packet *pkt, struct gg_user *usr)
{
static struct gg_packet *newpkt;
+ static struct gg_packet pktbuf;
+ struct gg_user *u;
int size;
+ int res = 0;
+
+ /* optimisation to use sendbuf_gettoken */
+ if (!usr && srv->user_count == 1)
+ usr = LIST_FIRST(&srv->user_list);
+
+ if (usr) {
+ size = pkt_getsize(pkt);
+ newpkt = sendbuf_gettoken(usr->sbuf, size);
+ if (!newpkt)
+ return -1;
+ return pkt_encode(pkt, newpkt);
+ } else {
+ size = pkt_encode(pkt, &pktbuf);
+ LIST_FOREACH(u, &srv->user_list, entry) {
+ res = res + sendbuf_append(u->sbuf, &pktbuf, size);
+ }
+ }
- // XXX IN PROGRESS get size before
- size = pkt_getsize(pkt);
- newpkt = sendbuf_token_get(srv->sndbuf, size);
- pkt_encode(pkt, newpkt);
- sendbuf_token_ready(srv->sndbuf, newpkt);
-
- if (!newpkt)
- return -1;
- return server_send(srv, usr, newpkt, size);
+ return res;
}
void
gg_server_stop(struct gg_server *srv)
{
- if (srv->sock) {
- server_send(srv, NULL, "", 0);
+ struct gg_user *usr;
+
+ LIST_FOREACH(usr, &srv->user_list, entry)
+ user_del(srv, usr);
+ if (srv->sock)
close(srv->sock);
- }
if (srv->ev)
event_del(srv->ev);
free(srv);
@@ -208,22 +218,34 @@ user_add(struct gg_server *srv, struct sockaddr_in *remote)
usr = xcalloc(1, sizeof(struct gg_user));
usr->id = srv->user_id_count;
srv->user_id_count++;
+ srv->user_count++;
usr->sock = srv->sock;
addrcpy(&usr->addr, remote);
- sendbuf_queue_add(srv->sbuf, cb_usr_send, usr);
+ sbuf = sendbuf_new(srv->ev_base, PACKET_SNDBUF_MAX, 200, cb_usr_send, usr);
+ if (!sbuf)
+ goto err;
+ usr->sbuf = sbuf;
LIST_INSERT_HEAD(&srv->user_list, usr, entry);
verbose("Add user %d !", usr->id);
return usr;
+
+err:
+ user_del(srv, usr);
+ return NULL;
}
void
user_del(struct gg_server *srv, struct gg_user *usr)
{
verbose("Del user %d !", usr->id);
+ if (usr->sbuf)
+ sendbuf_free(usr->sbuf);
+ user_send(usr, "", 0);
LIST_REMOVE(usr, entry);
+ srv->user_count--;
free(usr);
}
@@ -245,12 +267,13 @@ user_find(struct gg_server *srv, struct sockaddr_in *remote)
int
user_send(struct gg_user *usr, void *data, int size)
{
- if (sendto(usr->sock, data, size, 0,
- (struct sockaddr *)&usr->addr, sizeof(struct sockaddr_in)) == -1) {
+ int sent;
+
+ sent = sendto(usr->sock, data, size, 0, (struct sockaddr *)&usr->addr,
+ sizeof(struct sockaddr_in));
+ if (sent == -1)
error("failed: %s", strerror(errno));
- return -1;
- }
- return 0;
+ return sent;
}
int
@@ -259,10 +282,9 @@ cb_usr_send(void *data, int size, void *usrdata)
struct gg_user *usr;
usr = usrdata;
- user_send(usr, NULL, data, size);
+ return user_send(usr, data, size);
}
-
/*
* Client
*/
@@ -281,6 +303,7 @@ gg_client_connect(struct event_base *ev_base, char *ip, int port,
{
struct gg_client *cli;
struct sockaddr_in sock_addr;
+ struct sendbuf *sbuf;
struct event *ev;
struct timeval tv;
int s;
@@ -318,6 +341,11 @@ gg_client_connect(struct event_base *ev_base, char *ip, int port,
tv.tv_sec = 0;
evtimer_add(ev, &tv);
+ sbuf = sendbuf_new(cli->ev_base, PACKET_SNDBUF_MAX, 200, cb_cli_send, cli);
+ if (!sbuf)
+ goto err;
+ cli->sbuf = sbuf;
+
return cli;
err:
@@ -332,15 +360,18 @@ gg_client_send(struct gg_client *cli, struct gg_packet *pkt)
static struct gg_packet *newpkt;
int size;
- newpkt = pkt_encode(pkt, &size);
+ size = pkt_getsize(pkt);
+ newpkt = sendbuf_gettoken(cli->sbuf, size);
if (!newpkt)
return -1;
- return client_send(cli, newpkt, size);
+ return pkt_encode(pkt, newpkt);
}
void
gg_client_disconnect(struct gg_client *cli)
{
+ if (cli->sbuf)
+ sendbuf_free(cli->sbuf);
if (cli->sock) {
client_send(cli, "", 0);
close(cli->sock);
@@ -359,12 +390,13 @@ gg_client_disconnect(struct gg_client *cli)
int
client_send(struct gg_client *cli, void *data, int size)
{
- if (sendto(cli->sock, data, size, 0,
- (struct sockaddr *)&cli->addr, sizeof(struct sockaddr_in)) == -1) {
+ int sent;
+
+ sent = sendto(cli->sock, data, size, 0, (struct sockaddr *)&cli->addr,
+ sizeof(struct sockaddr_in));
+ if (sent == -1)
error("failed: %s", strerror(errno));
- return -1;
- }
- return 0;
+ return sent;
}
void cb_cli_receive(evutil_socket_t fd, short what, void *arg)
@@ -440,6 +472,15 @@ void cb_cli_timer(evutil_socket_t fd, short what, void *arg)
event_add(cli->ev_timer, &tv);
}
+int
+cb_cli_send(void *data, int size, void *usrdata)
+{
+ struct gg_client *cli;
+
+ cli = usrdata;
+ return client_send(cli, data, size);
+}
+
/*
* Packets - private
*/
@@ -547,72 +588,99 @@ invalid:
* encodes a packet and returns the size before sending it on the wire.
* it assumes that the packet contains correct content.
*/
-struct gg_packet *
-pkt_encode(struct gg_packet *pkt, int *len)
+int
+pkt_encode(struct gg_packet *pkt, struct gg_packet *newpkt)
{
- static struct gg_packet newpkt;
- int packet_len;
-
if (pkt->type < PACKET_TYPE_MIN || pkt->type > PACKET_TYPE_MAX)
invalid("type");
- packet_len = gg_packet_props[pkt->type].size; // XXX never overflow ?
- newpkt.ver = pkt->ver;
- newpkt.type = pkt->type;
+ newpkt->ver = pkt->ver;
+ newpkt->type = pkt->type;
switch(pkt->type) {
case PACKET_NEWCONN:
- newpkt.newconn_id = htons(pkt->newconn_id);
- newpkt.newconn_src = htonl(pkt->newconn_src);
- newpkt.newconn_dst = htonl(pkt->newconn_dst);
- newpkt.newconn_proto = pkt->newconn_proto;
- newpkt.newconn_size = pkt->newconn_size;
+ newpkt->newconn_id = htons(pkt->newconn_id);
+ newpkt->newconn_src = htonl(pkt->newconn_src);
+ newpkt->newconn_dst = htonl(pkt->newconn_dst);
+ newpkt->newconn_proto = pkt->newconn_proto;
+ newpkt->newconn_size = pkt->newconn_size;
break;
case PACKET_DELCONN:
- newpkt.delconn_id = htons(pkt->delconn_id);
+ newpkt->delconn_id = htons(pkt->delconn_id);
break;
case PACKET_DATA:
- newpkt.data_connid = htons(pkt->data_connid);
- newpkt.data_size = pkt->data_size;
+ newpkt->data_connid = htons(pkt->data_connid);
+ newpkt->data_size = pkt->data_size;
break;
case PACKET_NAME:
if (pkt->name_len > GG_PKTARG_MAX)
goto invalid;
- packet_len = packet_len + pkt->name_len;
- newpkt.name_addr = htonl(pkt->name_addr);
- newpkt.name_len = pkt->name_len;
- strncpy((char *)newpkt.name_fqdn, (char *)pkt->name_fqdn,
+ newpkt->name_addr = htonl(pkt->name_addr);
+ newpkt->name_len = pkt->name_len;
+ strncpy((char *)newpkt->name_fqdn, (char *)pkt->name_fqdn,
pkt->name_len);
break;
case PACKET_FORK:
- newpkt.fork_pid = htonl(pkt->fork_pid);
- newpkt.fork_ppid = htonl(pkt->fork_ppid);
- newpkt.fork_cpid = htonl(pkt->fork_cpid);
- newpkt.fork_tgid = htonl(pkt->fork_tgid);
+ newpkt->fork_pid = htonl(pkt->fork_pid);
+ newpkt->fork_ppid = htonl(pkt->fork_ppid);
+ newpkt->fork_cpid = htonl(pkt->fork_cpid);
+ newpkt->fork_tgid = htonl(pkt->fork_tgid);
break;
case PACKET_EXEC:
if (pkt->exec_cmdlen > GG_PKTARG_MAX)
goto invalid;
- packet_len = packet_len + pkt->exec_cmdlen;
- newpkt.exec_pid = htonl(pkt->exec_pid);
- newpkt.exec_cmdlen = pkt->exec_cmdlen;
- strncpy((char *)newpkt.exec_cmd, (char *)pkt->exec_cmd,
+ newpkt->exec_pid = htonl(pkt->exec_pid);
+ newpkt->exec_cmdlen = pkt->exec_cmdlen;
+ strncpy((char *)newpkt->exec_cmd, (char *)pkt->exec_cmd,
pkt->exec_cmdlen);
break;
case PACKET_EXIT:
- newpkt.exit_pid = htonl(pkt->exit_pid);
- newpkt.exit_tgid = htonl(pkt->exit_tgid);
- newpkt.exit_ecode = pkt->exit_ecode;
+ newpkt->exit_pid = htonl(pkt->exit_pid);
+ newpkt->exit_tgid = htonl(pkt->exit_tgid);
+ newpkt->exit_ecode = pkt->exit_ecode;
break;
default:
error("Unsupported packet type");
- return NULL;
+ return -1;
}
- *len = packet_len;
- return &newpkt;
+ return pkt_getsize(newpkt);
invalid:
error("invalid packet");
- return NULL;
+ return -1;
+}
+
+int
+pkt_getsize(struct gg_packet *pkt)
+{
+ int packet_len;
+
+ packet_len = gg_packet_props[pkt->type].size; // XXX never overflow ?
+ switch(pkt->type) {
+ case PACKET_NEWCONN:
+ case PACKET_DELCONN:
+ case PACKET_DATA:
+ case PACKET_FORK:
+ case PACKET_EXIT:
+ break;
+ case PACKET_NAME:
+ if (pkt->name_len > GG_PKTARG_MAX)
+ goto invalid;
+ packet_len = packet_len + pkt->name_len;
+ break;
+ case PACKET_EXEC:
+ if (pkt->exec_cmdlen > GG_PKTARG_MAX)
+ goto invalid;
+ packet_len = packet_len + pkt->exec_cmdlen;
+ break;
+ default:
+ error("Unsupported packet type");
+ return -1;
+ }
+ return packet_len;
+
+invalid:
+ error("invalid packet");
+ return -1;
}
/*
diff --git a/libglouglou/libglouglou.h b/libglouglou/libglouglou.h
index 4cd17f2..989bdff 100644
--- a/libglouglou/libglouglou.h
+++ b/libglouglou/libglouglou.h
@@ -131,7 +131,8 @@ struct gg_server {
int (*handle_conn)(struct gg_server *, struct gg_user *);
int (*handle_packet)(struct gg_server *, struct gg_user *, struct gg_packet *);
void *usrdata;
- LIST_HEAD(, gg_user) user_list;
+ LIST_HEAD(, gg_user) user_list;
+ int user_count;
int user_id_count;
};
@@ -152,6 +153,7 @@ struct gg_client {
int (*handle_conn)(struct gg_client *);
int (*handle_packet)(struct gg_client *, struct gg_packet *);
void *usrdata;
+ struct sendbuf *sbuf;
};
struct gg_server *gg_server_start(struct event_base *, char *, int,
diff --git a/libglouglou/sendbuf.c b/libglouglou/sendbuf.c
index 078d6a6..7571cc4 100644
--- a/libglouglou/sendbuf.c
+++ b/libglouglou/sendbuf.c
@@ -1,32 +1,33 @@
+#include <stdlib.h>
+#include <string.h>
+
#include "sendbuf.h"
+static int flushbuf(struct sendbuf *);
+static void cb_timer(evutil_socket_t, short, void *);
+
/*
* Public
*/
/*
* Create a sendbuf
- * send_func should return the number of successfuly sent bytes
*/
struct sendbuf *
-sendbuf_init(struct event_base *ev_base, int size, int sndbuf_max, int msec_max,
- int (*send_func)(void *, int, void *), void *usrdata)
+sendbuf_new(struct event_base *ev_base, int buffer_size, int msec_max,
+ int (*send_func)(void *, int, void *), void *usrdata)
{
struct sendbuf *sbuf = NULL;
- struct event ev_timer;
- struct timeval *tv;
- void *buffer;
- void *data;
+ struct event *ev_timer;
sbuf = calloc(1, sizeof(struct sendbuf));
if (!sbuf)
return NULL;
sbuf->ev_base = ev_base;
- sbuf->sndbuf_max = sndbuf_max;
sbuf->msec_max = msec_max;
- sbuf->usrdata = usrdata;
+ sbuf->buffer_size = buffer_size;
sbuf->send_func = send_func;
- sbuf->buffer_size = size;
+ sbuf->usrdata = usrdata;
sbuf->buffer = malloc(sbuf->buffer_size);
if (!sbuf->buffer)
goto err;
@@ -34,20 +35,20 @@ sendbuf_init(struct event_base *ev_base, int size, int sndbuf_max, int msec_max,
ev_timer = evtimer_new(ev_base, cb_timer, sbuf);
sbuf->ev_timer = ev_timer;
sbuf->ev_timer_tv.tv_usec = msec_max * 1000;
- evtimer_add(ev, &sbuf->ev_timer_tv);
+ evtimer_add(ev_timer, &sbuf->ev_timer_tv);
return sbuf;
err:
- sendbuf_shutdown(sbuf);
+ sendbuf_free(sbuf);
return NULL;
}
void
-sendbuf_shutdown(struct sendbuf *sbuf)
+sendbuf_free(struct sendbuf *sbuf)
{
- if (sbuf->send_timer)
- event_del(sbuf->send_timer);
+ if (sbuf->ev_timer)
+ event_del(sbuf->ev_timer);
if (sbuf->buffer && sbuf->send_func)
flushbuf(sbuf);
if (sbuf->buffer)
@@ -56,17 +57,41 @@ sendbuf_shutdown(struct sendbuf *sbuf)
}
/*
- * Returns a buffer to write data to be sent
+ * Append to the token buffer data to be sent
+ * uses a memcpy, in contrary to sendbuf_gettoken().
+ * return size on success, -1 on error
+ */
+int
+sendbuf_append(struct sendbuf *sbuf, void *token, int size)
+{
+ if (sbuf->buffer_pos + size >= sbuf->buffer_size)
+ if (flushbuf(sbuf) == -1)
+ return -1;
+
+ memcpy(sbuf->buffer + sbuf->buffer_pos, token, size);
+ sbuf->buffer_pos = sbuf->buffer_pos + size;
+
+ return size;
+}
+
+/*
+ * Returns a token buffer to write data to be sent
+ * avoids memcpy, in contrary to sendbuf_append().
* might return NULL if the sendbuf is temporary full
*/
void *
-sendbuf_getoken(struct sendbuf *sbuf, int size)
+sendbuf_gettoken(struct sendbuf *sbuf, int size)
{
+ void *token;
+
if (sbuf->buffer_pos + size >= sbuf->buffer_size)
if (flushbuf(sbuf) == -1)
return NULL;
- return sbuf->buffer_pos;
+ token = sbuf->buffer + sbuf->buffer_pos;
+ sbuf->buffer_pos = sbuf->buffer_pos + size;
+
+ return token;
}
/*
@@ -77,36 +102,30 @@ sendbuf_getoken(struct sendbuf *sbuf, int size)
* Note that you can still add data to the buffer even if flushing is in
* progress
*/
-int
+static int
flushbuf(struct sendbuf *sbuf)
{
- int sent;
- int sent_total;
- int len;
+ int tosend, sent;
sbuf->flushing = 1;
- do {
- pos = sbuf->buffer + sbuf->flushed_len;
- if (sbuf->buffer_pos > sbuf->sndbuf_max)
- tosend = sbuf->sndbuf_max;
- else
- tosend = sbuf->buffer_pos;
- sent = sbuf->send_func(sbuf->buffer + pos, tosend, sbuf->usrdata);
- sbuf->flushed_len = sbuf->flushed_len + sent;
- if (sent == -1) {
- // XXX handle erorr
- } else if (sent < tosend) {
- return -1;
- }
- } while (sbuf->flushed_len < sbuf->buffer_pos)
+ tosend = sbuf->buffer_size - sbuf->flushing_pos;
+ sent = sbuf->send_func(sbuf->buffer + sbuf->flushing_pos,
+ tosend, sbuf->usrdata);
+ if (sent == -1) {
+ // XXX handle error
+ return -1;
+ } else if (sent < tosend) {
+ sbuf->flushing_pos = sbuf->flushing_pos + sent;
+ return -1;
+ }
sbuf->flushing = 0;
- sbuf->flushed_len = 0;
+ sbuf->flushing_pos = 0;
return 0;
}
-void
+static void
cb_timer(evutil_socket_t fd, short what, void *arg)
{
struct sendbuf *sbuf;
diff --git a/libglouglou/sendbuf.h b/libglouglou/sendbuf.h
index 340e1c7..0290202 100644
--- a/libglouglou/sendbuf.h
+++ b/libglouglou/sendbuf.h
@@ -3,27 +3,21 @@
struct sendbuf {
struct event_base *ev_base;
- struct event *ev_timer;
- struct timeval ev_timer_tv;
- int sndbuf_max;
+ struct event *ev_timer;
+ struct timeval ev_timer_tv;
int msec_max;
-};
-
-struct sendbuf_queue {
- void *buffer;
int buffer_size;
- int buffer_pos;
+ void *buffer;
+ int buffer_pos; /* next to use in buffer */
+ int flushing;
+ int flushing_pos; /* next to send in buffer */
int (*send_func)(void *, int, void *);
void *usrdata;
- int flushing;
- int flushed_len;
-}
-
-struct sendbuf *sendbuf_init(struct event_base *, int, int, int);
-void sendbuf_shutdown(struct sendbuf *);
-void sendbuf_queue_add(struct sendbuf *,
- int (*send_func)(void *, int, void *),
- void *);
-void *sendbuf_token_get(struct sendbuf *, int);
-void sendbuf_token_ready(struct sendbuf *, void *);
+};
+struct sendbuf *sendbuf_new(struct event_base *, int, int,
+ int (*send_func)(void *, int, void *),
+ void *);
+void sendbuf_free(struct sendbuf *);
+int sendbuf_append(struct sendbuf *, void *, int);
+void *sendbuf_gettoken(struct sendbuf *, int);