diff --git a/src/output-eve-stream.c b/src/output-eve-stream.c index 15d262b435..3fcd12ea41 100644 --- a/src/output-eve-stream.c +++ b/src/output-eve-stream.c @@ -454,7 +454,7 @@ void EveStreamLogRegister(void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = EveStreamLogger, - .FlushFunc = NULL, + .FlushFunc = OutputJsonLogFlush, .ConditionFunc = EveStreamLogCondition, .ThreadInitFunc = EveStreamLogThreadInit, .ThreadDeinitFunc = EveStreamLogThreadDeinit, diff --git a/src/output-json-alert.c b/src/output-json-alert.c index 419a1d2b42..ed6066be08 100644 --- a/src/output-json-alert.c +++ b/src/output-json-alert.c @@ -823,6 +823,14 @@ static int AlertJsonDecoderEvent(ThreadVars *tv, JsonAlertLogThread *aft, const return TM_ECODE_OK; } +static int JsonAlertFlush(ThreadVars *tv, void *thread_data, const Packet *p) +{ + JsonAlertLogThread *aft = thread_data; + SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename); + OutputJsonFlush(aft->ctx); + return 0; +} + static int JsonAlertLogger(ThreadVars *tv, void *thread_data, const Packet *p) { JsonAlertLogThread *aft = thread_data; @@ -1067,7 +1075,7 @@ void JsonAlertLogRegister (void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonAlertLogger, - .FlushFunc = NULL, + .FlushFunc = JsonAlertFlush, .ConditionFunc = JsonAlertLogCondition, .ThreadInitFunc = JsonAlertLogThreadInit, .ThreadDeinitFunc = JsonAlertLogThreadDeinit, diff --git a/src/output-json-anomaly.c b/src/output-json-anomaly.c index cd9e5dc068..00c4cbd570 100644 --- a/src/output-json-anomaly.c +++ b/src/output-json-anomaly.c @@ -272,6 +272,14 @@ static int AnomalyJson(ThreadVars *tv, JsonAnomalyLogThread *aft, const Packet * return rc; } +static int JsonAnomalyFlush(ThreadVars *tv, void *thread_data, const Packet *p) +{ + JsonAnomalyLogThread *aft = thread_data; + SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename); + OutputJsonFlush(aft->ctx); + return 0; +} + static int JsonAnomalyLogger(ThreadVars *tv, void *thread_data, const Packet *p) { JsonAnomalyLogThread *aft = thread_data; @@ -451,7 +459,7 @@ void JsonAnomalyLogRegister (void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonAnomalyLogger, - .FlushFunc = NULL, + .FlushFunc = JsonAnomalyFlush, .ConditionFunc = JsonAnomalyLogCondition, .ThreadInitFunc = JsonAnomalyLogThreadInit, .ThreadDeinitFunc = JsonAnomalyLogThreadDeinit, diff --git a/src/output-json-common.c b/src/output-json-common.c index 5723e05a38..1ec08b6905 100644 --- a/src/output-json-common.c +++ b/src/output-json-common.c @@ -70,6 +70,15 @@ static void OutputJsonLogDeInitCtxSub(OutputCtx *output_ctx) SCFree(output_ctx); } +int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p) +{ + OutputJsonThreadCtx *aft = thread_data; + LogFileCtx *file_ctx = aft->ctx->file_ctx; + SCLogDebug("%s flushing %s", tv->name, file_ctx->filename); + LogFileFlush(file_ctx); + return 0; +} + OutputInitResult OutputJsonLogInitSub(ConfNode *conf, OutputCtx *parent_ctx) { OutputInitResult result = { NULL, false }; diff --git a/src/output-json-drop.c b/src/output-json-drop.c index 29ead13e07..79e663c437 100644 --- a/src/output-json-drop.c +++ b/src/output-json-drop.c @@ -392,7 +392,7 @@ void JsonDropLogRegister (void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonDropLogger, - .FlushFunc = NULL, + .FlushFunc = OutputJsonLogFlush, .ConditionFunc = JsonDropLogCondition, .ThreadInitFunc = JsonDropLogThreadInit, .ThreadDeinitFunc = JsonDropLogThreadDeinit, diff --git a/src/output-json-frame.c b/src/output-json-frame.c index dfd895b8ab..3ae80b820f 100644 --- a/src/output-json-frame.c +++ b/src/output-json-frame.c @@ -562,7 +562,7 @@ void JsonFrameLogRegister(void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonFrameLogger, - .FlushFunc = NULL, + .FlushFunc = OutputJsonLogFlush, .ConditionFunc = JsonFrameLogCondition, .ThreadInitFunc = JsonFrameLogThreadInit, .ThreadDeinitFunc = JsonFrameLogThreadDeinit, diff --git a/src/output-json-metadata.c b/src/output-json-metadata.c index a87d735839..c930eaf177 100644 --- a/src/output-json-metadata.c +++ b/src/output-json-metadata.c @@ -96,7 +96,7 @@ void JsonMetadataLogRegister (void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonMetadataLogger, - .FlushFunc = NULL, + .FlushFunc = OutputJsonLogFlush, .ConditionFunc = JsonMetadataLogCondition, .ThreadInitFunc = JsonLogThreadInit, .ThreadDeinitFunc = JsonLogThreadDeinit, diff --git a/src/output-json.c b/src/output-json.c index 37e0a87b6f..422191adbc 100644 --- a/src/output-json.c +++ b/src/output-json.c @@ -956,6 +956,12 @@ int OutputJSONBuffer(json_t *js, LogFileCtx *file_ctx, MemBuffer **buffer) return 0; } +void OutputJsonFlush(OutputJsonThreadCtx *ctx) +{ + LogFileCtx *file_ctx = ctx->file_ctx; + LogFileFlush(file_ctx); +} + void OutputJsonBuilderBuffer( ThreadVars *tv, const Packet *p, Flow *f, JsonBuilder *js, OutputJsonThreadCtx *ctx) { diff --git a/src/output-json.h b/src/output-json.h index b8c11778ed..82989a1156 100644 --- a/src/output-json.h +++ b/src/output-json.h @@ -114,6 +114,7 @@ TmEcode JsonLogThreadDeinit(ThreadVars *t, void *data); void EveAddCommonOptions(const OutputJsonCommonSettings *cfg, const Packet *p, const Flow *f, JsonBuilder *js, enum OutputJsonLogDirection dir); +int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p); void EveAddMetadata(const Packet *p, const Flow *f, JsonBuilder *js); int OutputJSONMemBufferCallback(const char *str, size_t size, void *data); @@ -121,5 +122,6 @@ int OutputJSONMemBufferCallback(const char *str, size_t size, void *data); OutputJsonThreadCtx *CreateEveThreadCtx(ThreadVars *t, OutputJsonCtx *ctx); void FreeEveThreadCtx(OutputJsonThreadCtx *ctx); void JSONFormatAndAddMACAddr(JsonBuilder *js, const char *key, const uint8_t *val, bool is_array); +void OutputJsonFlush(OutputJsonThreadCtx *ctx); #endif /* SURICATA_OUTPUT_JSON_H */ diff --git a/src/util-logopenfile.c b/src/util-logopenfile.c index 7d4459ebc9..c27dcf8bab 100644 --- a/src/util-logopenfile.c +++ b/src/util-logopenfile.c @@ -126,7 +126,7 @@ static int SCLogUnixSocketReconnect(LogFileCtx *log_ctx) log_ctx->fp = SCLogOpenUnixSocketFp(log_ctx->filename, log_ctx->sock_type, 0); if (log_ctx->fp) { /* Connected at last (or reconnected) */ - SCLogNotice("Reconnected socket \"%s\"", log_ctx->filename); + SCLogDebug("Reconnected socket \"%s\"", log_ctx->filename); } else if (disconnected) { SCLogWarning("Reconnect failed: %s (will keep trying)", strerror(errno)); } @@ -189,6 +189,22 @@ static inline void OutputWriteLock(pthread_mutex_t *m) } +/** + * \brief Flush a log file. + */ +static void SCLogFileFlushNoLock(LogFileCtx *log_ctx) +{ + log_ctx->bytes_since_last_flush = 0; + SCFflushUnlocked(log_ctx->fp); +} + +static void SCLogFileFlush(LogFileCtx *log_ctx) +{ + OutputWriteLock(&log_ctx->fp_mutex); + SCLogFileFlushNoLock(log_ctx); + SCMutexUnlock(&log_ctx->fp_mutex); +} + /** * \brief Write buffer to log file. * \retval 0 on failure; otherwise, the return value of fwrite_unlocked (number of @@ -224,8 +240,15 @@ static int SCLogFileWriteNoLock(const char *buffer, int buffer_len, LogFileCtx * log_ctx->filename); } log_ctx->output_errors++; - } else if (log_ctx->buffer_size) { - SCFflushUnlocked(log_ctx->fp); + return ret; + } + + log_ctx->bytes_since_last_flush += buffer_len; + + if (log_ctx->buffer_size && log_ctx->bytes_since_last_flush >= log_ctx->buffer_size) { + SCLogDebug("%s: flushing %" PRIu64 " during write", log_ctx->filename, + log_ctx->bytes_since_last_flush); + SCLogFileFlushNoLock(log_ctx); } } @@ -248,35 +271,7 @@ static int SCLogFileWrite(const char *buffer, int buffer_len, LogFileCtx *log_ct } else #endif { - - /* Check for rotation. */ - if (log_ctx->rotation_flag) { - log_ctx->rotation_flag = 0; - SCConfLogReopen(log_ctx); - } - - if (log_ctx->flags & LOGFILE_ROTATE_INTERVAL) { - time_t now = time(NULL); - if (now >= log_ctx->rotate_time) { - SCConfLogReopen(log_ctx); - log_ctx->rotate_time = now + log_ctx->rotate_interval; - } - } - - if (log_ctx->fp) { - clearerr(log_ctx->fp); - if (1 != fwrite(buffer, buffer_len, 1, log_ctx->fp)) { - /* Only the first error is logged */ - if (!log_ctx->output_errors) { - SCLogError("%s error while writing to %s", - ferror(log_ctx->fp) ? strerror(errno) : "unknown error", - log_ctx->filename); - } - log_ctx->output_errors++; - } else { - fflush(log_ctx->fp); - } - } + ret = SCLogFileWriteNoLock(buffer, buffer_len, log_ctx); } SCMutexUnlock(&log_ctx->fp_mutex); @@ -709,6 +704,7 @@ LogFileCtx *LogFileNewCtx(void) lf_ctx->Write = SCLogFileWrite; lf_ctx->Close = SCLogFileClose; + lf_ctx->Flush = SCLogFileFlush; return lf_ctx; } @@ -977,6 +973,12 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx) SCReturnInt(1); } +void LogFileFlush(LogFileCtx *file_ctx) +{ + SCLogDebug("%s: bytes-to-flush %ld", file_ctx->filename, file_ctx->bytes_since_last_flush); + file_ctx->Flush(file_ctx); +} + int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer) { if (file_ctx->type == LOGFILE_TYPE_FILE || file_ctx->type == LOGFILE_TYPE_UNIX_DGRAM || diff --git a/src/util-logopenfile.h b/src/util-logopenfile.h index efa8159686..19c9e5e1c7 100644 --- a/src/util-logopenfile.h +++ b/src/util-logopenfile.h @@ -86,6 +86,7 @@ typedef struct LogFileCtx_ { int (*Write)(const char *buffer, int buffer_len, struct LogFileCtx_ *fp); void (*Close)(struct LogFileCtx_ *fp); + void (*Flush)(struct LogFileCtx_ *fp); LogFileTypeCtx filetype; @@ -159,6 +160,9 @@ typedef struct LogFileCtx_ { uint64_t dropped; uint64_t output_errors; + + /* Track buffered content */ + uint64_t bytes_since_last_flush; } LogFileCtx; /* Min time (msecs) before trying to reconnect a Unix domain socket */ @@ -173,6 +177,7 @@ typedef struct LogFileCtx_ { LogFileCtx *LogFileNewCtx(void); int LogFileFreeCtx(LogFileCtx *); int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer); +void LogFileFlush(LogFileCtx *file_ctx); LogFileCtx *LogFileEnsureExists(ThreadId thread_id, LogFileCtx *lf_ctx); int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);