log: Use hash table for slot maintenance

Issue: 5198

This commit modifies the threaded logging support to use the hash table
for handling thread/slot mappings. As a result, it's no longer necessary
to provide the thread id when ensuring the log output exists.
pull/8356/head
Jeff Lucovsky 4 years ago committed by Jeff Lucovsky
parent f06aabc32a
commit 018ea2625f

@ -38,7 +38,7 @@ OutputJsonThreadCtx *CreateEveThreadCtx(ThreadVars *t, OutputJsonCtx *ctx)
goto error; goto error;
} }
thread->file_ctx = LogFileEnsureExists(ctx->file_ctx, t->id); thread->file_ctx = LogFileEnsureExists(ctx->file_ctx);
if (!thread->file_ctx) { if (!thread->file_ctx) {
goto error; goto error;
} }
@ -104,7 +104,7 @@ TmEcode JsonLogThreadInit(ThreadVars *t, const void *initdata, void **data)
} }
thread->ctx = ((OutputCtx *)initdata)->data; thread->ctx = ((OutputCtx *)initdata)->data;
thread->file_ctx = LogFileEnsureExists(thread->ctx->file_ctx, t->id); thread->file_ctx = LogFileEnsureExists(thread->ctx->file_ctx);
if (!thread->file_ctx) { if (!thread->file_ctx) {
goto error_exit; goto error_exit;
} }

