Thread Registration API for ID's

Create thread registration and unregistration API for assigning unique
thread id's.

Threadvars is static even if a thread restarts, so we can do the
registration before the threads start.

A thread is unregistered when the ThreadVars are freed.
pull/1267/head
Victor Julien 11 years ago
parent e586644c25
commit 489ee20560

@ -257,12 +257,11 @@ void *TmThreadsSlotPktAcqLoop(void *td)
if (r == TM_ECODE_DONE) {
EngineDone();
TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE);
pthread_exit((void *) -1);
goto error;
} else {
EngineKill();
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
pthread_exit((void *) -1);
return NULL;
goto error;
}
}
(void)SC_ATOMIC_SET(slot->slot_data, slot_data);
@ -312,8 +311,7 @@ void *TmThreadsSlotPktAcqLoop(void *td)
r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
if (r != TM_ECODE_OK) {
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
return NULL;
goto error;
}
}
}
@ -322,6 +320,10 @@ void *TmThreadsSlotPktAcqLoop(void *td)
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
return NULL;
error:
pthread_exit((void *) -1);
return NULL;
}
@ -368,8 +370,7 @@ void *TmThreadsSlotVar(void *td)
EngineKill();
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
pthread_exit((void *) -1);
return NULL;
goto error;
}
(void)SC_ATOMIC_SET(s->slot_data, slot_data);
}
@ -461,8 +462,7 @@ void *TmThreadsSlotVar(void *td)
r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
if (r != TM_ECODE_OK) {
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
return NULL;
goto error;
}
}
}
@ -471,6 +471,10 @@ void *TmThreadsSlotVar(void *td)
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
return NULL;
error:
pthread_exit((void *) -1);
return NULL;
}
static void *TmThreadsManagement(void *td)
@ -1156,8 +1160,11 @@ ThreadVars *TmThreadCreatePacketHandler(char *name, char *inq_name,
tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
slots, NULL, 0);
if (tv != NULL)
if (tv != NULL) {
tv->type = TVT_PPT;
tv->id = TmThreadsRegisterThread(tv, tv->type);
}
return tv;
}
@ -1183,6 +1190,7 @@ ThreadVars *TmThreadCreateMgmtThread(char *name, void *(fn_p)(void *),
if (tv != NULL) {
tv->type = TVT_MGMT;
tv->id = TmThreadsRegisterThread(tv, tv->type);
TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
}
@ -1210,6 +1218,7 @@ ThreadVars *TmThreadCreateMgmtThreadByName(char *name, char *module,
if (tv != NULL) {
tv->type = TVT_MGMT;
tv->id = TmThreadsRegisterThread(tv, tv->type);
TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
TmModule *m = TmModuleGetByName(module);
@ -1242,6 +1251,7 @@ ThreadVars *TmThreadCreateCmdThread(char *name, void *(fn_p)(void *),
if (tv != NULL) {
tv->type = TVT_CMD;
tv->id = TmThreadsRegisterThread(tv, tv->type);
TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
}
@ -1562,6 +1572,8 @@ void TmThreadFree(ThreadVars *tv)
s = s->slot_next;
SCFree(ps);
}
TmThreadsUnregisterThread(tv->id);
SCFree(tv);
}
@ -1761,6 +1773,8 @@ void TmThreadPauseThreads()
ThreadVars *tv = NULL;
int i = 0;
TmThreadsListThreads();
for (i = 0; i < TVT_MAX; i++) {
tv = tv_root[i];
while (tv != NULL) {
@ -1920,3 +1934,111 @@ ThreadVars *TmThreadsGetCallingThread(void)
return NULL;
}
typedef struct Thread_ {
ThreadVars *tv; /**< threadvars structure */
const char *name;
int type;
int in_use; /**< bool to indicate this is in use */
} Thread;
typedef struct Threads_ {
Thread *threads;
size_t threads_size;
int threads_cnt;
} Threads;
static Threads thread_store = { NULL, 0, 0 };
static SCMutex thread_store_lock = PTHREAD_MUTEX_INITIALIZER;
void TmThreadsListThreads(void)
{
Thread *t;
size_t s;
SCMutexLock(&thread_store_lock);
for (s = 0; s < thread_store.threads_size; s++) {
t = &thread_store.threads[s];
if (t == NULL || t->in_use == 0)
continue;
SCLogInfo("Thread %"PRIuMAX", %s type %d, tv %p", (uintmax_t)s, t->name, t->type, t->tv);
}
SCMutexUnlock(&thread_store_lock);
}
#define STEP 32
/**
* \retval id thread id, or 0 if not found
*/
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
{
SCMutexLock(&thread_store_lock);
if (thread_store.threads == NULL) {
thread_store.threads = SCCalloc(STEP, sizeof(Thread));
BUG_ON(thread_store.threads == NULL);
thread_store.threads_size = STEP;
}
size_t s;
for (s = 0; s < thread_store.threads_size; s++) {
if (thread_store.threads[s].in_use == 0) {
Thread *t = &thread_store.threads[s];
t->name = tv->name;
t->type = type;
t->tv = tv;
t->in_use = 1;
SCMutexUnlock(&thread_store_lock);
return (int)s;
}
}
/* if we get here the array is completely filled */
void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
BUG_ON(newmem == NULL);
thread_store.threads = newmem;
memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP);
Thread *t = &thread_store.threads[thread_store.threads_size];
t->name = tv->name;
t->type = type;
t->tv = tv;
t->in_use = 1;
s = thread_store.threads_size;
thread_store.threads_size += STEP;
SCMutexUnlock(&thread_store_lock);
return (int)s;
}
#undef STEP
void TmThreadsUnregisterThread(const int id)
{
SCMutexLock(&thread_store_lock);
if (id < 0 || id >= (int)thread_store.threads_size)
return;
/* reset thread_id, which serves as clearing the record */
thread_store.threads[id].in_use = 0;
/* check if we have at least one registered thread left */
size_t s;
for (s = 0; s < thread_store.threads_size; s++) {
Thread *t = &thread_store.threads[s];
if (t->in_use == 1) {
goto end;
}
}
/* if we get here no threads are registered */
SCFree(thread_store.threads);
thread_store.threads = NULL;
thread_store.threads_size = 0;
thread_store.threads_cnt = 0;
end:
SCMutexUnlock(&thread_store_lock);
}

@ -195,4 +195,9 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet
return r;
}
void TmThreadsListThreads(void);
int TmThreadsRegisterThread(ThreadVars *tv, const int type);
void TmThreadsUnregisterThread(const int id);
#endif /* __TM_THREADS_H__ */

Loading…
Cancel
Save