output/log: Add flushing infrastructure

Issue: 3449

Add flushing functions and infrastructure. This includes:
- Flushing functions for packet loggers
- Log file flushing support
pull/12679/head
Jeff Lucovsky 10 months ago committed by Victor Julien
parent 04767f69fc
commit b18622554d

@ -454,7 +454,7 @@ void EveStreamLogRegister(void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = EveStreamLogger,
.FlushFunc = NULL,
.FlushFunc = OutputJsonLogFlush,
.ConditionFunc = EveStreamLogCondition,
.ThreadInitFunc = EveStreamLogThreadInit,
.ThreadDeinitFunc = EveStreamLogThreadDeinit,

@ -823,6 +823,14 @@ static int AlertJsonDecoderEvent(ThreadVars *tv, JsonAlertLogThread *aft, const
return TM_ECODE_OK;
}
static int JsonAlertFlush(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAlertLogThread *aft = thread_data;
SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename);
OutputJsonFlush(aft->ctx);
return 0;
}
static int JsonAlertLogger(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAlertLogThread *aft = thread_data;
@ -1067,7 +1075,7 @@ void JsonAlertLogRegister (void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonAlertLogger,
.FlushFunc = NULL,
.FlushFunc = JsonAlertFlush,
.ConditionFunc = JsonAlertLogCondition,
.ThreadInitFunc = JsonAlertLogThreadInit,
.ThreadDeinitFunc = JsonAlertLogThreadDeinit,

@ -272,6 +272,14 @@ static int AnomalyJson(ThreadVars *tv, JsonAnomalyLogThread *aft, const Packet *
return rc;
}
static int JsonAnomalyFlush(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAnomalyLogThread *aft = thread_data;
SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename);
OutputJsonFlush(aft->ctx);
return 0;
}
static int JsonAnomalyLogger(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAnomalyLogThread *aft = thread_data;
@ -451,7 +459,7 @@ void JsonAnomalyLogRegister (void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonAnomalyLogger,
.FlushFunc = NULL,
.FlushFunc = JsonAnomalyFlush,
.ConditionFunc = JsonAnomalyLogCondition,
.ThreadInitFunc = JsonAnomalyLogThreadInit,
.ThreadDeinitFunc = JsonAnomalyLogThreadDeinit,

@ -70,6 +70,15 @@ static void OutputJsonLogDeInitCtxSub(OutputCtx *output_ctx)
SCFree(output_ctx);
}
int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p)
{
OutputJsonThreadCtx *aft = thread_data;
LogFileCtx *file_ctx = aft->ctx->file_ctx;
SCLogDebug("%s flushing %s", tv->name, file_ctx->filename);
LogFileFlush(file_ctx);
return 0;
}
OutputInitResult OutputJsonLogInitSub(ConfNode *conf, OutputCtx *parent_ctx)
{
OutputInitResult result = { NULL, false };

@ -392,7 +392,7 @@ void JsonDropLogRegister (void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonDropLogger,
.FlushFunc = NULL,
.FlushFunc = OutputJsonLogFlush,
.ConditionFunc = JsonDropLogCondition,
.ThreadInitFunc = JsonDropLogThreadInit,
.ThreadDeinitFunc = JsonDropLogThreadDeinit,

@ -562,7 +562,7 @@ void JsonFrameLogRegister(void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonFrameLogger,
.FlushFunc = NULL,
.FlushFunc = OutputJsonLogFlush,
.ConditionFunc = JsonFrameLogCondition,
.ThreadInitFunc = JsonFrameLogThreadInit,
.ThreadDeinitFunc = JsonFrameLogThreadDeinit,

@ -96,7 +96,7 @@ void JsonMetadataLogRegister (void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonMetadataLogger,
.FlushFunc = NULL,
.FlushFunc = OutputJsonLogFlush,
.ConditionFunc = JsonMetadataLogCondition,
.ThreadInitFunc = JsonLogThreadInit,
.ThreadDeinitFunc = JsonLogThreadDeinit,

@ -956,6 +956,12 @@ int OutputJSONBuffer(json_t *js, LogFileCtx *file_ctx, MemBuffer **buffer)
return 0;
}
void OutputJsonFlush(OutputJsonThreadCtx *ctx)
{
LogFileCtx *file_ctx = ctx->file_ctx;
LogFileFlush(file_ctx);
}
void OutputJsonBuilderBuffer(
ThreadVars *tv, const Packet *p, Flow *f, JsonBuilder *js, OutputJsonThreadCtx *ctx)
{

@ -114,6 +114,7 @@ TmEcode JsonLogThreadDeinit(ThreadVars *t, void *data);
void EveAddCommonOptions(const OutputJsonCommonSettings *cfg, const Packet *p, const Flow *f,
JsonBuilder *js, enum OutputJsonLogDirection dir);
int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p);
void EveAddMetadata(const Packet *p, const Flow *f, JsonBuilder *js);
int OutputJSONMemBufferCallback(const char *str, size_t size, void *data);
@ -121,5 +122,6 @@ int OutputJSONMemBufferCallback(const char *str, size_t size, void *data);
OutputJsonThreadCtx *CreateEveThreadCtx(ThreadVars *t, OutputJsonCtx *ctx);
void FreeEveThreadCtx(OutputJsonThreadCtx *ctx);
void JSONFormatAndAddMACAddr(JsonBuilder *js, const char *key, const uint8_t *val, bool is_array);
void OutputJsonFlush(OutputJsonThreadCtx *ctx);
#endif /* SURICATA_OUTPUT_JSON_H */

@ -126,7 +126,7 @@ static int SCLogUnixSocketReconnect(LogFileCtx *log_ctx)
log_ctx->fp = SCLogOpenUnixSocketFp(log_ctx->filename, log_ctx->sock_type, 0);
if (log_ctx->fp) {
/* Connected at last (or reconnected) */
SCLogNotice("Reconnected socket \"%s\"", log_ctx->filename);
SCLogDebug("Reconnected socket \"%s\"", log_ctx->filename);
} else if (disconnected) {
SCLogWarning("Reconnect failed: %s (will keep trying)", strerror(errno));
}
@ -189,6 +189,22 @@ static inline void OutputWriteLock(pthread_mutex_t *m)
}
/**
* \brief Flush a log file.
*/
static void SCLogFileFlushNoLock(LogFileCtx *log_ctx)
{
log_ctx->bytes_since_last_flush = 0;
SCFflushUnlocked(log_ctx->fp);
}
static void SCLogFileFlush(LogFileCtx *log_ctx)
{
OutputWriteLock(&log_ctx->fp_mutex);
SCLogFileFlushNoLock(log_ctx);
SCMutexUnlock(&log_ctx->fp_mutex);
}
/**
* \brief Write buffer to log file.
* \retval 0 on failure; otherwise, the return value of fwrite_unlocked (number of
@ -224,8 +240,15 @@ static int SCLogFileWriteNoLock(const char *buffer, int buffer_len, LogFileCtx *
log_ctx->filename);
}
log_ctx->output_errors++;
} else if (log_ctx->buffer_size) {
SCFflushUnlocked(log_ctx->fp);
return ret;
}
log_ctx->bytes_since_last_flush += buffer_len;
if (log_ctx->buffer_size && log_ctx->bytes_since_last_flush >= log_ctx->buffer_size) {
SCLogDebug("%s: flushing %" PRIu64 " during write", log_ctx->filename,
log_ctx->bytes_since_last_flush);
SCLogFileFlushNoLock(log_ctx);
}
}
@ -248,35 +271,7 @@ static int SCLogFileWrite(const char *buffer, int buffer_len, LogFileCtx *log_ct
} else
#endif
{
/* Check for rotation. */
if (log_ctx->rotation_flag) {
log_ctx->rotation_flag = 0;
SCConfLogReopen(log_ctx);
}
if (log_ctx->flags & LOGFILE_ROTATE_INTERVAL) {
time_t now = time(NULL);
if (now >= log_ctx->rotate_time) {
SCConfLogReopen(log_ctx);
log_ctx->rotate_time = now + log_ctx->rotate_interval;
}
}
if (log_ctx->fp) {
clearerr(log_ctx->fp);
if (1 != fwrite(buffer, buffer_len, 1, log_ctx->fp)) {
/* Only the first error is logged */
if (!log_ctx->output_errors) {
SCLogError("%s error while writing to %s",
ferror(log_ctx->fp) ? strerror(errno) : "unknown error",
log_ctx->filename);
}
log_ctx->output_errors++;
} else {
fflush(log_ctx->fp);
}
}
ret = SCLogFileWriteNoLock(buffer, buffer_len, log_ctx);
}
SCMutexUnlock(&log_ctx->fp_mutex);
@ -709,6 +704,7 @@ LogFileCtx *LogFileNewCtx(void)
lf_ctx->Write = SCLogFileWrite;
lf_ctx->Close = SCLogFileClose;
lf_ctx->Flush = SCLogFileFlush;
return lf_ctx;
}
@ -977,6 +973,12 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
SCReturnInt(1);
}
void LogFileFlush(LogFileCtx *file_ctx)
{
SCLogDebug("%s: bytes-to-flush %ld", file_ctx->filename, file_ctx->bytes_since_last_flush);
file_ctx->Flush(file_ctx);
}
int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer)
{
if (file_ctx->type == LOGFILE_TYPE_FILE || file_ctx->type == LOGFILE_TYPE_UNIX_DGRAM ||

@ -86,6 +86,7 @@ typedef struct LogFileCtx_ {
int (*Write)(const char *buffer, int buffer_len, struct LogFileCtx_ *fp);
void (*Close)(struct LogFileCtx_ *fp);
void (*Flush)(struct LogFileCtx_ *fp);
LogFileTypeCtx filetype;
@ -159,6 +160,9 @@ typedef struct LogFileCtx_ {
uint64_t dropped;
uint64_t output_errors;
/* Track buffered content */
uint64_t bytes_since_last_flush;
} LogFileCtx;
/* Min time (msecs) before trying to reconnect a Unix domain socket */
@ -173,6 +177,7 @@ typedef struct LogFileCtx_ {
LogFileCtx *LogFileNewCtx(void);
int LogFileFreeCtx(LogFileCtx *);
int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer);
void LogFileFlush(LogFileCtx *file_ctx);
LogFileCtx *LogFileEnsureExists(ThreadId thread_id, LogFileCtx *lf_ctx);
int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);

Loading…
Cancel
Save