util-logopenfile: reconnect handling

This patch implements reconnection handling for the redis output.
A reconnect limitation has been implemented with a limitation of
one connection per second.
pull/1712/head
Eric Leblond 11 years ago committed by Victor Julien
parent b834e2d19a
commit 594f62b523

@ -331,7 +331,16 @@ int SCConfLogReopen(LogFileCtx *log_ctx)
}
#if HAVE_LIBHIREDIS
#ifdef HAVE_LIBHIREDIS
static void SCLogFileCloseRedis(LogFileCtx *log_ctx)
{
if (log_ctx->redis)
redisFree(log_ctx->redis);
log_ctx->redis_setup.tried = 0;
SC_ATOMIC_SET(log_ctx->redis_setup.batch_count, 0);
}
int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx)
{
const char *redis_server = NULL;
@ -398,10 +407,50 @@ int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx)
SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s", c->errstr);
exit(EXIT_FAILURE);
}
/* store server params for reconnection */
log_ctx->redis_setup.server = SCStrdup(redis_server);
if (!log_ctx->redis_setup.server) {
SCLogError(SC_ERR_MEM_ALLOC, "Error allocating redis server string");
exit(EXIT_FAILURE);
}
log_ctx->redis_setup.port = atoi(redis_port);
log_ctx->redis_setup.tried = 0;
log_ctx->redis = c;
log_ctx->Close = SCLogFileCloseRedis;
return 0;
}
int SCConfLogReopenRedis(LogFileCtx *log_ctx)
{
if (log_ctx->redis != NULL) {
redisFree(log_ctx->redis);
log_ctx->redis = NULL;
}
/* only try to reconnect once per second */
if (log_ctx->redis_setup.tried >= time(NULL)) {
return -1;
}
redisContext *c = redisConnect(log_ctx->redis_setup.server, log_ctx->redis_setup.port);
if (c != NULL && c->err) {
if (log_ctx->redis_setup.tried == 0) {
SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s\n", c->errstr);
}
redisFree(c);
log_ctx->redis_setup.tried = time(NULL);
return -1;
}
log_ctx->redis = c;
log_ctx->redis_setup.tried = 0;
SC_ATOMIC_SET(log_ctx->redis_setup.batch_count, 0);
return 0;
}
#endif
/** \brief LogFileNewCtx() Get a new LogFileCtx
@ -448,6 +497,9 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
#ifdef HAVE_LIBHIREDIS
if (lf_ctx->type == LOGFILE_TYPE_REDIS && lf_ctx->redis) {
redisFree(lf_ctx->redis);
SCFree(lf_ctx->redis_setup.server);
SCFree(lf_ctx->redis_setup.command);
SCFree(lf_ctx->redis_setup.key);
}
#endif
@ -481,6 +533,16 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s
}
#if HAVE_LIBHIREDIS
else if (file_ctx->type == LOGFILE_TYPE_REDIS) {
if (file_ctx->redis == NULL) {
/* FIXME temporisation */
SCConfLogReopenRedis(file_ctx);
if (file_ctx->redis == NULL) {
SCMutexUnlock(&file_ctx->fp_mutex);
return -1;
} else {
SCLogInfo("Reconnected to redis server");
}
}
/* FIXME go async here ? */
if (file_ctx->redis_setup.batch_size) {
redisAppendCommand(file_ctx->redis, "%s %s %s",
@ -495,7 +557,30 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s
freeReplyObject(reply);
} else {
/* FIXME treat error */
SCLogInfo("Error when fetching reply");
if (file_ctx->redis->err) {
SCLogInfo("Error when fetching reply: %s (%d)",
file_ctx->redis->errstr,
file_ctx->redis->err);
}
switch (file_ctx->redis->err) {
case REDIS_ERR_EOF:
case REDIS_ERR_IO:
SCLogInfo("Reopening connection to redis server");
SCConfLogReopenRedis(file_ctx);
if (file_ctx->redis) {
SCLogInfo("Reconnected to redis server");
SCMutexUnlock(&file_ctx->fp_mutex);
return 0;
} else {
SCLogInfo("Unable to reconnect to redis server");
SCMutexUnlock(&file_ctx->fp_mutex);
return 0;
}
break;
default:
SCLogInfo("Unsupported error code %d",
file_ctx->redis->err);
}
}
}
} else {
@ -510,6 +595,7 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s
switch (reply->type) {
case REDIS_REPLY_ERROR:
SCLogWarning(SC_WARN_NO_UNITTESTS, "Redis error: %s", reply->str);
SCConfLogReopenRedis(file_ctx);
break;
case REDIS_REPLY_INTEGER:
SCLogDebug("Redis integer %lld", reply->integer);
@ -517,6 +603,7 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s
default:
SCLogError(SC_ERR_INVALID_VALUE,
"Redis default triggered with %d", reply->type);
SCConfLogReopenRedis(file_ctx);
break;
}
freeReplyObject(reply);

@ -56,6 +56,9 @@ typedef struct RedisSetup_ {
char *sensor_name;
int batch_size;
SC_ATOMIC_DECLARE(int, batch_count);
char *server;
int port;
time_t tried;
} RedisSetup;
#endif

Loading…
Cancel
Save