util-logopenfile: implement redis pipelining

This patch implements redis pipelining. This consist in contacting
the redis server every N events to minimize the number of TCP
exchange. This is optional and setup via the configuration file.
pull/1712/head
Eric Leblond 10 years ago committed by Victor Julien
parent f953fdfbac
commit b834e2d19a

@ -362,6 +362,24 @@ int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx)
exit(EXIT_FAILURE);
}
log_ctx->redis_setup.batch_size = 0;
ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining");
if (pipelining) {
int enabled = 0;
int ret;
intmax_t val;
ret = ConfGetChildValueBool(pipelining, "enabled", &enabled);
if (ret && enabled) {
ret = ConfGetChildValueInt(pipelining, "batch-size", &val);
if (ret) {
log_ctx->redis_setup.batch_size = val;
} else {
log_ctx->redis_setup.batch_size = 10;
}
}
}
if (!strcmp(redis_mode, "list")) {
log_ctx->redis_setup.command = SCStrdup("LPUSH");
if (!log_ctx->redis_setup.command) {
@ -404,6 +422,10 @@ LogFileCtx *LogFileNewCtx(void)
lf_ctx->Write = SCLogFileWrite;
lf_ctx->Close = SCLogFileClose;
#ifdef HAVE_LIBHIREDIS
SC_ATOMIC_INIT(lf_ctx->redis_setup.batch_count);
#endif
return lf_ctx;
}
@ -459,24 +481,46 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s
}
#if HAVE_LIBHIREDIS
else if (file_ctx->type == LOGFILE_TYPE_REDIS) {
/* FIXME go async here */
redisReply *reply = redisCommand(file_ctx->redis, "%s %s %s",
file_ctx->redis_setup.command,
file_ctx->redis_setup.key,
string);
switch (reply->type) {
case REDIS_REPLY_ERROR:
SCLogWarning(SC_WARN_NO_UNITTESTS, "Redis error: %s", reply->str);
break;
case REDIS_REPLY_INTEGER:
SCLogDebug("Redis integer %lld", reply->integer);
break;
default:
SCLogError(SC_ERR_INVALID_VALUE,
"Redis default triggered with %d", reply->type);
break;
/* FIXME go async here ? */
if (file_ctx->redis_setup.batch_size) {
redisAppendCommand(file_ctx->redis, "%s %s %s",
file_ctx->redis_setup.command,
file_ctx->redis_setup.key,
string);
if (SC_ATOMIC_CAS(&file_ctx->redis_setup.batch_count, file_ctx->redis_setup.batch_size, 0)) {
redisReply *reply;
int i;
for(i = 0; i <= file_ctx->redis_setup.batch_size; i++) {
if (redisGetReply(file_ctx->redis, (void **)&reply) == REDIS_OK) {
freeReplyObject(reply);
} else {
/* FIXME treat error */
SCLogInfo("Error when fetching reply");
}
}
} else {
SC_ATOMIC_ADD(file_ctx->redis_setup.batch_count, 1);
}
} else {
redisReply *reply = redisCommand(file_ctx->redis, "%s %s %s",
file_ctx->redis_setup.command,
file_ctx->redis_setup.key,
string);
switch (reply->type) {
case REDIS_REPLY_ERROR:
SCLogWarning(SC_WARN_NO_UNITTESTS, "Redis error: %s", reply->str);
break;
case REDIS_REPLY_INTEGER:
SCLogDebug("Redis integer %lld", reply->integer);
break;
default:
SCLogError(SC_ERR_INVALID_VALUE,
"Redis default triggered with %d", reply->type);
break;
}
freeReplyObject(reply);
}
freeReplyObject(reply);
}
#endif
SCMutexUnlock(&file_ctx->fp_mutex);

@ -54,6 +54,8 @@ typedef struct RedisSetup_ {
char *command;
char *key;
char *sensor_name;
int batch_size;
SC_ATOMIC_DECLARE(int, batch_count);
} RedisSetup;
#endif

@ -109,6 +109,13 @@ outputs:
# port: 6379
# mode: list ## possible values: list (default), channel
# key: suricata ## key or channel to use (default to suricata)
# Redis pipelining set up. This will enable to only do a query every
# 'batch-size' events. This should lower the latency induced by network
# connection at the cost of some memory. There is no flushing implemented
# so this setting as to be reserved to high traffic suricata.
# pipelining:
# enabled: yes ## set enable to yes to enable query pipelining
# batch-size: 10 ## number of entry to keep in buffer
types:
- alert:
# payload: yes # enable dumping payload in Base64

Loading…
Cancel
Save