diff options
author | Laurent Ghigonis <laurent@p1sec.com> | 2013-09-13 17:01:07 +0200 |
---|---|---|
committer | Laurent Ghigonis <laurent@p1sec.com> | 2013-09-13 17:01:07 +0200 |
commit | 46f1f786d9b012b6e99b4f2d9d56f64defd77b90 (patch) | |
tree | 8ecad31cfe10c963caefa2ed499ae59abea0073a | |
parent | libsendbuf: guide to alternatives (libevent bufferevent / evbuffer) (diff) | |
download | glouglou-46f1f786d9b012b6e99b4f2d9d56f64defd77b90.tar.xz glouglou-46f1f786d9b012b6e99b4f2d9d56f64defd77b90.zip |
tcp server
-rw-r--r-- | v3/glougloud/glougloud.h | 3 | ||||
-rw-r--r-- | v3/glougloud/probes.c | 2 | ||||
-rw-r--r-- | v3/glougloud/redis.c | 9 | ||||
-rw-r--r-- | v3/glougloud/viz.c | 205 | ||||
-rw-r--r-- | v3/libglouglou/libglouglou.h | 1 | ||||
-rw-r--r-- | v3/libglouglou/utils.c | 35 |
6 files changed, 192 insertions, 63 deletions
diff --git a/v3/glougloud/glougloud.h b/v3/glougloud/glougloud.h index 3835fac..468c341 100644 --- a/v3/glougloud/glougloud.h +++ b/v3/glougloud/glougloud.h @@ -5,6 +5,8 @@ #include <hiredis/hiredis.h> #include <hiredis/async.h> +#define GLOUGLOUD_VERSION 3 + #define GLOUGLOUD_USER_PROBES "_ggdprobe" #define GLOUGLOUD_USER_VIZ "_ggdviz" #define GLOUGLOUD_LOGFILE "/var/log/glougloud.log" @@ -37,6 +39,7 @@ void redis_shutdown(void); redisAsyncContext *redis_connect(struct event_base *, void (*cb_connect)(const redisAsyncContext *, int), void (*cb_disconnect)(const redisAsyncContext *, int)); +void redis_disconnect(redisAsyncContext *); /* probes.c */ diff --git a/v3/glougloud/probes.c b/v3/glougloud/probes.c index e1e69c7..f282f39 100644 --- a/v3/glougloud/probes.c +++ b/v3/glougloud/probes.c @@ -22,7 +22,7 @@ cb_connect(const redisAsyncContext *c, int status) log_warn("redis error: %s", c->errstr); return; } - log_info("probes: redis connected\n"); + log_info("probes: redis connected"); } static void diff --git a/v3/glougloud/redis.c b/v3/glougloud/redis.c index 65351d3..12f531a 100644 --- a/v3/glougloud/redis.c +++ b/v3/glougloud/redis.c @@ -79,7 +79,7 @@ redis_connect(struct event_base *evb, do { rc = redisAsyncConnectUnix(_ggd->redis.socket_chrooted); if (rc->err) { - log_warn("redis connect: %s\n", rc->errstr); + log_warn("redis connect: %s", rc->errstr); sleep(1); } } while (rc->err); @@ -89,3 +89,10 @@ redis_connect(struct event_base *evb, return rc; } + +void +redis_disconnect(redisAsyncContext *rc) +{ + redisAsyncDisconnect(rc); + redisAsyncFree(rc); +} diff --git a/v3/glougloud/viz.c b/v3/glougloud/viz.c index 28a32ed..a46d253 100644 --- a/v3/glougloud/viz.c +++ b/v3/glougloud/viz.c @@ -1,44 +1,71 @@ -#include <unistd.h> - #include <libglouglou.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <event2/listener.h> +#include <event2/bufferevent.h> +#include <event2/buffer.h> + #include "glougloud.h" +#define MSG_WELCOME "Welcome to glougloud\n" + struct glougloud_viz { int pid; struct event_base *evb; struct modules *mods; redisAsyncContext *rc; struct { - struct event *ev; - } srv_tcp; + struct evconnlistener *listener; + LIST_HEAD(, servtcp_cli) clients; + int clients_count; + } servtcp; + uint clients_ids_counter; +}; + +struct servtcp_cli { + LIST_ENTRY(servtcp_cli) entry; + int id; + struct bufferevent *bev; + struct addr addr; }; struct glougloud *_ggd; struct glougloud_viz *_viz; +int +_modules_load(void) +{ + _viz->evb = event_base_new(); + _viz->mods = modules_load(GLOUGLOUD_MOD_PATH, NULL); + + return 0; +} + static void -cb_notification(redisAsyncContext *c, void *r, void *privdata) +_redis_cb_notification(redisAsyncContext *c, void *r, void *privdata) { redisReply *reply = r; if (!reply) return; /* XXX notify modules */ - log_debug("viz: redis cb_notification: %s\n", reply->str); + log_debug("viz: redis cb_notification: %s", reply->str); } static void -cb_connect(const redisAsyncContext *c, int status) +_redis_cb_connect(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { log_warn("redis error: %s", c->errstr); return; } - log_info("viz: redis connected\n"); + log_info("viz: redis connected"); } static void -cb_disconnect(const redisAsyncContext *c, int status) +_redis_cb_disconnect(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { log_warn("redis error: %s", c->errstr); @@ -47,10 +74,141 @@ cb_disconnect(const redisAsyncContext *c, int status) log_info("viz: redis disconnected"); } +int +_redis_connect(void) +{ + _viz->rc = redis_connect(_viz->evb, + _redis_cb_connect, _redis_cb_disconnect); + if (_viz->rc->err) + return -1; + redisAsyncCommand(_viz->rc, _redis_cb_notification, "event", + "SUBSCRIBE __keyevent@ggd__:*"); + + return 0; +} + +void +_redis_disconnect(void) +{ + redis_disconnect(_viz->rc); + _viz->rc = NULL; +} + + +struct servtcp_cli * +_servtcp_cli_add(struct sockaddr *sa, struct bufferevent *bev) +{ + struct servtcp_cli *cli; + + cli = xcalloc(1, sizeof(struct servtcp_cli)); + cli->id = _viz->clients_ids_counter; + _viz->clients_ids_counter++; + cli->bev = bev; + addr_ston(sa, &cli->addr); + LIST_INSERT_HEAD(&_viz->servtcp.clients, cli, entry); + + log_debug("viz: _servtcp_cli_add, cli %d %s", + cli->id, addr_ntoa(&cli->addr)); + bufferevent_write(bev, MSG_WELCOME, strlen(MSG_WELCOME)); + + return cli; +} + +void +_servtcp_cli_del(struct servtcp_cli *cli) +{ + bufferevent_free(cli->bev); + LIST_REMOVE(cli, entry); + _viz->servtcp.clients_count--; + free(cli); +} + static void -cb_srv_conn(evutil_socket_t listener, short event, void *arg) +_servtcp_cb_read(struct bufferevent *bev, void *user_data) { + struct servtcp_cli *cli; + struct evbuffer *buf; + char *line; + size_t len; + + cli = user_data; + buf = bufferevent_get_input(bev); + line = evbuffer_readln(buf, &len, EVBUFFER_EOL_CRLF); + log_debug("viz: _servtcp_cb_read, cli %d %s, len %d: %s", + cli->id, addr_ntoa(&cli->addr), len, line); +} + +static void +_servtcp_cb_event(struct bufferevent *bev, short events, void *user_data) +{ + struct servtcp_cli *cli; + + cli = user_data; + if (events & BEV_EVENT_EOF) { + log_debug("viz: cli %d: connection closed", cli->id); + } else if (events & BEV_EVENT_TIMEOUT) { + log_debug("viz: cli %d: connection timeout", cli->id); + } else if (events & BEV_EVENT_ERROR) { + log_debug("viz: cli %d: connection error: %s", cli->id, strerror(errno)); + } + _servtcp_cli_del(cli); +} + +static void +_servtcp_cb_listener(struct evconnlistener *listener, + evutil_socket_t fd, struct sockaddr *sa, int socklen, + void *user_data) +{ + struct servtcp_cli *cli; + struct bufferevent *bev; + + bev = bufferevent_socket_new(_viz->evb, fd, BEV_OPT_CLOSE_ON_FREE); + if (!bev) { + log_warn("viz: error constructing bufferevent !"); + return; + } + + cli = _servtcp_cli_add(sa, bev); + bufferevent_setcb(bev, _servtcp_cb_read, NULL, + _servtcp_cb_event, cli); + bufferevent_enable(bev, EV_WRITE|EV_READ); +} + +int +_servtcp_start(void) +{ + struct evconnlistener *listener; + struct sockaddr_in sin; + + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + addr_ntos(&_ggd->viz.serv_ip, (struct sockaddr *)&sin); + sin.sin_port = htons(_ggd->viz.serv_port); + + listener = evconnlistener_new_bind(_viz->evb, + _servtcp_cb_listener, NULL, + LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1, + (struct sockaddr*)&sin, sizeof(sin)); + + if (!listener) + return -1; + _viz->servtcp.listener = listener; + + return 0; +} + +void +_servtcp_stop(void) +{ + struct servtcp_cli *cli, *clitmp; + + if (_viz->servtcp.listener) + evconnlistener_free(_viz->servtcp.listener); + _viz->servtcp.listener = NULL; + + LIST_FOREACH_SAFE(cli, &_viz->servtcp.clients, entry, clitmp) + _servtcp_cli_del(cli); } int @@ -64,28 +222,25 @@ viz_init(struct glougloud *ggd) setprocname("viz"); droppriv(GLOUGLOUD_USER_VIZ, 1, NULL); - _viz->evb = event_base_new(); - _viz->mods = modules_load(GLOUGLOUD_MOD_PATH, NULL); - - _viz->rc = redis_connect(_viz->evb, cb_connect, cb_disconnect); - if (_viz->rc->err) - return -1; - redisAsyncCommand(_viz->rc, cb_notification, "event", - "SUBSCRIBE __keyevent@ggd__:*"); - - _viz->srv_tcp.ev = tcp_server_create(_viz->evb, &_ggd->viz.serv_ip, - _ggd->viz.serv_port, cb_srv_conn, NULL); - if (!_viz->srv_tcp.ev) - return -1; - - log_debug("XXX viz ok"); + if (_modules_load() < 0) + goto err; + if (_redis_connect() < 0) + goto err; + if (_servtcp_start() < 0) + goto err; event_base_dispatch(_viz->evb); return 0; + +err: + viz_shutdown(); + return -1; } void viz_shutdown(void) { + _redis_disconnect(); + _servtcp_stop(); } diff --git a/v3/libglouglou/libglouglou.h b/v3/libglouglou/libglouglou.h index f3a9d46..27a2464 100644 --- a/v3/libglouglou/libglouglou.h +++ b/v3/libglouglou/libglouglou.h @@ -129,7 +129,6 @@ void droppriv(char *, int, char *); struct modules *modules_load(char *, char *); int exec_pipe(char *, char **, char *, char **); void kill_wait(pid_t, int); -struct event *tcp_server_create(struct event_base *, struct addr *, int, event_callback_fn, void *); struct event *udp_server_create(struct event_base *, struct addr *, int, event_callback_fn, void *); void setprocname(const char *); diff --git a/v3/libglouglou/utils.c b/v3/libglouglou/utils.c index 3ab999d..a3581c2 100644 --- a/v3/libglouglou/utils.c +++ b/v3/libglouglou/utils.c @@ -191,41 +191,6 @@ kill_wait(pid_t pid, int seconds) { } struct event * -tcp_server_create(struct event_base *evb, struct addr *ip, int port, event_callback_fn cb_conn, void *cb_conn_data) -{ - int s = -1; - struct sockaddr_in sock_addr; - struct event *ev = NULL; - int sock_on = 1; - - s = socket(AF_INET, SOCK_STREAM, 0); - if (s < 0) - goto err; - setsockopt(s, SOL_SOCKET, SO_REUSEADDR, - &sock_on, sizeof(sock_on)); - fd_nonblock(s); - - bzero(&sock_addr, sizeof(sock_addr)); - addr_ntos(ip, (struct sockaddr *)&sock_addr); - sock_addr.sin_port = htons(port); - - if (bind(s, (struct sockaddr *)&sock_addr, - sizeof(sock_addr)) != 0) - goto err; - - ev = event_new(evb, s, EV_READ|EV_PERSIST, cb_conn, cb_conn_data); - event_add(ev, NULL); - return ev; - -err: - if (s != -1) - close(s); - if (ev) - event_del(ev); - return NULL; -} - -struct event * udp_server_create(struct event_base *evb, struct addr *ip, int port, event_callback_fn cb_srv, void *cb_srv_data) { int s = -1; |