counters: merge counters from threads for output

Merge counters so the table contains combined values from counters
from each thread.

Use global counter id's, track them in a hash.

Rename SCPCAElem members

Fix and improve average counters
pull/1508/head
Victor Julien 10 years ago
parent 7da657dc3d
commit 711cd7b59b

@ -76,8 +76,12 @@ enum {
* have to be clubbed based on TM, before being sent out
*/
typedef struct SCPerfClubTMInst_ {
char *name;
char *tm_name;
SCPerfPublicContext *ctx;
SCPerfPublicContext **head;
uint32_t size;
@ -90,6 +94,7 @@ typedef struct SCPerfClubTMInst_ {
typedef struct SCPerfOPIfaceContext_ {
SCPerfClubTMInst *pctmi;
SCMutex pctmi_lock;
HashTable *counters_id_hash;
} SCPerfOPIfaceContext;
static void *stats_thread_data = NULL;
@ -106,6 +111,8 @@ static int SCPerfOutputCounterFileIface(ThreadVars *tv);
* loggers. Initialized at first use. */
static StatsTable stats_table = { NULL, 0, 0, {0 , 0}};
static uint16_t counters_global_id = 0;
/**
* \brief The output interface dispatcher for the counter api
*/
@ -131,8 +138,8 @@ void SCPerfCounterAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
#ifdef DEBUG
BUG_ON ((id < 1) || (id > pca->size));
#endif
pca->head[id].ui64_cnt += x;
pca->head[id].syncs++;
pca->head[id].value += x;
pca->head[id].updates++;
return;
}
@ -152,8 +159,8 @@ void SCPerfCounterIncr(ThreadVars *tv, uint16_t id)
#ifdef DEBUG
BUG_ON ((id < 1) || (id > pca->size));
#endif
pca->head[id].ui64_cnt++;
pca->head[id].syncs++;
pca->head[id].value++;
pca->head[id].updates++;
return;
}
@ -176,13 +183,13 @@ void SCPerfCounterSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
#endif
if ((pca->head[id].pc->type == SC_PERF_TYPE_Q_MAXIMUM) &&
(x > pca->head[id].ui64_cnt)) {
pca->head[id].ui64_cnt = x;
(x > pca->head[id].value)) {
pca->head[id].value = x;
} else if (pca->head[id].pc->type == SC_PERF_TYPE_Q_NORMAL) {
pca->head[id].ui64_cnt = x;
pca->head[id].value = x;
}
pca->head[id].syncs++;
pca->head[id].updates++;
return;
}
@ -594,20 +601,10 @@ static uint16_t SCPerfRegisterQualifiedCounter(char *cname, char *tm_name,
*/
static void SCPerfCopyCounterValue(SCPCAElem *pcae)
{
SCPerfCounter *pc = NULL;
uint64_t ui64_temp = 0;
pc = pcae->pc;
ui64_temp = pcae->ui64_cnt;
if (pc->type == SC_PERF_TYPE_Q_AVERAGE) {
if (pcae->syncs != 0)
ui64_temp /= pcae->syncs;
pc->value = ui64_temp;
} else {
pc->value = ui64_temp;
}
SCPerfCounter *pc = pcae->pc;
pc->value = pcae->value;
pc->updates = pcae->updates;
return;
}
@ -629,7 +626,6 @@ static uint64_t SCPerfOutputCalculateCounterValue(SCPerfCounter *pc)
return pc->value;
}
/**
* \brief The file output interface for the Perf Counter api
*/
@ -637,61 +633,10 @@ static int SCPerfOutputCounterFileIface(ThreadVars *tv)
{
const SCPerfClubTMInst *pctmi = NULL;
const SCPerfCounter *pc = NULL;
SCPerfCounter **pc_heads = NULL;
uint64_t ui64_temp = 0;
uint64_t ui64_result = 0;
uint32_t u = 0;
int flag = 0;
void *td = stats_thread_data;
if (stats_table.nstats == 0) {
uint32_t nstats = 0;
pctmi = sc_perf_op_ctx->pctmi;
while (pctmi != NULL) {
if (pctmi->size == 0) {
pctmi = pctmi->next;
continue;
}
if ((pc_heads = SCMalloc(pctmi->size * sizeof(SCPerfCounter *))) == NULL)
return 0;
memset(pc_heads, 0, pctmi->size * sizeof(SCPerfCounter *));
for (u = 0; u < pctmi->size; u++) {
pc_heads[u] = pctmi->head[u]->head;
SCMutexLock(&pctmi->head[u]->m);
}
flag = 1;
while (flag) {
if (pc_heads[0] == NULL)
break;
for (u = 0; u < pctmi->size; u++) {
if (pc_heads[u] != NULL)
pc_heads[u] = pc_heads[u]->next;
if (pc_heads[u] == NULL)
flag = 0;
}
/* count */
nstats++;
}
for (u = 0; u < pctmi->size; u++)
SCMutexUnlock(&pctmi->head[u]->m);
pctmi = pctmi->next;
SCFree(pc_heads);
}
if (nstats == 0) {
SCLogError(SC_ERR_PERF_STATS_NOT_INIT, "no counters registered");
return -1;
}
uint32_t nstats = counters_global_id;
stats_table.nstats = nstats;
stats_table.stats = SCCalloc(stats_table.nstats, sizeof(StatsRecord));
@ -703,58 +648,72 @@ static int SCPerfOutputCounterFileIface(ThreadVars *tv)
stats_table.start_time = sc_start_time;
}
StatsRecord *table = stats_table.stats;
int table_i = 0;
/** temporary local table to merge the per thread counters,
* especially needed for the average counters */
struct CountersMergeTable {
int type;
uint64_t value;
uint64_t updates;
} merge_table[counters_global_id];
memset(&merge_table, 0x00,
counters_global_id * sizeof(struct CountersMergeTable));
StatsRecord *table = stats_table.stats;
pctmi = sc_perf_op_ctx->pctmi;
SCLogDebug("pctmi %p", pctmi);
while (pctmi != NULL) {
if (pctmi->size == 0) {
pctmi = pctmi->next;
continue;
}
if ((pc_heads = SCMalloc(pctmi->size * sizeof(SCPerfCounter *))) == NULL)
return 0;
memset(pc_heads, 0, pctmi->size * sizeof(SCPerfCounter *));
for (u = 0; u < pctmi->size; u++) {
pc_heads[u] = pctmi->head[u]->head;
SCMutexLock(&pctmi->head[u]->m);
}
flag = 1;
while (flag) {
ui64_result = 0;
if (pc_heads[0] == NULL)
break;
/* keep ptr to first pc to we can use it to print the cname */
pc = pc_heads[0];
for (u = 0; u < pctmi->size; u++) {
ui64_temp = SCPerfOutputCalculateCounterValue(pc_heads[u]);
ui64_result += ui64_temp;
SCLogDebug("Thread %s (%s) ctx %p", pctmi->name,
pctmi->tm_name ? pctmi->tm_name : "none", pctmi->ctx);
if (pc_heads[u] != NULL)
pc_heads[u] = pc_heads[u]->next;
if (pc_heads[u] == NULL)
flag = 0;
SCMutexLock(&pctmi->ctx->m);
pc = pctmi->ctx->head;
while (pc != NULL) {
SCLogDebug("Counter %s (%u:%u) value %"PRIu64,
pc->cname, pc->id, pc->gid, pc->value);
merge_table[pc->gid].type = pc->type;
switch (pc->type) {
case SC_PERF_TYPE_Q_MAXIMUM:
if (pc->value > merge_table[pc->gid].value)
merge_table[pc->gid].value = pc->value;
break;
default:
merge_table[pc->gid].value += pc->value;
break;
}
merge_table[pc->gid].updates += pc->updates;
/* store in the table */
table[table_i].name = pc->cname;
table[table_i].tm_name = pctmi->tm_name;
table[table_i].pvalue = table[table_i].value;
table[table_i].value = ui64_result;
table_i++;
}
for (u = 0; u < pctmi->size; u++)
SCMutexUnlock(&pctmi->head[u]->m);
table[pc->gid].name = pc->cname;
table[pc->gid].tm_name = pctmi->tm_name;
pc = pc->next;
}
SCMutexUnlock(&pctmi->ctx->m);
pctmi = pctmi->next;
SCFree(pc_heads);
}
uint16_t x;
for (x = 0; x < counters_global_id; x++) {
/* xfer previous value to pvalue and reset value */
table[x].pvalue = table[x].value;
table[x].value = 0;
struct CountersMergeTable *m = &merge_table[x];
switch (m->type) {
case SC_PERF_TYPE_Q_MAXIMUM:
if (m->value > table[x].value)
table[x].value = m->value;
break;
case SC_PERF_TYPE_Q_AVERAGE:
if (m->value > 0 && m->updates > 0) {
table[x].value = (uint64_t)(m->value / m->updates);
}
break;
default:
table[x].value += m->value;
break;
}
}
/* invoke logger(s) */
@ -1018,6 +977,57 @@ static uint16_t SCPerfRegisterCounter(char *cname, char *tm_name, int type, char
return id;
}
typedef struct CountersIdType_ {
uint16_t id;
const char *string;
} CountersIdType;
uint32_t CountersIdHashFunc(HashTable *ht, void *data, uint16_t datalen)
{
CountersIdType *t = (CountersIdType *)data;
uint32_t hash = 0;
int i = 0;
int len = strlen(t->string);
for (i = 0; i < len; i++)
hash += tolower((unsigned char)t->string[i]);
hash = hash % ht->array_size;
return hash;
}
char CountersIdHashCompareFunc(void *data1, uint16_t datalen1,
void *data2, uint16_t datalen2)
{
CountersIdType *t1 = (CountersIdType *)data1;
CountersIdType *t2 = (CountersIdType *)data2;
int len1 = 0;
int len2 = 0;
if (t1 == NULL || t2 == NULL)
return 0;
if (t1->string == NULL || t2->string == NULL)
return 0;
len1 = strlen(t1->string);
len2 = strlen(t2->string);
if (len1 == len2 && memcmp(t1->string, t2->string, len1) == 0) {
return 1;
}
return 0;
}
void CountersIdHashFreeFunc(void *data)
{
SCFree(data);
}
/** \internal
* \brief Adds a TM to the clubbed TM table. Multiple instances of the same TM
* are stacked together in a PCTMI container.
@ -1027,108 +1037,73 @@ static uint16_t SCPerfRegisterCounter(char *cname, char *tm_name, int type, char
*
* \retval 1 on success, 0 on failure
*/
static int SCPerfAddToClubbedTMTable(char *tm_name, SCPerfPublicContext *pctx)
static int SCPerfAddToClubbedTMTable(ThreadVars *tv, SCPerfPublicContext *pctx)
{
void *ptmp;
if (sc_perf_op_ctx == NULL) {
SCLogDebug("Counter module has been disabled");
return 0;
}
SCPerfClubTMInst *pctmi = NULL;
SCPerfClubTMInst *prev = NULL;
SCPerfClubTMInst *temp = NULL;
SCPerfPublicContext **hpctx = NULL;
uint32_t u = 0;
if (tm_name == NULL || pctx == NULL) {
if (tv == NULL || pctx == NULL) {
SCLogDebug("supplied argument(s) to SCPerfAddToClubbedTMTable NULL");
return 0;
}
SCMutexLock(&sc_perf_op_ctx->pctmi_lock);
pctmi = sc_perf_op_ctx->pctmi;
SCLogDebug("pctmi %p", pctmi);
prev = pctmi;
while (pctmi != NULL) {
prev = pctmi;
if (strcmp(tm_name, pctmi->tm_name) != 0) {
pctmi = pctmi->next;
continue;
}
break;
if (sc_perf_op_ctx->counters_id_hash == NULL) {
sc_perf_op_ctx->counters_id_hash = HashTableInit(256, CountersIdHashFunc,
CountersIdHashCompareFunc,
CountersIdHashFreeFunc);
BUG_ON(sc_perf_op_ctx->counters_id_hash == NULL);
}
/* get me the bugger who wrote this junk of a code :P */
if (pctmi == NULL) {
if ( (temp = SCMalloc(sizeof(SCPerfClubTMInst))) == NULL) {
SCMutexUnlock(&sc_perf_op_ctx->pctmi_lock);
return 0;
}
memset(temp, 0, sizeof(SCPerfClubTMInst));
temp->size = 1;
temp->head = SCMalloc(sizeof(SCPerfPublicContext **));
if (temp->head == NULL) {
SCFree(temp);
SCMutexUnlock(&sc_perf_op_ctx->pctmi_lock);
return 0;
}
temp->head[0] = pctx;
temp->tm_name = SCStrdup(tm_name);
if (unlikely(temp->tm_name == NULL)) {
SCFree(temp->head);
SCFree(temp);
SCMutexUnlock(&sc_perf_op_ctx->pctmi_lock);
return 0;
SCPerfCounter *pc = pctx->head;
while (pc != NULL) {
CountersIdType t = { 0, pc->cname }, *id = NULL;
id = HashTableLookup(sc_perf_op_ctx->counters_id_hash, &t, sizeof(t));
if (id == NULL) {
id = SCCalloc(1, sizeof(*id));
BUG_ON(id == NULL);
id->id = counters_global_id++;
id->string = pc->cname;
BUG_ON(HashTableAdd(sc_perf_op_ctx->counters_id_hash, id, sizeof(*id)) < 0);
}
if (prev == NULL)
sc_perf_op_ctx->pctmi = temp;
else
prev->next = temp;
SCMutexUnlock(&sc_perf_op_ctx->pctmi_lock);
return 1;
pc->gid = id->id;
pc = pc->next;
}
/* see if the pctx is already part of this pctmi */
hpctx = pctmi->head;
for (u = 0; u < pctmi->size; u++) {
if (hpctx[u] != pctx)
continue;
if ( (temp = SCMalloc(sizeof(SCPerfClubTMInst))) == NULL) {
SCMutexUnlock(&sc_perf_op_ctx->pctmi_lock);
return 1;
return 0;
}
memset(temp, 0, sizeof(SCPerfClubTMInst));
ptmp = SCRealloc(pctmi->head,
(pctmi->size + 1) * sizeof(SCPerfPublicContext **));
if (ptmp == NULL) {
SCFree(pctmi->head);
pctmi->head = NULL;
temp->ctx = pctx;
temp->name = SCStrdup(tv->name);
if (unlikely(temp->name == NULL)) {
SCFree(temp);
SCMutexUnlock(&sc_perf_op_ctx->pctmi_lock);
return 0;
}
pctmi->head = ptmp;
hpctx = pctmi->head;
hpctx[pctmi->size] = pctx;
for (u = pctmi->size - 1; u > 0; u--) {
if (pctx->curr_id <= hpctx[u]->curr_id) {
hpctx[u + 1] = hpctx[u];
hpctx[u] = pctx;
continue;
if (tv->thread_group_name != NULL) {
temp->tm_name = SCStrdup(tv->thread_group_name);
if (unlikely(temp->tm_name == NULL)) {
SCFree(temp->name);
SCFree(temp);
SCMutexUnlock(&sc_perf_op_ctx->pctmi_lock);
return 0;
}
break;
}
pctmi->size++;
SCMutexUnlock(&sc_perf_op_ctx->pctmi_lock);
temp->next = sc_perf_op_ctx->pctmi;
sc_perf_op_ctx->pctmi = temp;
SCLogInfo("sc_perf_op_ctx->pctmi %p", sc_perf_op_ctx->pctmi);
SCMutexUnlock(&sc_perf_op_ctx->pctmi_lock);
return 1;
}
@ -1207,9 +1182,7 @@ int SCPerfSetupPrivate(ThreadVars *tv)
{
SCPerfGetAllCountersArray(&(tv)->perf_public_ctx, &(tv)->perf_private_ctx);
SCPerfAddToClubbedTMTable((tv->thread_group_name != NULL) ?
tv->thread_group_name : tv->name,
&(tv)->perf_public_ctx);
SCPerfAddToClubbedTMTable(tv, &(tv)->perf_public_ctx);
return 0;
}
@ -1274,7 +1247,7 @@ uint64_t SCPerfGetLocalCounterValue(ThreadVars *tv, uint16_t id)
#ifdef DEBUG
BUG_ON ((id < 1) || (id > pca->size));
#endif
return pca->head[id].ui64_cnt;
return pca->head[id].value;
}
/**
@ -1466,7 +1439,7 @@ static int SCPerfTestUpdateCounter08()
SCPerfCounterIncr(&tv, id);
SCPerfCounterAddUI64(&tv, id, 100);
result = pca->head[id].ui64_cnt;
result = pca->head[id].value;
SCPerfReleasePerfCounterS(tv.perf_public_ctx.head);
SCPerfReleasePCA(pca);
@ -1500,7 +1473,7 @@ static int SCPerfTestUpdateCounter09()
SCPerfCounterIncr(&tv, id2);
SCPerfCounterAddUI64(&tv, id2, 100);
result = (pca->head[id1].ui64_cnt == 0) && (pca->head[id2].ui64_cnt == 101);
result = (pca->head[id1].value == 0) && (pca->head[id2].value == 101);
SCPerfReleasePerfCounterS(tv.perf_public_ctx.head);
SCPerfReleasePCA(pca);

@ -45,7 +45,12 @@ typedef struct SCPerfCounter_ {
/* local id for this counter in this tm */
uint16_t id;
uint64_t value;
/* global id */
uint16_t gid;
/* counter value(s): copies from the 'private' counter */
uint64_t value; /**< sum of updates/increments, or 'set' value */
uint64_t updates; /**< number of updates (for avg) */
/* name of the counter */
char *cname;
@ -86,10 +91,11 @@ typedef struct SCPCAElem_ {
/* counter id of the above counter(pc) */
uint16_t id;
uint64_t ui64_cnt;
/* total value of the adds/increments, or exact value in case of 'set' */
uint64_t value;
/* no of times the local counter has been updated */
uint64_t syncs;
uint64_t updates;
} SCPCAElem;
/**

Loading…
Cancel
Save