output/plugin: Use Suri thread-id for plugins

Issue: 6408

Use the Suricata thread id for plugin thread initialization to give the
plugin a better correlating factor to the actual Suricata threads.
pull/10652/head
Jeff Lucovsky 2 years ago committed by Victor Julien
parent 1122fa24b6
commit 85d321a689

@ -47,7 +47,7 @@ static int NullLogWrite(const char *buffer, int buffer_len, void *init_data, voi
return 0; return 0;
} }
static int NullLogThreadInit(void *init_data, int thread_id, void **thread_data) static int NullLogThreadInit(void *init_data, ThreadId thread_id, void **thread_data)
{ {
*thread_data = NULL; *thread_data = NULL;
return 0; return 0;

@ -38,7 +38,7 @@ OutputJsonThreadCtx *CreateEveThreadCtx(ThreadVars *t, OutputJsonCtx *ctx)
goto error; goto error;
} }
thread->file_ctx = LogFileEnsureExists(ctx->file_ctx); thread->file_ctx = LogFileEnsureExists(t->id, ctx->file_ctx);
if (!thread->file_ctx) { if (!thread->file_ctx) {
goto error; goto error;
} }
@ -104,7 +104,7 @@ TmEcode JsonLogThreadInit(ThreadVars *t, const void *initdata, void **data)
} }
thread->ctx = ((OutputCtx *)initdata)->data; thread->ctx = ((OutputCtx *)initdata)->data;
thread->file_ctx = LogFileEnsureExists(thread->ctx->file_ctx); thread->file_ctx = LogFileEnsureExists(t->id, thread->ctx->file_ctx);
if (!thread->file_ctx) { if (!thread->file_ctx) {
goto error_exit; goto error_exit;
} }

