aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLaurent Ghigonis <laurent@p1sec.com>2013-09-13 17:01:07 +0200
committerLaurent Ghigonis <laurent@p1sec.com>2013-09-13 17:01:07 +0200
commit46f1f786d9b012b6e99b4f2d9d56f64defd77b90 (patch)
tree8ecad31cfe10c963caefa2ed499ae59abea0073a
parentlibsendbuf: guide to alternatives (libevent bufferevent / evbuffer) (diff)
downloadglouglou-46f1f786d9b012b6e99b4f2d9d56f64defd77b90.tar.xz
glouglou-46f1f786d9b012b6e99b4f2d9d56f64defd77b90.zip
tcp server
-rw-r--r--v3/glougloud/glougloud.h3
-rw-r--r--v3/glougloud/probes.c2
-rw-r--r--v3/glougloud/redis.c9
-rw-r--r--v3/glougloud/viz.c205
-rw-r--r--v3/libglouglou/libglouglou.h1
-rw-r--r--v3/libglouglou/utils.c35
6 files changed, 192 insertions, 63 deletions
diff --git a/v3/glougloud/glougloud.h b/v3/glougloud/glougloud.h
index 3835fac..468c341 100644
--- a/v3/glougloud/glougloud.h
+++ b/v3/glougloud/glougloud.h
@@ -5,6 +5,8 @@
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
+#define GLOUGLOUD_VERSION 3
+
#define GLOUGLOUD_USER_PROBES "_ggdprobe"
#define GLOUGLOUD_USER_VIZ "_ggdviz"
#define GLOUGLOUD_LOGFILE "/var/log/glougloud.log"
@@ -37,6 +39,7 @@ void redis_shutdown(void);
redisAsyncContext *redis_connect(struct event_base *,
void (*cb_connect)(const redisAsyncContext *, int),
void (*cb_disconnect)(const redisAsyncContext *, int));
+void redis_disconnect(redisAsyncContext *);
/* probes.c */
diff --git a/v3/glougloud/probes.c b/v3/glougloud/probes.c
index e1e69c7..f282f39 100644
--- a/v3/glougloud/probes.c
+++ b/v3/glougloud/probes.c
@@ -22,7 +22,7 @@ cb_connect(const redisAsyncContext *c, int status)
log_warn("redis error: %s", c->errstr);
return;
}
- log_info("probes: redis connected\n");
+ log_info("probes: redis connected");
}
static void
diff --git a/v3/glougloud/redis.c b/v3/glougloud/redis.c
index 65351d3..12f531a 100644
--- a/v3/glougloud/redis.c
+++ b/v3/glougloud/redis.c
@@ -79,7 +79,7 @@ redis_connect(struct event_base *evb,
do {
rc = redisAsyncConnectUnix(_ggd->redis.socket_chrooted);
if (rc->err) {
- log_warn("redis connect: %s\n", rc->errstr);
+ log_warn("redis connect: %s", rc->errstr);
sleep(1);
}
} while (rc->err);
@@ -89,3 +89,10 @@ redis_connect(struct event_base *evb,
return rc;
}
+
+void
+redis_disconnect(redisAsyncContext *rc)
+{
+ redisAsyncDisconnect(rc);
+ redisAsyncFree(rc);
+}
diff --git a/v3/glougloud/viz.c b/v3/glougloud/viz.c
index 28a32ed..a46d253 100644
--- a/v3/glougloud/viz.c
+++ b/v3/glougloud/viz.c
@@ -1,44 +1,71 @@
-#include <unistd.h>
-
#include <libglouglou.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <event2/listener.h>
+#include <event2/bufferevent.h>
+#include <event2/buffer.h>
+
#include "glougloud.h"
+#define MSG_WELCOME "Welcome to glougloud\n"
+
struct glougloud_viz {
int pid;
struct event_base *evb;
struct modules *mods;
redisAsyncContext *rc;
struct {
- struct event *ev;
- } srv_tcp;
+ struct evconnlistener *listener;
+ LIST_HEAD(, servtcp_cli) clients;
+ int clients_count;
+ } servtcp;
+ uint clients_ids_counter;
+};
+
+struct servtcp_cli {
+ LIST_ENTRY(servtcp_cli) entry;
+ int id;
+ struct bufferevent *bev;
+ struct addr addr;
};
struct glougloud *_ggd;
struct glougloud_viz *_viz;
+int
+_modules_load(void)
+{
+ _viz->evb = event_base_new();
+ _viz->mods = modules_load(GLOUGLOUD_MOD_PATH, NULL);
+
+ return 0;
+}
+
static void
-cb_notification(redisAsyncContext *c, void *r, void *privdata)
+_redis_cb_notification(redisAsyncContext *c, void *r, void *privdata)
{
redisReply *reply = r;
if (!reply)
return;
/* XXX notify modules */
- log_debug("viz: redis cb_notification: %s\n", reply->str);
+ log_debug("viz: redis cb_notification: %s", reply->str);
}
static void
-cb_connect(const redisAsyncContext *c, int status)
+_redis_cb_connect(const redisAsyncContext *c, int status)
{
if (status != REDIS_OK) {
log_warn("redis error: %s", c->errstr);
return;
}
- log_info("viz: redis connected\n");
+ log_info("viz: redis connected");
}
static void
-cb_disconnect(const redisAsyncContext *c, int status)
+_redis_cb_disconnect(const redisAsyncContext *c, int status)
{
if (status != REDIS_OK) {
log_warn("redis error: %s", c->errstr);
@@ -47,10 +74,141 @@ cb_disconnect(const redisAsyncContext *c, int status)
log_info("viz: redis disconnected");
}
+int
+_redis_connect(void)
+{
+ _viz->rc = redis_connect(_viz->evb,
+ _redis_cb_connect, _redis_cb_disconnect);
+ if (_viz->rc->err)
+ return -1;
+ redisAsyncCommand(_viz->rc, _redis_cb_notification, "event",
+ "SUBSCRIBE __keyevent@ggd__:*");
+
+ return 0;
+}
+
+void
+_redis_disconnect(void)
+{
+ redis_disconnect(_viz->rc);
+ _viz->rc = NULL;
+}
+
+
+struct servtcp_cli *
+_servtcp_cli_add(struct sockaddr *sa, struct bufferevent *bev)
+{
+ struct servtcp_cli *cli;
+
+ cli = xcalloc(1, sizeof(struct servtcp_cli));
+ cli->id = _viz->clients_ids_counter;
+ _viz->clients_ids_counter++;
+ cli->bev = bev;
+ addr_ston(sa, &cli->addr);
+ LIST_INSERT_HEAD(&_viz->servtcp.clients, cli, entry);
+
+ log_debug("viz: _servtcp_cli_add, cli %d %s",
+ cli->id, addr_ntoa(&cli->addr));
+ bufferevent_write(bev, MSG_WELCOME, strlen(MSG_WELCOME));
+
+ return cli;
+}
+
+void
+_servtcp_cli_del(struct servtcp_cli *cli)
+{
+ bufferevent_free(cli->bev);
+ LIST_REMOVE(cli, entry);
+ _viz->servtcp.clients_count--;
+ free(cli);
+}
+
static void
-cb_srv_conn(evutil_socket_t listener, short event, void *arg)
+_servtcp_cb_read(struct bufferevent *bev, void *user_data)
{
+ struct servtcp_cli *cli;
+ struct evbuffer *buf;
+ char *line;
+ size_t len;
+
+ cli = user_data;
+ buf = bufferevent_get_input(bev);
+ line = evbuffer_readln(buf, &len, EVBUFFER_EOL_CRLF);
+ log_debug("viz: _servtcp_cb_read, cli %d %s, len %d: %s",
+ cli->id, addr_ntoa(&cli->addr), len, line);
+}
+
+static void
+_servtcp_cb_event(struct bufferevent *bev, short events, void *user_data)
+{
+ struct servtcp_cli *cli;
+
+ cli = user_data;
+ if (events & BEV_EVENT_EOF) {
+ log_debug("viz: cli %d: connection closed", cli->id);
+ } else if (events & BEV_EVENT_TIMEOUT) {
+ log_debug("viz: cli %d: connection timeout", cli->id);
+ } else if (events & BEV_EVENT_ERROR) {
+ log_debug("viz: cli %d: connection error: %s", cli->id, strerror(errno));
+ }
+ _servtcp_cli_del(cli);
+}
+
+static void
+_servtcp_cb_listener(struct evconnlistener *listener,
+ evutil_socket_t fd, struct sockaddr *sa, int socklen,
+ void *user_data)
+{
+ struct servtcp_cli *cli;
+ struct bufferevent *bev;
+
+ bev = bufferevent_socket_new(_viz->evb, fd, BEV_OPT_CLOSE_ON_FREE);
+ if (!bev) {
+ log_warn("viz: error constructing bufferevent !");
+ return;
+ }
+
+ cli = _servtcp_cli_add(sa, bev);
+ bufferevent_setcb(bev, _servtcp_cb_read, NULL,
+ _servtcp_cb_event, cli);
+ bufferevent_enable(bev, EV_WRITE|EV_READ);
+}
+
+int
+_servtcp_start(void)
+{
+ struct evconnlistener *listener;
+ struct sockaddr_in sin;
+
+ memset(&sin, 0, sizeof(sin));
+ sin.sin_family = AF_INET;
+ addr_ntos(&_ggd->viz.serv_ip, (struct sockaddr *)&sin);
+ sin.sin_port = htons(_ggd->viz.serv_port);
+
+ listener = evconnlistener_new_bind(_viz->evb,
+ _servtcp_cb_listener, NULL,
+ LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
+ (struct sockaddr*)&sin, sizeof(sin));
+
+ if (!listener)
+ return -1;
+ _viz->servtcp.listener = listener;
+
+ return 0;
+}
+
+void
+_servtcp_stop(void)
+{
+ struct servtcp_cli *cli, *clitmp;
+
+ if (_viz->servtcp.listener)
+ evconnlistener_free(_viz->servtcp.listener);
+ _viz->servtcp.listener = NULL;
+
+ LIST_FOREACH_SAFE(cli, &_viz->servtcp.clients, entry, clitmp)
+ _servtcp_cli_del(cli);
}
int
@@ -64,28 +222,25 @@ viz_init(struct glougloud *ggd)
setprocname("viz");
droppriv(GLOUGLOUD_USER_VIZ, 1, NULL);
- _viz->evb = event_base_new();
- _viz->mods = modules_load(GLOUGLOUD_MOD_PATH, NULL);
-
- _viz->rc = redis_connect(_viz->evb, cb_connect, cb_disconnect);
- if (_viz->rc->err)
- return -1;
- redisAsyncCommand(_viz->rc, cb_notification, "event",
- "SUBSCRIBE __keyevent@ggd__:*");
-
- _viz->srv_tcp.ev = tcp_server_create(_viz->evb, &_ggd->viz.serv_ip,
- _ggd->viz.serv_port, cb_srv_conn, NULL);
- if (!_viz->srv_tcp.ev)
- return -1;
-
- log_debug("XXX viz ok");
+ if (_modules_load() < 0)
+ goto err;
+ if (_redis_connect() < 0)
+ goto err;
+ if (_servtcp_start() < 0)
+ goto err;
event_base_dispatch(_viz->evb);
return 0;
+
+err:
+ viz_shutdown();
+ return -1;
}
void
viz_shutdown(void) {
+ _redis_disconnect();
+ _servtcp_stop();
}
diff --git a/v3/libglouglou/libglouglou.h b/v3/libglouglou/libglouglou.h
index f3a9d46..27a2464 100644
--- a/v3/libglouglou/libglouglou.h
+++ b/v3/libglouglou/libglouglou.h
@@ -129,7 +129,6 @@ void droppriv(char *, int, char *);
struct modules *modules_load(char *, char *);
int exec_pipe(char *, char **, char *, char **);
void kill_wait(pid_t, int);
-struct event *tcp_server_create(struct event_base *, struct addr *, int, event_callback_fn, void *);
struct event *udp_server_create(struct event_base *, struct addr *, int, event_callback_fn, void *);
void setprocname(const char *);
diff --git a/v3/libglouglou/utils.c b/v3/libglouglou/utils.c
index 3ab999d..a3581c2 100644
--- a/v3/libglouglou/utils.c
+++ b/v3/libglouglou/utils.c
@@ -191,41 +191,6 @@ kill_wait(pid_t pid, int seconds) {
}
struct event *
-tcp_server_create(struct event_base *evb, struct addr *ip, int port, event_callback_fn cb_conn, void *cb_conn_data)
-{
- int s = -1;
- struct sockaddr_in sock_addr;
- struct event *ev = NULL;
- int sock_on = 1;
-
- s = socket(AF_INET, SOCK_STREAM, 0);
- if (s < 0)
- goto err;
- setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
- &sock_on, sizeof(sock_on));
- fd_nonblock(s);
-
- bzero(&sock_addr, sizeof(sock_addr));
- addr_ntos(ip, (struct sockaddr *)&sock_addr);
- sock_addr.sin_port = htons(port);
-
- if (bind(s, (struct sockaddr *)&sock_addr,
- sizeof(sock_addr)) != 0)
- goto err;
-
- ev = event_new(evb, s, EV_READ|EV_PERSIST, cb_conn, cb_conn_data);
- event_add(ev, NULL);
- return ev;
-
-err:
- if (s != -1)
- close(s);
- if (ev)
- event_del(ev);
- return NULL;
-}
-
-struct event *
udp_server_create(struct event_base *evb, struct addr *ip, int port, event_callback_fn cb_srv, void *cb_srv_data)
{
int s = -1;