@ -344,7 +344,7 @@ static TmEcode JsonStatsLogThreadInit(ThreadVars *t, const void *initdata, void
/* Use the Output Context (file pointer and mutex) */ /* Use the Output Context (file pointer and mutex) */
aft->statslog_ctx = ((OutputCtx *)initdata)->data; aft->statslog_ctx = ((OutputCtx *)initdata)->data;
aft->file_ctx = LogFileEnsureExists(aft->statslog_ctx->file_ctx, t->id); aft->file_ctx = LogFileEnsureExists(aft->statslog_ctx->file_ctx);
if (!aft->file_ctx) { if (!aft->file_ctx) {
goto error_exit; goto error_exit;
} }

@ -372,6 +372,7 @@ bool SCLogOpenThreadedFile(
} }
parent_ctx->threads->slot_count = slot_count; parent_ctx->threads->slot_count = slot_count;
parent_ctx->threads->last_slot = 0;
parent_ctx->threads->lf_slots = SCCalloc(slot_count, sizeof(LogFileCtx *)); parent_ctx->threads->lf_slots = SCCalloc(slot_count, sizeof(LogFileCtx *));
if (!parent_ctx->threads->lf_slots) { if (!parent_ctx->threads->lf_slots) {
SCLogError("Unable to allocate thread slots"); SCLogError("Unable to allocate thread slots");
@ -682,61 +683,102 @@ LogFileCtx *LogFileNewCtx(void)
return lf_ctx; return lf_ctx;
} }
/** \brief LogFileThread2Slot() Return a file slot
* \retval int file slot for caller
*
* This function returns the file slot for the calling thread.
* Each thread -- identified by its operating system thread-id -- has its
* own slot that includes a file pointer.
*/
static int LogFileThread2Slot(LogThreadedFileCtx *parent)
{
ThreadSlotHashEntry thread_hash_entry;
/* Check hash table for thread id*/
thread_hash_entry.thread_id = SCGetThreadIdLong();
ThreadSlotHashEntry *ent =
HashTableLookup(parent->ht, &thread_hash_entry, sizeof(thread_hash_entry));
if (ent) {
return ent->slot;
}
ent = SCCalloc(1, sizeof(*ent));
if (!ent) {
FatalError("Unable to allocate thread/slot entry");
}
ent->thread_id = thread_hash_entry.thread_id;
ent->slot = ++parent->last_slot;
SCLogDebug("Trying to add thread %ld to slot %d", ent->thread_id, ent->slot);
if (0 != HashTableAdd(parent->ht, ent, 0)) {
FatalError("Unable to add thread/slot mapping");
}
return ent->slot;
}
/** \brief LogFileEnsureExists() Ensure a log file context for the thread exists /** \brief LogFileEnsureExists() Ensure a log file context for the thread exists
* \param parent_ctx * \param parent_ctx
* \param thread_id
* \retval LogFileCtx * pointer if successful; NULL otherwise * \retval LogFileCtx * pointer if successful; NULL otherwise
*/ */
LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx, int thread_id) LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx)
{ {
/* threaded output disabled */ /* threaded output disabled */
if (!parent_ctx->threaded) if (!parent_ctx->threaded)
return parent_ctx; return parent_ctx;
SCLogDebug("Adding reference %d to file ctx %p", thread_id, parent_ctx);
SCMutexLock(&parent_ctx->threads->mutex); SCMutexLock(&parent_ctx->threads->mutex);
/* are there enough context slots already */ /* Find this thread's slot */
if (thread_id < parent_ctx->threads->slot_count) { int slot = LogFileThread2Slot(parent_ctx->threads);
/* has it been opened yet? */ SCLogDebug("Adding reference for thread %ld [slot %d] to file %s [ctx %p]", SCGetThreadIdLong(),
if (!parent_ctx->threads->lf_slots[thread_id]) { slot, parent_ctx->filename, parent_ctx);
SCLogDebug("Opening new file for %d reference to file ctx %p", thread_id, parent_ctx);
LogFileNewThreadedCtx(parent_ctx, parent_ctx->filename, parent_ctx->threads->append, thread_id); /* Add slots if necessary */
} if (slot >= parent_ctx->threads->slot_count) {
SCMutexUnlock(&parent_ctx->threads->mutex); /* ensure there's a slot for the caller */
SCLogDebug("Existing file for %d reference to file ctx %p", thread_id, parent_ctx); int new_size = MAX(parent_ctx->threads->slot_count << 1, slot + 1);
return parent_ctx->threads->lf_slots[thread_id]; SCLogDebug("Increasing slot count; current %d, trying %d", parent_ctx->threads->slot_count,
} new_size);
LogFileCtx **new_array =
/* ensure there's a slot for the caller */ SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
int new_size = MAX(parent_ctx->threads->slot_count << 1, thread_id + 1); if (new_array == NULL) {
SCLogDebug("Increasing slot count; current %d, trying %d", /* Try one more time */
parent_ctx->threads->slot_count, new_size); SCLogDebug("Unable to increase file context array size to %d; trying %d", new_size,
LogFileCtx **new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *)); slot + 1);
if (new_array == NULL) { new_size = slot + 1;
/* Try one more time */ new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
SCLogDebug("Unable to increase file context array size to %d; trying %d", }
new_size, thread_id + 1);
new_size = thread_id + 1; if (new_array == NULL) {
new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *)); SCMutexUnlock(&parent_ctx->threads->mutex);
} SCLogError("Unable to increase file context array size to %d", new_size);
return NULL;
if (new_array == NULL) { }
SCMutexUnlock(&parent_ctx->threads->mutex);
SCLogError("Unable to increase file context array size to %d", new_size);
return NULL;
}
parent_ctx->threads->lf_slots = new_array; parent_ctx->threads->lf_slots = new_array;
/* initialize newly added slots */ /* initialize newly added slots */
for (int i = parent_ctx->threads->slot_count; i < new_size; i++) { for (int i = parent_ctx->threads->slot_count; i < new_size; i++) {
parent_ctx->threads->lf_slots[i] = NULL; parent_ctx->threads->lf_slots[i] = NULL;
}
parent_ctx->threads->slot_count = new_size;
} }
parent_ctx->threads->slot_count = new_size;
LogFileNewThreadedCtx(parent_ctx, parent_ctx->filename, parent_ctx->threads->append, thread_id);
/* has it been opened yet? */
if (!parent_ctx->threads->lf_slots[slot]) {
SCLogDebug("Opening new file for thread/slot %d to file %s [ctx %p]", slot,
parent_ctx->filename, parent_ctx);
if (!LogFileNewThreadedCtx(
parent_ctx, parent_ctx->filename, parent_ctx->threads->append, slot))
BUG_ON(parent_ctx->threads->lf_slots[slot] != NULL);
}
SCMutexUnlock(&parent_ctx->threads->mutex); SCMutexUnlock(&parent_ctx->threads->mutex);
return parent_ctx->threads->lf_slots[thread_id]; if (sc_log_global_log_level >= SC_LOG_DEBUG) {
if (parent_ctx->threads->lf_slots[slot])
SCLogDebug("Existing file for thread/slot %d reference to file %s [ctx %p]", slot,
parent_ctx->filename, parent_ctx);
}
return parent_ctx->threads->lf_slots[slot];
} }
/** \brief LogFileThreadedName() Create file name for threaded EVE storage /** \brief LogFileThreadedName() Create file name for threaded EVE storage
@ -793,13 +835,14 @@ static bool LogFileThreadedName(
* \param parent_ctx * \param parent_ctx
* \param log_path * \param log_path
* \param append * \param append
* \param thread_id * \param slot
*/ */
static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append, int thread_id) static bool LogFileNewThreadedCtx(
LogFileCtx *parent_ctx, const char *log_path, const char *append, int slot)
{ {
LogFileCtx *thread = SCCalloc(1, sizeof(LogFileCtx)); LogFileCtx *thread = SCCalloc(1, sizeof(LogFileCtx));
if (!thread) { if (!thread) {
SCLogError("Unable to allocate thread file context slot %d", thread_id); SCLogError("Unable to allocate thread file context slot %d", slot);
return false; return false;
} }
@ -817,7 +860,7 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path,
} }
thread->filename = SCStrdup(fname); thread->filename = SCStrdup(fname);
if (!thread->filename) { if (!thread->filename) {
SCLogError("Unable to duplicate filename for context slot %d", thread_id); SCLogError("Unable to duplicate filename for context slot %d", slot);
goto error; goto error;
} }
thread->is_regular = true; thread->is_regular = true;
@ -826,13 +869,13 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path,
OutputRegisterFileRotationFlag(&thread->rotation_flag); OutputRegisterFileRotationFlag(&thread->rotation_flag);
} else if (parent_ctx->type == LOGFILE_TYPE_PLUGIN) { } else if (parent_ctx->type == LOGFILE_TYPE_PLUGIN) {
thread->plugin.plugin->ThreadInit( thread->plugin.plugin->ThreadInit(
thread->plugin.init_data, thread_id, &thread->plugin.thread_data); thread->plugin.init_data, slot, &thread->plugin.thread_data);
} }
thread->threaded = false; thread->threaded = false;
thread->parent = parent_ctx; thread->parent = parent_ctx;
thread->id = thread_id; thread->slot = slot;
parent_ctx->threads->lf_slots[thread_id] = thread; parent_ctx->threads->lf_slots[slot] = thread;
return true; return true;
error: error:
@ -845,7 +888,7 @@ error:
if (thread) { if (thread) {
SCFree(thread); SCFree(thread);
} }
parent_ctx->threads->lf_slots[thread_id] = NULL; parent_ctx->threads->lf_slots[slot] = NULL;
return false; return false;
} }
@ -892,7 +935,7 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
} }
if (lf_ctx->parent) { if (lf_ctx->parent) {
SCMutexLock(&lf_ctx->parent->threads->mutex); SCMutexLock(&lf_ctx->parent->threads->mutex);
lf_ctx->parent->threads->lf_slots[lf_ctx->id] = NULL; lf_ctx->parent->threads->lf_slots[lf_ctx->slot] = NULL;
SCMutexUnlock(&lf_ctx->parent->threads->mutex); SCMutexUnlock(&lf_ctx->parent->threads->mutex);
} }
} }

@ -98,7 +98,7 @@ typedef struct LogFileCtx_ {
/** When threaded, track of the parent and thread id */ /** When threaded, track of the parent and thread id */
bool threaded; bool threaded;
struct LogFileCtx_ *parent; struct LogFileCtx_ *parent;
int id; int slot;
/** the type of file */ /** the type of file */
enum LogFileType type; enum LogFileType type;
@ -175,7 +175,7 @@ LogFileCtx *LogFileNewCtx(void);
int LogFileFreeCtx(LogFileCtx *); int LogFileFreeCtx(LogFileCtx *);
int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer); int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer);
LogFileCtx *LogFileEnsureExists(LogFileCtx *lf_ctx, int thread_id); LogFileCtx *LogFileEnsureExists(LogFileCtx *lf_ctx);
int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int); int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);
int SCConfLogReopen(LogFileCtx *); int SCConfLogReopen(LogFileCtx *);
bool SCLogOpenThreadedFile( bool SCLogOpenThreadedFile(

Loading…
Cancel
Save