log/thread: Consolidate threaded file tracking

Issue: 5836

This commit removes the duplicate threaded file tracking from the log
file mechanisms.

Tracking is now consolidated with the threaded hash table.
pull/8572/head
Jeff Lucovsky 3 years ago committed by Victor Julien
parent 99b7257ef6
commit cb174e4fd9

@ -1017,7 +1017,7 @@ static int LogFileTypePrepare(
else if (log_filetype == LOGFILE_TYPE_PLUGIN) {
if (json_ctx->file_ctx->threaded) {
/* Prepare for threaded log output. */
if (!SCLogOpenThreadedFile(NULL, NULL, json_ctx->file_ctx, 1)) {
if (!SCLogOpenThreadedFile(NULL, NULL, json_ctx->file_ctx)) {
return -1;
}
}

@ -47,7 +47,8 @@
#define LOGFILE_NAME_MAX 255
static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append, int i);
static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append,
ThreadLogFileHashEntry *entry);
// Threaded eve.json identifier
static SC_ATOMIC_DECL_AND_INIT_WITH_VAL(uint32_t, eve_file_id, 1);
@ -323,35 +324,40 @@ static void SCLogFileClose(LogFileCtx *log_ctx)
SCMutexUnlock(&log_ctx->fp_mutex);
}
static char ThreadSlotHashCompareFunc(
static char ThreadLogFileHashCompareFunc(
void *data1, uint16_t datalen1, void *data2, uint16_t datalen2)
{
ThreadSlotHashEntry *p1 = (ThreadSlotHashEntry *)data1;
ThreadSlotHashEntry *p2 = (ThreadSlotHashEntry *)data2;
ThreadLogFileHashEntry *p1 = (ThreadLogFileHashEntry *)data1;
ThreadLogFileHashEntry *p2 = (ThreadLogFileHashEntry *)data2;
if (p1 == NULL || p2 == NULL)
return 0;
return p1->thread_id == p2->thread_id;
}
static uint32_t ThreadSlotHashFunc(HashTable *ht, void *data, uint16_t datalen)
static uint32_t ThreadLogFileHashFunc(HashTable *ht, void *data, uint16_t datalen)
{
const ThreadSlotHashEntry *ent = (ThreadSlotHashEntry *)data;
const ThreadLogFileHashEntry *ent = (ThreadLogFileHashEntry *)data;
return ent->thread_id % ht->array_size;
}
static void ThreadSlotHashFreeFunc(void *data)
static void ThreadLogFileHashFreeFunc(void *data)
{
ThreadSlotHashEntry *thread_ent = (ThreadSlotHashEntry *)data;
BUG_ON(data == NULL);
ThreadLogFileHashEntry *thread_ent = (ThreadLogFileHashEntry *)data;
if (thread_ent) {
LogFileCtx *lf_ctx = thread_ent->ctx;
/* Free the leaf log file entries */
if (!lf_ctx->threaded) {
LogFileFreeCtx(lf_ctx);
}
SCFree(thread_ent);
}
}
bool SCLogOpenThreadedFile(
const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count)
bool SCLogOpenThreadedFile(const char *log_path, const char *append, LogFileCtx *parent_ctx)
{
parent_ctx->threads = SCCalloc(1, sizeof(LogThreadedFileCtx));
if (!parent_ctx->threads) {
@ -359,10 +365,10 @@ bool SCLogOpenThreadedFile(
return false;
}
parent_ctx->threads->ht = HashTableInit(
255, ThreadSlotHashFunc, ThreadSlotHashCompareFunc, ThreadSlotHashFreeFunc);
parent_ctx->threads->ht = HashTableInit(255, ThreadLogFileHashFunc,
ThreadLogFileHashCompareFunc, ThreadLogFileHashFreeFunc);
if (!parent_ctx->threads->ht) {
FatalError("Unable to initialize thread/slot table");
FatalError("Unable to initialize thread/entry hash table");
}
parent_ctx->threads->append = SCStrdup(append == NULL ? DEFAULT_LOG_MODE_APPEND : append);
@ -371,29 +377,11 @@ bool SCLogOpenThreadedFile(
goto error_exit;
}
parent_ctx->threads->slot_count = slot_count;
parent_ctx->threads->last_slot = 0;
parent_ctx->threads->lf_slots = SCCalloc(slot_count, sizeof(LogFileCtx *));
if (!parent_ctx->threads->lf_slots) {
SCLogError("Unable to allocate thread slots");
goto error_exit;
}
SCLogDebug("Allocated %d file context pointers for threaded array",
parent_ctx->threads->slot_count);
for (int slot = 1; slot < parent_ctx->threads->slot_count; slot++) {
if (!LogFileNewThreadedCtx(parent_ctx, log_path, parent_ctx->threads->append, slot)) {
/* TODO: clear allocated entries [1, slot) */
goto error_exit;
}
}
SCMutexInit(&parent_ctx->threads->mutex, NULL);
return true;
error_exit:
if (parent_ctx->threads->lf_slots) {
SCFree(parent_ctx->threads->lf_slots);
}
if (parent_ctx->threads->append) {
SCFree(parent_ctx->threads->append);
}
@ -600,7 +588,7 @@ SCConfLogOpenGeneric(ConfNode *conf,
if (log_ctx->fp == NULL)
return -1; // Error already logged by Open...Fp routine
} else {
if (!SCLogOpenThreadedFile(log_path, append, log_ctx, 1)) {
if (!SCLogOpenThreadedFile(log_path, append, log_ctx)) {
return -1;
}
}
@ -683,37 +671,34 @@ LogFileCtx *LogFileNewCtx(void)
return lf_ctx;
}
/** \brief LogFileThread2Slot() Return a file slot
* \retval int file slot for caller
/** \brief LogFileThread2Slot() Return a file entry
* \retval ThreadLogFileHashEntry * file entry for caller
*
* This function returns the file slot for the calling thread.
* This function returns the file entry for the calling thread.
* Each thread -- identified by its operating system thread-id -- has its
* own slot that includes a file pointer.
* own file entry that includes a file pointer.
*/
static int LogFileThread2Slot(LogThreadedFileCtx *parent)
static ThreadLogFileHashEntry *LogFileThread2Slot(LogThreadedFileCtx *parent)
{
ThreadSlotHashEntry thread_hash_entry;
ThreadLogFileHashEntry thread_hash_entry;
/* Check hash table for thread id*/
thread_hash_entry.thread_id = SCGetThreadIdLong();
ThreadSlotHashEntry *ent =
ThreadLogFileHashEntry *ent =
HashTableLookup(parent->ht, &thread_hash_entry, sizeof(thread_hash_entry));
if (ent) {
return ent->slot;
}
ent = SCCalloc(1, sizeof(*ent));
if (!ent) {
FatalError("Unable to allocate thread/slot entry");
}
ent->thread_id = thread_hash_entry.thread_id;
ent->slot = ++parent->last_slot;
SCLogDebug("Trying to add thread %ld to slot %d", ent->thread_id, ent->slot);
if (0 != HashTableAdd(parent->ht, ent, 0)) {
FatalError("Unable to add thread/slot mapping");
ent = SCCalloc(1, sizeof(*ent));
if (!ent) {
FatalError("Unable to allocate thread/entry entry");
}
ent->thread_id = thread_hash_entry.thread_id;
SCLogDebug("Trying to add thread %ld to entry %d", ent->thread_id, ent->slot_number);
if (0 != HashTableAdd(parent->ht, ent, 0)) {
FatalError("Unable to add thread/entry mapping");
}
}
return ent->slot;
return ent;
}
/** \brief LogFileEnsureExists() Ensure a log file context for the thread exists
@ -727,58 +712,35 @@ LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx)
return parent_ctx;
SCMutexLock(&parent_ctx->threads->mutex);
/* Find this thread's slot */
int slot = LogFileThread2Slot(parent_ctx->threads);
/* Find this thread's entry */
ThreadLogFileHashEntry *entry = LogFileThread2Slot(parent_ctx->threads);
SCLogDebug("Adding reference for thread %ld [slot %d] to file %s [ctx %p]", SCGetThreadIdLong(),
slot, parent_ctx->filename, parent_ctx);
/* Add slots if necessary */
if (slot >= parent_ctx->threads->slot_count) {
/* ensure there's a slot for the caller */
int new_size = MAX(parent_ctx->threads->slot_count << 1, slot + 1);
SCLogDebug("Increasing slot count; current %d, trying %d", parent_ctx->threads->slot_count,
new_size);
LogFileCtx **new_array =
SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
if (new_array == NULL) {
/* Try one more time */
SCLogDebug("Unable to increase file context array size to %d; trying %d", new_size,
slot + 1);
new_size = slot + 1;
new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
}
if (new_array == NULL) {
SCMutexUnlock(&parent_ctx->threads->mutex);
SCLogError("Unable to increase file context array size to %d", new_size);
return NULL;
}
parent_ctx->threads->lf_slots = new_array;
/* initialize newly added slots */
for (int i = parent_ctx->threads->slot_count; i < new_size; i++) {
parent_ctx->threads->lf_slots[i] = NULL;
}
parent_ctx->threads->slot_count = new_size;
}
entry->slot_number, parent_ctx->filename, parent_ctx);
bool new = entry->isopen;
/* has it been opened yet? */
if (!parent_ctx->threads->lf_slots[slot]) {
SCLogDebug("Opening new file for thread/slot %d to file %s [ctx %p]", slot,
if (!entry->isopen) {
SCLogDebug("Opening new file for thread/slot %d to file %s [ctx %p]", entry->slot_number,
parent_ctx->filename, parent_ctx);
if (!LogFileNewThreadedCtx(
parent_ctx, parent_ctx->filename, parent_ctx->threads->append, slot))
BUG_ON(parent_ctx->threads->lf_slots[slot] != NULL);
if (LogFileNewThreadedCtx(
parent_ctx, parent_ctx->filename, parent_ctx->threads->append, entry)) {
entry->isopen = true;
} else {
SCLogError(
"Unable to open slot %d for file %s", entry->slot_number, parent_ctx->filename);
(void)HashTableRemove(parent_ctx->threads->ht, entry, 0);
}
}
SCMutexUnlock(&parent_ctx->threads->mutex);
if (sc_log_global_log_level >= SC_LOG_DEBUG) {
if (parent_ctx->threads->lf_slots[slot])
SCLogDebug("Existing file for thread/slot %d reference to file %s [ctx %p]", slot,
if (new) {
SCLogDebug("Existing file for thread/entry %p reference to file %s [ctx %p]", entry,
parent_ctx->filename, parent_ctx);
}
}
return parent_ctx->threads->lf_slots[slot];
return entry->ctx;
}
/** \brief LogFileThreadedName() Create file name for threaded EVE storage
@ -835,14 +797,14 @@ static bool LogFileThreadedName(
* \param parent_ctx
* \param log_path
* \param append
* \param slot
* \param entry
*/
static bool LogFileNewThreadedCtx(
LogFileCtx *parent_ctx, const char *log_path, const char *append, int slot)
static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append,
ThreadLogFileHashEntry *entry)
{
LogFileCtx *thread = SCCalloc(1, sizeof(LogFileCtx));
if (!thread) {
SCLogError("Unable to allocate thread file context slot %d", slot);
SCLogError("Unable to allocate thread file context entry %p", entry);
return false;
}
@ -860,7 +822,7 @@ static bool LogFileNewThreadedCtx(
}
thread->filename = SCStrdup(fname);
if (!thread->filename) {
SCLogError("Unable to duplicate filename for context slot %d", slot);
SCLogError("Unable to duplicate filename for context entry %p", entry);
goto error;
}
thread->is_regular = true;
@ -868,14 +830,15 @@ static bool LogFileNewThreadedCtx(
thread->Close = SCLogFileCloseNoLock;
OutputRegisterFileRotationFlag(&thread->rotation_flag);
} else if (parent_ctx->type == LOGFILE_TYPE_PLUGIN) {
entry->slot_number = SC_ATOMIC_ADD(eve_file_id, 1);
thread->plugin.plugin->ThreadInit(
thread->plugin.init_data, slot, &thread->plugin.thread_data);
thread->plugin.init_data, entry->slot_number, &thread->plugin.thread_data);
}
thread->threaded = false;
thread->parent = parent_ctx;
thread->slot = slot;
thread->entry = entry;
entry->ctx = thread;
parent_ctx->threads->lf_slots[slot] = thread;
return true;
error:
@ -885,10 +848,10 @@ error:
thread->Close(thread);
}
}
if (thread) {
SCFree(thread);
}
parent_ctx->threads->lf_slots[slot] = NULL;
return false;
}
@ -902,26 +865,13 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
SCReturnInt(0);
}
if (lf_ctx->type == LOGFILE_TYPE_PLUGIN) {
lf_ctx->plugin.plugin->Deinit(lf_ctx->plugin.init_data);
}
if (lf_ctx->threaded) {
BUG_ON(lf_ctx->threads == NULL);
SCMutexDestroy(&lf_ctx->threads->mutex);
for(int i = 0; i < lf_ctx->threads->slot_count; i++) {
if (!lf_ctx->threads->lf_slots[i]) {
continue;
}
LogFileCtx *this_ctx = lf_ctx->threads->lf_slots[i];
if (lf_ctx->type != LOGFILE_TYPE_PLUGIN) {
OutputUnregisterFileRotationFlag(&this_ctx->rotation_flag);
this_ctx->Close(this_ctx);
} else {
lf_ctx->plugin.plugin->ThreadDeinit(
this_ctx->plugin.init_data, this_ctx->plugin.thread_data);
}
SCFree(lf_ctx->threads->lf_slots[i]->filename);
SCFree(lf_ctx->threads->lf_slots[i]);
}
SCFree(lf_ctx->threads->lf_slots);
if (lf_ctx->threads->append)
SCFree(lf_ctx->threads->append);
if (lf_ctx->threads->ht) {
@ -933,19 +883,10 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
if (lf_ctx->fp != NULL) {
lf_ctx->Close(lf_ctx);
}
if (lf_ctx->parent) {
SCMutexLock(&lf_ctx->parent->threads->mutex);
lf_ctx->parent->threads->lf_slots[lf_ctx->slot] = NULL;
SCMutexUnlock(&lf_ctx->parent->threads->mutex);
}
}
SCMutexDestroy(&lf_ctx->fp_mutex);
}
if (lf_ctx->type == LOGFILE_TYPE_PLUGIN) {
lf_ctx->plugin.plugin->Deinit(lf_ctx->plugin.init_data);
}
if (lf_ctx->prefix != NULL) {
SCFree(lf_ctx->prefix);
lf_ctx->prefix_len = 0;

@ -48,17 +48,16 @@ typedef struct SyslogSetup_ {
int alert_syslog_level;
} SyslogSetup;
typedef struct ThreadSlotHashEntry_ {
typedef struct ThreadLogFileHashEntry {
uint64_t thread_id;
int slot; /* table slot */
} ThreadSlotHashEntry;
int slot_number; /* slot identifier -- for plugins */
bool isopen;
struct LogFileCtx_ *ctx;
} ThreadLogFileHashEntry;
struct LogFileCtx_;
typedef struct LogThreadedFileCtx_ {
SCMutex mutex;
int slot_count; /* Allocated slot count */
struct LogFileCtx_ **lf_slots; /* Slots */
int last_slot; /* Last slot allocated */
HashTable *ht;
char *append;
} LogThreadedFileCtx;
@ -98,7 +97,7 @@ typedef struct LogFileCtx_ {
/** When threaded, track of the parent and thread id */
bool threaded;
struct LogFileCtx_ *parent;
int slot;
ThreadLogFileHashEntry *entry;
/** the type of file */
enum LogFileType type;
@ -178,7 +177,6 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer);
LogFileCtx *LogFileEnsureExists(LogFileCtx *lf_ctx);
int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);
int SCConfLogReopen(LogFileCtx *);
bool SCLogOpenThreadedFile(
const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count);
bool SCLogOpenThreadedFile(const char *log_path, const char *append, LogFileCtx *parent_ctx);
#endif /* __UTIL_LOGOPENFILE_H__ */

Loading…
Cancel
Save