pcap-log: implement multi mode

This patch implements a new mode in pcap-logging: 'multi'. It stores
a pcap file per logger thread, instead of just one file globally.

This removes lock contention, so it brings a lot more performance.

The trade off is that there are now mulitple files where there would
be one before.

Files have a thread id added to their name: base_name.tid.ts, so by
we have something like: "log.pcap.20057.1254500095".
pull/1043/head
Victor Julien 12 years ago
parent 4922cd2d36
commit 923341fa05

@ -96,6 +96,7 @@ typedef struct PcapLogData_ {
uint32_t max_files; /**< maximum files to use in ring buffer mode */
uint64_t pkt_cnt; /**< total number of packets */
int prev_day; /**< last day, for finding out when */
int threads; /**< number of threads (only set in the global) */
pcap_t *pcap_dead_handle; /**< pcap_dumper_t needs a handle */
pcap_dumper_t *pcap_dumper; /**< actually writes the packets */
char *prefix; /**< filename prefix */
@ -123,6 +124,10 @@ typedef struct PcapLogThreadData_ {
PcapLogData *pcap_log;
} PcapLogThreadData;
/* global pcap data for when we're using multi mode. At exit we'll
* merge counters into this one and then report counters. */
static PcapLogData *g_pcap_data = NULL;
static int PcapLogOpenFileCtx(PcapLogData *);
static TmEcode PcapLog(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
static TmEcode PcapLogDataInit(ThreadVars *, void *, void **);
@ -387,6 +392,44 @@ static TmEcode PcapLog (ThreadVars *t, Packet *p, void *thread_data, PacketQueue
return TM_ECODE_OK;
}
static PcapLogData *PcapLogDataCopy(const PcapLogData *pl)
{
BUG_ON(pl->mode != LOGMODE_MULTI);
PcapLogData *copy = SCCalloc(1, sizeof(*copy));
if (unlikely(copy == NULL)) {
return NULL;
}
copy->h = SCCalloc(1, sizeof(*copy->h));
if (unlikely(copy->h == NULL)) {
SCFree(copy);
return NULL;
}
copy->prefix = SCStrdup(pl->prefix);
if (unlikely(copy->prefix == NULL)) {
SCFree(copy->h);
SCFree(copy);
return NULL;
}
/* settings TODO move to global cfg struct */
copy->mode = pl->mode;
copy->max_files = pl->max_files;
copy->use_ringbuffer = pl->use_ringbuffer;
copy->timestamp_format = pl->timestamp_format;
copy->use_stream_depth = pl->use_stream_depth;
copy->size_limit = pl->size_limit;
TAILQ_INIT(&copy->pcap_file_list);
SCMutexInit(&copy->plog_lock, NULL);
strlcpy(copy->dir, pl->dir, sizeof(copy->dir));
SCLogDebug("copied, returning %p", copy);
return copy;
}
static TmEcode PcapLogDataInit(ThreadVars *t, void *initdata, void **data)
{
if (initdata == NULL) {
@ -400,23 +443,32 @@ static TmEcode PcapLogDataInit(ThreadVars *t, void *initdata, void **data)
if (unlikely(td == NULL))
return TM_ECODE_FAILED;
if (pl->mode == LOGMODE_MULTI)
td->pcap_log = PcapLogDataCopy(pl);
else
td->pcap_log = pl;
BUG_ON(td->pcap_log == NULL);
SCMutexLock(&pl->plog_lock);
SCMutexLock(&td->pcap_log->plog_lock);
/** Use the Ouptut Context (file pointer and mutex) */
pl->pkt_cnt = 0;
pl->pcap_dead_handle = NULL;
pl->pcap_dumper = NULL;
pl->file_cnt = 1;
td->pcap_log->pkt_cnt = 0;
td->pcap_log->pcap_dead_handle = NULL;
td->pcap_log->pcap_dumper = NULL;
td->pcap_log->file_cnt = 1;
struct timeval ts;
memset(&ts, 0x00, sizeof(struct timeval));
TimeGet(&ts);
struct tm local_tm;
struct tm *tms = SCLocalTime(ts.tv_sec, &local_tm);
pl->prev_day = tms->tm_mday;
td->pcap_log->prev_day = tms->tm_mday;
SCMutexUnlock(&td->pcap_log->plog_lock);
/* count threads in the global structure */
SCMutexLock(&pl->plog_lock);
pl->threads++;
SCMutexUnlock(&pl->plog_lock);
*data = (void *)td;
@ -424,6 +476,32 @@ static TmEcode PcapLogDataInit(ThreadVars *t, void *initdata, void **data)
return TM_ECODE_OK;
}
static void StatsMerge(PcapLogData *dst, PcapLogData *src)
{
dst->profile_open.total += src->profile_open.total;
dst->profile_open.cnt += src->profile_open.cnt;
dst->profile_close.total += src->profile_close.total;
dst->profile_close.cnt += src->profile_close.cnt;
dst->profile_write.total += src->profile_write.total;
dst->profile_write.cnt += src->profile_write.cnt;
dst->profile_rotate.total += src->profile_rotate.total;
dst->profile_rotate.cnt += src->profile_rotate.cnt;
dst->profile_handles.total += src->profile_handles.total;
dst->profile_handles.cnt += src->profile_handles.cnt;
dst->profile_lock.total += src->profile_lock.total;
dst->profile_lock.cnt += src->profile_lock.cnt;
dst->profile_unlock.total += src->profile_unlock.total;
dst->profile_unlock.cnt += src->profile_unlock.cnt;
dst->profile_data_size += src->profile_data_size;
}
/**
* \brief Thread deinit function.
*
@ -432,7 +510,6 @@ static TmEcode PcapLogDataInit(ThreadVars *t, void *initdata, void **data)
* \retval TM_ECODE_OK on succces
* \retval TM_ECODE_FAILED on failure
*/
static TmEcode PcapLogDataDeinit(ThreadVars *t, void *thread_data)
{
PcapLogThreadData *td = (PcapLogThreadData *)thread_data;
@ -444,10 +521,19 @@ static TmEcode PcapLogDataDeinit(ThreadVars *t, void *thread_data)
}
}
if (pl->mode == LOGMODE_MULTI) {
SCMutexLock(&g_pcap_data->plog_lock);
StatsMerge(g_pcap_data, pl);
g_pcap_data->reported++;
if (g_pcap_data->threads == g_pcap_data->reported)
PcapLogProfilingDump(g_pcap_data);
SCMutexUnlock(&g_pcap_data->plog_lock);
} else {
if (pl->reported == 0) {
PcapLogProfilingDump(pl);
pl->reported = 1;
}
}
return TM_ECODE_OK;
}
@ -646,6 +732,7 @@ static OutputCtx *PcapLogInitCtx(ConfNode *conf)
}
output_ctx->data = pl;
output_ctx->DeInit = PcapLogFileDeInitCtx;
g_pcap_data = pl;
return output_ctx;
}
@ -733,7 +820,7 @@ static int PcapLogOpenFileCtx(PcapLogData *pl)
dirfull, pl->prefix, (uint32_t)ts.tv_sec, (uint32_t)ts.tv_usec);
}
} else {
} else if (pl->mode == LOGMODE_NORMAL) {
/* create the filename to use */
if (pl->timestamp_format == TS_FORMAT_SEC) {
snprintf(filename, PATH_MAX, "%s/%s.%" PRIu32, pl->dir,
@ -742,6 +829,19 @@ static int PcapLogOpenFileCtx(PcapLogData *pl)
snprintf(filename, PATH_MAX, "%s/%s.%" PRIu32 ".%" PRIu32, pl->dir,
pl->prefix, (uint32_t)ts.tv_sec, (uint32_t)ts.tv_usec);
}
} else if (pl->mode == LOGMODE_MULTI) {
long thread_id = SCGetThreadIdLong();
uint64_t tid = (uint64_t)thread_id;
/* create the filename to use */
if (pl->timestamp_format == TS_FORMAT_SEC) {
snprintf(filename, PATH_MAX, "%s/%s.%"PRIu64".%" PRIu32, pl->dir,
pl->prefix, tid, (uint32_t)ts.tv_sec);
} else {
snprintf(filename, PATH_MAX, "%s/%s.%"PRIu64".%" PRIu32 ".%" PRIu32, pl->dir,
pl->prefix, tid, (uint32_t)ts.tv_sec, (uint32_t)ts.tv_usec);
}
SCLogDebug("multi-mode: filename %s", filename);
}
if ((pf->filename = SCStrdup(pl->filename)) == NULL) {

Loading…
Cancel
Save