@ -375,7 +375,7 @@ static TmEcode JsonStatsLogThreadInit(ThreadVars *t, const void *initdata, void
/* Use the Output Context (file pointer and mutex) */ /* Use the Output Context (file pointer and mutex) */
aft->statslog_ctx = ((OutputCtx *)initdata)->data; aft->statslog_ctx = ((OutputCtx *)initdata)->data;
aft->file_ctx = LogFileEnsureExists(aft->statslog_ctx->file_ctx); aft->file_ctx = LogFileEnsureExists(t->id, aft->statslog_ctx->file_ctx);
if (!aft->file_ctx) { if (!aft->file_ctx) {
goto error_exit; goto error_exit;
} }
@ -471,8 +471,8 @@ static OutputInitResult OutputStatsLogInitSub(ConfNode *conf, OutputCtx *parent_
} }
SCLogDebug("Preparing file context for stats submodule logger"); SCLogDebug("Preparing file context for stats submodule logger");
/* Share output slot with thread 1 */ /* prepared by suricata-main */
stats_ctx->file_ctx = LogFileEnsureExists(ajt->file_ctx); stats_ctx->file_ctx = LogFileEnsureExists(0, ajt->file_ctx);
if (!stats_ctx->file_ctx) { if (!stats_ctx->file_ctx) {
SCFree(stats_ctx); SCFree(stats_ctx);
SCFree(output_ctx); SCFree(output_ctx);

@ -40,6 +40,7 @@ typedef struct SCPlugin_ {
} SCPlugin; } SCPlugin;
typedef SCPlugin *(*SCPluginRegisterFunc)(void); typedef SCPlugin *(*SCPluginRegisterFunc)(void);
typedef uint32_t ThreadId;
/** /**
* Structure used to define an Eve output file type plugin. * Structure used to define an Eve output file type plugin.
@ -54,8 +55,9 @@ typedef struct SCEveFileType_ {
int (*Write)(const char *buffer, int buffer_len, void *init_data, void *thread_data); int (*Write)(const char *buffer, int buffer_len, void *init_data, void *thread_data);
/* Close - Called on final close */ /* Close - Called on final close */
void (*Deinit)(void *init_data); void (*Deinit)(void *init_data);
/* ThreadInit - Called for each thread using file object*/ /* ThreadInit - Called for each thread using file object; non-zero thread_ids correlate
int (*ThreadInit)(void *init_data, int thread_id, void **thread_data); * to Suricata's worker threads; 0 correlates to the Suricata main thread */
int (*ThreadInit)(void *init_data, ThreadId thread_id, void **thread_data);
/* ThreadDeinit - Called for each thread using file object */ /* ThreadDeinit - Called for each thread using file object */
int (*ThreadDeinit)(void *init_data, void *thread_data); int (*ThreadDeinit)(void *init_data, void *thread_data);
TAILQ_ENTRY(SCEveFileType_) entries; TAILQ_ENTRY(SCEveFileType_) entries;

@ -50,7 +50,7 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path,
ThreadLogFileHashEntry *entry); ThreadLogFileHashEntry *entry);
// Threaded eve.json identifier // Threaded eve.json identifier
static SC_ATOMIC_DECL_AND_INIT_WITH_VAL(uint32_t, eve_file_id, 1); static SC_ATOMIC_DECL_AND_INIT_WITH_VAL(uint16_t, eve_file_id, 1);
#ifdef BUILD_WITH_UNIXSOCKET #ifdef BUILD_WITH_UNIXSOCKET
/** \brief connect to the indicated local stream socket, logging any errors /** \brief connect to the indicated local stream socket, logging any errors
@ -677,7 +677,7 @@ LogFileCtx *LogFileNewCtx(void)
* Each thread -- identified by its operating system thread-id -- has its * Each thread -- identified by its operating system thread-id -- has its
* own file entry that includes a file pointer. * own file entry that includes a file pointer.
*/ */
static ThreadLogFileHashEntry *LogFileThread2Slot(LogThreadedFileCtx *parent) static ThreadLogFileHashEntry *LogFileThread2Slot(LogThreadedFileCtx *parent, ThreadId thread_id)
{ {
ThreadLogFileHashEntry thread_hash_entry; ThreadLogFileHashEntry thread_hash_entry;
@ -689,12 +689,14 @@ static ThreadLogFileHashEntry *LogFileThread2Slot(LogThreadedFileCtx *parent)
if (!ent) { if (!ent) {
ent = SCCalloc(1, sizeof(*ent)); ent = SCCalloc(1, sizeof(*ent));
if (!ent) { if (!ent) {
FatalError("Unable to allocate thread/entry entry"); FatalError("Unable to allocate thread/hash-entry entry");
} }
ent->thread_id = thread_hash_entry.thread_id; ent->thread_id = thread_hash_entry.thread_id;
SCLogDebug("Trying to add thread %ld to entry %d", ent->thread_id, ent->slot_number); ent->internal_thread_id = thread_id;
SCLogDebug(
"Trying to add thread %" PRIi64 " to entry %d", ent->thread_id, ent->slot_number);
if (0 != HashTableAdd(parent->ht, ent, 0)) { if (0 != HashTableAdd(parent->ht, ent, 0)) {
FatalError("Unable to add thread/entry mapping"); FatalError("Unable to add thread/hash-entry mapping");
} }
} }
return ent; return ent;
@ -704,7 +706,7 @@ static ThreadLogFileHashEntry *LogFileThread2Slot(LogThreadedFileCtx *parent)
* \param parent_ctx * \param parent_ctx
* \retval LogFileCtx * pointer if successful; NULL otherwise * \retval LogFileCtx * pointer if successful; NULL otherwise
*/ */
LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx) LogFileCtx *LogFileEnsureExists(ThreadId thread_id, LogFileCtx *parent_ctx)
{ {
/* threaded output disabled */ /* threaded output disabled */
if (!parent_ctx->threaded) if (!parent_ctx->threaded)
@ -712,15 +714,16 @@ LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx)
SCMutexLock(&parent_ctx->threads->mutex); SCMutexLock(&parent_ctx->threads->mutex);
/* Find this thread's entry */ /* Find this thread's entry */
ThreadLogFileHashEntry *entry = LogFileThread2Slot(parent_ctx->threads); ThreadLogFileHashEntry *entry = LogFileThread2Slot(parent_ctx->threads, thread_id);
SCLogDebug("Adding reference for thread %ld [slot %d] to file %s [ctx %p]", SCGetThreadIdLong(), SCLogDebug("%s: Adding reference for thread %" PRIi64
entry->slot_number, parent_ctx->filename, parent_ctx); " (local thread id %d) to file %s [ctx %p]",
t_thread_name, SCGetThreadIdLong(), thread_id, parent_ctx->filename, parent_ctx);
bool new = entry->isopen; bool new = entry->isopen;
/* has it been opened yet? */ /* has it been opened yet? */
if (!entry->isopen) { if (!entry->isopen) {
SCLogDebug("Opening new file for thread/slot %d to file %s [ctx %p]", entry->slot_number, SCLogDebug("%s: Opening new file for thread/id %d to file %s [ctx %p]", t_thread_name,
parent_ctx->filename, parent_ctx); thread_id, parent_ctx->filename, parent_ctx);
if (LogFileNewThreadedCtx( if (LogFileNewThreadedCtx(
parent_ctx, parent_ctx->filename, parent_ctx->threads->append, entry)) { parent_ctx, parent_ctx->filename, parent_ctx->threads->append, entry)) {
entry->isopen = true; entry->isopen = true;
@ -810,11 +813,13 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path,
*thread = *parent_ctx; *thread = *parent_ctx;
if (parent_ctx->type == LOGFILE_TYPE_FILE) { if (parent_ctx->type == LOGFILE_TYPE_FILE) {
char fname[LOGFILE_NAME_MAX]; char fname[LOGFILE_NAME_MAX];
if (!LogFileThreadedName(log_path, fname, sizeof(fname), SC_ATOMIC_ADD(eve_file_id, 1))) { entry->slot_number = SC_ATOMIC_ADD(eve_file_id, 1);
if (!LogFileThreadedName(log_path, fname, sizeof(fname), entry->slot_number)) {
SCLogError("Unable to create threaded filename for log"); SCLogError("Unable to create threaded filename for log");
goto error; goto error;
} }
SCLogDebug("Thread open -- using name %s [replaces %s]", fname, log_path); SCLogDebug("%s: thread open -- using name %s [replaces %s] - thread %d [slot %d]",
t_thread_name, fname, log_path, entry->internal_thread_id, entry->slot_number);
thread->fp = SCLogOpenFileFp(fname, append, thread->filemode); thread->fp = SCLogOpenFileFp(fname, append, thread->filemode);
if (thread->fp == NULL) { if (thread->fp == NULL) {
goto error; goto error;
@ -830,8 +835,10 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path,
OutputRegisterFileRotationFlag(&thread->rotation_flag); OutputRegisterFileRotationFlag(&thread->rotation_flag);
} else if (parent_ctx->type == LOGFILE_TYPE_PLUGIN) { } else if (parent_ctx->type == LOGFILE_TYPE_PLUGIN) {
entry->slot_number = SC_ATOMIC_ADD(eve_file_id, 1); entry->slot_number = SC_ATOMIC_ADD(eve_file_id, 1);
SCLogDebug("%s - thread %d [slot %d]", log_path, entry->internal_thread_id,
entry->slot_number);
thread->plugin.plugin->ThreadInit( thread->plugin.plugin->ThreadInit(
thread->plugin.init_data, entry->slot_number, &thread->plugin.thread_data); thread->plugin.init_data, entry->internal_thread_id, &thread->plugin.thread_data);
} }
thread->threaded = false; thread->threaded = false;
thread->parent = parent_ctx; thread->parent = parent_ctx;

@ -49,10 +49,13 @@ typedef struct SyslogSetup_ {
} SyslogSetup; } SyslogSetup;
typedef struct ThreadLogFileHashEntry { typedef struct ThreadLogFileHashEntry {
uint64_t thread_id;
int slot_number; /* slot identifier -- for plugins */
bool isopen;
struct LogFileCtx_ *ctx; struct LogFileCtx_ *ctx;
uint64_t thread_id; /* OS thread identifier */
ThreadId internal_thread_id; /* Suri internal thread id; to assist output plugins correlating
usage */
uint16_t slot_number; /* Slot identifier - used when forming per-thread output names*/
bool isopen;
} ThreadLogFileHashEntry; } ThreadLogFileHashEntry;
struct LogFileCtx_; struct LogFileCtx_;
@ -168,7 +171,7 @@ LogFileCtx *LogFileNewCtx(void);
int LogFileFreeCtx(LogFileCtx *); int LogFileFreeCtx(LogFileCtx *);
int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer); int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer);
LogFileCtx *LogFileEnsureExists(LogFileCtx *lf_ctx); LogFileCtx *LogFileEnsureExists(ThreadId thread_id, LogFileCtx *lf_ctx);
int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int); int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);
int SCConfLogReopen(LogFileCtx *); int SCConfLogReopen(LogFileCtx *);
bool SCLogOpenThreadedFile(const char *log_path, const char *append, LogFileCtx *parent_ctx); bool SCLogOpenThreadedFile(const char *log_path, const char *append, LogFileCtx *parent_ctx);

Loading…
Cancel
Save