code cleanup in tm-threads.c

remotes/origin/master-1.1.x
Anoop Saldanha 15 years ago committed by Victor Julien
parent 4f7df1029d
commit 524af82b1a

@ -22,7 +22,7 @@
* \author Anoop Saldanha <poonaatsoc@gmail.com>
* \author Eric Leblond <eleblond@edenwall.com>
*
* Thread management functions
* Thread management functions.
*/
#include "suricata-common.h"
@ -69,36 +69,39 @@ ThreadVars *tv_root[TVT_MAX] = { NULL };
SCMutex tv_root_lock = PTHREAD_MUTEX_INITIALIZER;
/* Action On Failure(AOF). Determines how the engine should behave when a
thread encounters a failure. Defaults to restart the failed thread */
* thread encounters a failure. Defaults to restart the failed thread */
uint8_t tv_aof = THV_RESTART_THREAD;
/**
* \brief Check if a thread flag is set
* \brief Check if a thread flag is set.
*
* \retval 1 flag is set
* \retval 0 flag is not set
* \retval 1 flag is set.
* \retval 0 flag is not set.
*/
int TmThreadsCheckFlag(ThreadVars *tv, uint8_t flag) {
int TmThreadsCheckFlag(ThreadVars *tv, uint8_t flag)
{
return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
}
/**
* \brief Set a thread flag
* \brief Set a thread flag.
*/
void TmThreadsSetFlag(ThreadVars *tv, uint8_t flag) {
void TmThreadsSetFlag(ThreadVars *tv, uint8_t flag)
{
SC_ATOMIC_OR(tv->flags, flag);
}
/**
* \brief Unset a thread flag
* \brief Unset a thread flag.
*/
void TmThreadsUnsetFlag(ThreadVars *tv, uint8_t flag) {
void TmThreadsUnsetFlag(ThreadVars *tv, uint8_t flag)
{
SC_ATOMIC_AND(tv->flags, ~flag);
}
/* 1 slot functions */
void *TmThreadsSlot1NoIn(void *td) {
void *TmThreadsSlot1NoIn(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = (TmSlot *)tv->tm_slots;
Packet *p = NULL;
@ -128,7 +131,7 @@ void *TmThreadsSlot1NoIn(void *td) {
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
while (run) {
TmThreadTestThreadUnPaused(tv);
r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq, &s->slot_post_pq);
@ -145,9 +148,8 @@ void *TmThreadsSlot1NoIn(void *td) {
/* handle pre queue */
while (s->slot_pre_pq.top != NULL) {
Packet *extra_p = PacketDequeue(&s->slot_pre_pq);
if (extra_p != NULL) {
if (extra_p != NULL)
tv->tmqh_out(tv, extra_p);
}
}
tv->tmqh_out(tv, p);
@ -157,16 +159,15 @@ void *TmThreadsSlot1NoIn(void *td) {
/* handle post queue */
while (s->slot_post_pq.top != NULL) {
Packet *extra_p = PacketDequeue(&s->slot_post_pq);
if (extra_p != NULL) {
if (extra_p != NULL)
tv->tmqh_out(tv, extra_p);
}
}
if (TmThreadsCheckFlag(tv, THV_KILL)) {
SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0);
run = 0;
}
}
} /* while (run) */
if (s->SlotThreadExitPrintStats != NULL) {
s->SlotThreadExitPrintStats(tv, s->slot_data);
@ -184,7 +185,8 @@ void *TmThreadsSlot1NoIn(void *td) {
pthread_exit((void *) 0);
}
void *TmThreadsSlot1NoOut(void *td) {
void *TmThreadsSlot1NoOut(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = (TmSlot *)tv->tm_slots;
Packet *p = NULL;
@ -214,12 +216,13 @@ void *TmThreadsSlot1NoOut(void *td) {
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
while (run) {
TmThreadTestThreadUnPaused(tv);
p = tv->tmqh_in(tv);
r = s->SlotFunc(tv, p, s->slot_data, /* no outqh no pq */NULL, /* no outqh no pq */NULL);
r = s->SlotFunc(tv, p, s->slot_data, /* no outqh no pq */ NULL,
/* no outqh no pq */ NULL);
/* handle error */
if (r == TM_ECODE_FAILED) {
TmqhOutputPacketpool(tv, p);
@ -231,7 +234,7 @@ void *TmThreadsSlot1NoOut(void *td) {
SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0);
run = 0;
}
}
} /* while (run) */
if (s->SlotThreadExitPrintStats != NULL) {
s->SlotThreadExitPrintStats(tv, s->slot_data);
@ -249,7 +252,8 @@ void *TmThreadsSlot1NoOut(void *td) {
pthread_exit((void *) 0);
}
void *TmThreadsSlot1NoInOut(void *td) {
void *TmThreadsSlot1NoInOut(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = (TmSlot *)tv->tm_slots;
char run = 1;
@ -280,11 +284,10 @@ void *TmThreadsSlot1NoInOut(void *td) {
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
while (run) {
TmThreadTestThreadUnPaused(tv);
r = s->SlotFunc(tv, NULL, s->slot_data, /* no outqh, no pq */NULL, NULL);
//printf("%s: TmThreadsSlot1NoInNoOut: r %" PRId32 "\n", tv->name, r);
/* handle error */
if (r == TM_ECODE_FAILED) {
@ -293,11 +296,10 @@ void *TmThreadsSlot1NoInOut(void *td) {
}
if (TmThreadsCheckFlag(tv, THV_KILL)) {
//printf("%s: TmThreadsSlot1NoInOut: KILL is set\n", tv->name);
SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0);
run = 0;
}
}
} /* while (run) */
if (s->SlotThreadExitPrintStats != NULL) {
s->SlotThreadExitPrintStats(tv, s->slot_data);
@ -311,12 +313,12 @@ void *TmThreadsSlot1NoInOut(void *td) {
}
}
//printf("TmThreadsSlot1NoInOut: %s ending\n", tv->name);
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
}
void *TmThreadsSlot1(void *td) {
void *TmThreadsSlot1(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = (TmSlot *)tv->tm_slots;
Packet *p = NULL;
@ -347,16 +349,16 @@ void *TmThreadsSlot1(void *td) {
memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
while (run) {
TmThreadTestThreadUnPaused(tv);
/* input a packet */
p = tv->tmqh_in(tv);
if (p == NULL) {
//printf("%s: TmThreadsSlot1: p == NULL\n", tv->name);
} else {
r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq, &s->slot_post_pq);
r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq,
&s->slot_post_pq);
/* handle error */
if (r == TM_ECODE_FAILED) {
TmqhReleasePacketsToPacketPool(&s->slot_pre_pq);
@ -387,11 +389,10 @@ void *TmThreadsSlot1(void *td) {
}
if (TmThreadsCheckFlag(tv, THV_KILL)) {
//printf("%s: TmThreadsSlot1: KILL is set\n", tv->name);
SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0);
run = 0;
}
}
} /* while (run) */
if (s->SlotThreadExitPrintStats != NULL) {
s->SlotThreadExitPrintStats(tv, s->slot_data);
@ -410,11 +411,14 @@ void *TmThreadsSlot1(void *td) {
pthread_exit((void *) 0);
}
/** \brief separate run function so we can call it recursively
/**
* \brief Separate run function so we can call it recursively.
*
* \todo deal with post_pq for slots beyond the first
* \todo Deal with post_pq for slots beyond the first.
*/
static inline TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot) {
static inline TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p,
TmSlot *slot)
{
TmEcode r = TM_ECODE_OK;
TmSlot *s = NULL;
@ -442,9 +446,8 @@ static inline TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *sl
/* see if we need to process the packet */
if (s->slot_next != NULL) {
r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
/* XXX handle error */
/* \todo XXX handle error */
if (r == TM_ECODE_FAILED) {
//printf("TmThreadsSlotVarRun: recursive TmThreadsSlotVarRun returned 1\n");
TmqhReleasePacketsToPacketPool(&s->slot_pre_pq);
TmqhReleasePacketsToPacketPool(&s->slot_post_pq);
TmqhOutputPacketpool(tv, extra_p);
@ -455,17 +458,18 @@ static inline TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *sl
tv->tmqh_out(tv, extra_p);
}
/** \todo post pq */
/** \todo XXX post pq */
}
return TM_ECODE_OK;
}
/**
* \todo only the first "slot" currently makes the "post_pq" available
* to the thread module.
* \todo Only the first "slot" currently makes the "post_pq" available
* to the thread module.
*/
void *TmThreadsSlotVar(void *td) {
void *TmThreadsSlotVar(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = (TmSlot *)tv->tm_slots;
Packet *p = NULL;
@ -507,7 +511,7 @@ void *TmThreadsSlotVar(void *td) {
s = (TmSlot *)tv->tm_slots;
while(run) {
while (run) {
TmThreadTestThreadUnPaused(tv);
/* input a packet */
@ -548,7 +552,7 @@ void *TmThreadsSlotVar(void *td) {
if (TmThreadsCheckFlag(tv, THV_KILL)) {
run = 0;
}
}
} /* while (run) */
SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0);
s = (TmSlot *)tv->tm_slots;
@ -573,7 +577,7 @@ void *TmThreadsSlotVar(void *td) {
}
/**
* \brief We set the slot functions
* \brief We set the slot functions.
*
* \param tv Pointer to the TV to set the slot function for.
* \param name Name of the slot variant.
@ -634,7 +638,6 @@ void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data)
if (slot == NULL)
return;
memset(slot, 0, sizeof(TmSlot));
slot->SlotThreadInit = tm->ThreadInit;
slot->slot_initdata = data;
slot->SlotFunc = tm->Func;
@ -666,16 +669,19 @@ void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data)
#if !defined OS_WIN32 && !defined __OpenBSD__
static int SetCPUAffinitySet(cpu_set_t *cs) {
#if defined OS_FREEBSD
int r = cpuset_setaffinity(CPU_LEVEL_WHICH,CPU_WHICH_TID,SCGetThreadIdLong(),sizeof(cpu_set_t),cs);
int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
#elif OS_DARWIN
int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY, (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
(void*)cs, THREAD_AFFINITY_POLICY_COUNT);
#else
pid_t tid = syscall(SYS_gettid);
int r = sched_setaffinity(tid,sizeof(cpu_set_t),cs);
int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
#endif /* OS_FREEBSD */
if (r != 0) {
printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r, strerror(errno));
printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
strerror(errno));
return -1;
}
@ -684,13 +690,15 @@ static int SetCPUAffinitySet(cpu_set_t *cs) {
#endif
/**
* \brief Set the thread affinity on the calling thread
* \param cpuid id of the core/cpu to setup the affinity
* \retval 0 if all goes well; -1 if something is wrong
* \brief Set the thread affinity on the calling thread.
*
* \param cpuid Id of the core/cpu to setup the affinity.
*
* \retval 0 If all goes well; -1 if something is wrong.
*/
static int SetCPUAffinity(uint16_t cpuid) {
static int SetCPUAffinity(uint16_t cpuid)
{
#if !defined __OpenBSD__
int cpu = (int)cpuid;
#endif
@ -703,16 +711,18 @@ static int SetCPUAffinity(uint16_t cpuid) {
cpu_set_t cs;
CPU_ZERO(&cs);
CPU_SET(cpu,&cs);
CPU_SET(cpu, &cs);
#endif /* OS_WIN32 */
#ifdef OS_WIN32
int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
if (r != 0) {
printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r, strerror(errno));
return -1;
printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
strerror(errno));
return -1;
}
SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32, SCGetThreadIdLong(), cpu);
SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
SCGetThreadIdLong(), cpu);
return 0;
#elif !defined __OpenBSD__
@ -722,34 +732,42 @@ static int SetCPUAffinity(uint16_t cpuid) {
/**
* \brief Set the thread options (thread priority)
* \param tv pointer to the ThreadVars to setup the thread priority
* \retval TM_ECOE_OK
* \brief Set the thread options (thread priority).
*
* \param tv Pointer to the ThreadVars to setup the thread priority.
*
* \retval TM_ECODE_OK.
*/
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio) {
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
{
tv->thread_setup_flags |= THREAD_SET_PRIORITY;
tv->thread_priority = prio;
return TM_ECODE_OK;
}
/**
* \brief Adjusting nice value for threads
* \brief Adjusting nice value for threads.
*/
void TmThreadSetPrio(ThreadVars *tv)
{
SCEnter();
#ifdef OS_WIN32
if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for thread %s: %s", tv->name, strerror(errno));
SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for "
"thread %s: %s", tv->name, strerror(errno));
} else {
SCLogDebug("Priority set to %"PRId32" for thread %s", tv->thread_priority, tv->name);
SCLogDebug("Priority set to %"PRId32" for thread %s",
tv->thread_priority, tv->name);
}
#else
int ret = nice(tv->thread_priority);
if (ret == -1) {
SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value for thread %s: %s", tv->name, strerror(errno));
SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value "
"for thread %s: %s", tv->name, strerror(errno));
} else {
SCLogDebug("Nice value set to %"PRId32" for thread %s", tv->thread_priority, tv->name);
SCLogDebug("Nice value set to %"PRId32" for thread %s",
tv->thread_priority, tv->name);
}
#endif /* OS_WIN32 */
SCReturn;
@ -757,20 +775,26 @@ void TmThreadSetPrio(ThreadVars *tv)
/**
* \brief Set the thread options (cpu affinity)
* \param tv pointer to the ThreadVars to setup the affinity
* \retval TM_ECOE_OK
* \brief Set the thread options (cpu affinity).
*
* \param tv pointer to the ThreadVars to setup the affinity.
*
* \retval TM_ECODE_OK.
*/
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu) {
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
{
tv->thread_setup_flags |= THREAD_SET_AFFINITY;
tv->cpu_affinity = cpu;
return TM_ECODE_OK;
}
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type) {
if (! threading_set_cpu_affinity)
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
{
if (!threading_set_cpu_affinity)
return TM_ECODE_OK;
if (type > MAX_CPU_SET) {
SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
return TM_ECODE_FAILED;
@ -778,6 +802,7 @@ TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type) {
tv->thread_setup_flags |= THREAD_SET_AFFTYPE;
tv->cpu_affinity = type;
return TM_ECODE_OK;
}
@ -787,19 +812,25 @@ int TmThreadGetNbThreads(uint8_t type)
SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
return 0;
}
return thread_affinity[type].nb_threads;
}
/**
* \brief Set the thread options (cpu affinitythread)
* Priority should be already set by pthread_create
* \param tv pointer to the ThreadVars of the calling thread
* \brief Set the thread options (cpu affinitythread).
* Priority should be already set by pthread_create.
*
* \param tv pointer to the ThreadVars of the calling thread.
*/
TmEcode TmThreadSetupOptions(ThreadVars *tv) {
TmEcode TmThreadSetupOptions(ThreadVars *tv)
{
if (tv->thread_setup_flags & THREAD_SET_AFFINITY) {
SCLogInfo("Setting affinity for \"%s\" Module to cpu/core %"PRIu16", thread id %lu", tv->name, tv->cpu_affinity, SCGetThreadIdLong());
SCLogInfo("Setting affinity for \"%s\" Module to cpu/core "
"%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
SCGetThreadIdLong());
SetCPUAffinity(tv->cpu_affinity);
}
#if !defined OS_WIN32 && !defined __OpenBSD__
if (tv->thread_setup_flags & THREAD_SET_PRIORITY)
TmThreadSetPrio(tv);
@ -813,13 +844,14 @@ TmEcode TmThreadSetupOptions(ThreadVars *tv) {
tv->thread_priority = PRIO_LOW;
} else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
tv->thread_priority = PRIO_MEDIUM;
} else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
} else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
tv->thread_priority = PRIO_HIGH;
} else {
tv->thread_priority = taf->prio;
}
SCLogInfo("Setting prio %d for \"%s\" Module to cpu/core %"PRIu16", thread id %lu",
tv->thread_priority, tv->name, cpu, SCGetThreadIdLong());
SCLogInfo("Setting prio %d for \"%s\" Module to cpu/core "
"%"PRIu16", thread id %lu", tv->thread_priority,
tv->name, cpu, SCGetThreadIdLong());
} else {
SetCPUAffinitySet(&taf->cpu_set);
tv->thread_priority = taf->prio;
@ -827,6 +859,7 @@ TmEcode TmThreadSetupOptions(ThreadVars *tv) {
TmThreadSetPrio(tv);
}
#endif
return TM_ECODE_OK;
}
@ -872,13 +905,14 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
tv->aof = THV_RESTART_THREAD;
/* set the incoming queue */
if (inq_name != NULL && strcmp(inq_name,"packetpool") != 0) {
if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
SCLogDebug("inq_name \"%s\"", inq_name);
tmq = TmqGetQueueByName(inq_name);
if (tmq == NULL) {
tmq = TmqCreateQueue(inq_name);
if (tmq == NULL) goto error;
if (tmq == NULL)
goto error;
}
SCLogDebug("tmq %p", tmq);
@ -890,7 +924,8 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
SCLogDebug("inqh_name \"%s\"", inqh_name);
tmqh = TmqhGetQueueHandlerByName(inqh_name);
if (tmqh == NULL) goto error;
if (tmqh == NULL)
goto error;
tv->tmqh_in = tmqh->InHandler;
tv->InShutdownHandler = tmqh->InShutdownHandler;
@ -902,11 +937,12 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
SCLogDebug("outqh_name \"%s\"", outqh_name);
tmqh = TmqhGetQueueHandlerByName(outqh_name);
if (tmqh == NULL) goto error;
if (tmqh == NULL)
goto error;
tv->tmqh_out = tmqh->OutHandler;
if (outq_name != NULL && strcmp(outq_name,"packetpool") != 0) {
if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
SCLogDebug("outq_name \"%s\"", outq_name);
if (tmqh->OutHandlerCtxSetup != NULL) {
@ -916,7 +952,8 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
tmq = TmqGetQueueByName(outq_name);
if (tmq == NULL) {
tmq = TmqCreateQueue(outq_name);
if (tmq == NULL) goto error;
if (tmq == NULL)
goto error;
}
SCLogDebug("tmq %p", tmq);
@ -935,6 +972,7 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
TmThreadInitMC(tv);
return tv;
error:
printf("ERROR: failed to setup a thread.\n");
return NULL;
@ -1012,8 +1050,8 @@ void TmThreadAppend(ThreadVars *tv, int type)
tv->next = NULL;
tv->prev = NULL;
//printf("TmThreadAppend: thread \'%s\' is the first thread in the list.\n", tv->name);
SCMutexUnlock(&tv_root_lock);
return;
}
@ -1031,7 +1069,8 @@ void TmThreadAppend(ThreadVars *tv, int type)
}
SCMutexUnlock(&tv_root_lock);
//printf("TmThreadAppend: thread \'%s\' is added to the list.\n", tv->name);
return;
}
/**
@ -1046,6 +1085,7 @@ void TmThreadRemove(ThreadVars *tv, int type)
if (tv_root[type] == NULL) {
SCMutexUnlock(&tv_root_lock);
return;
}
@ -1088,11 +1128,12 @@ void TmThreadKillThread(ThreadVars *tv)
if (tv->inq != NULL) {
/* signal the queue for the number of users */
if (tv->InShutdownHandler != NULL) {
tv->InShutdownHandler(tv);
}
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
if (tv->InShutdownHandler != NULL) {
tv->InShutdownHandler(tv);
}
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
SCCondSignal(&trans_q[tv->inq->id].cond_q);
}
/* to be sure, signal more */
while (1) {
@ -1100,11 +1141,12 @@ void TmThreadKillThread(ThreadVars *tv)
break;
}
if (tv->InShutdownHandler != NULL) {
tv->InShutdownHandler(tv);
}
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
if (tv->InShutdownHandler != NULL) {
tv->InShutdownHandler(tv);
}
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
SCCondSignal(&trans_q[tv->inq->id].cond_q);
}
usleep(100);
}
@ -1132,7 +1174,6 @@ void TmThreadKillThreads(void) {
for (i = 0; i < TVT_MAX; i++) {
tv = tv_root[i];
while (tv) {
TmThreadsSetFlag(tv, THV_KILL);
SCLogDebug("told thread %s to stop", tv->name);
@ -1140,16 +1181,14 @@ void TmThreadKillThreads(void) {
if (tv->inq != NULL) {
int i;
//printf("TmThreadKillThreads: (t->inq->reader_cnt + t->inq->writer_cnt) %" PRIu32 "\n", (t->inq->reader_cnt + t->inq->writer_cnt));
/* make sure our packet pending counter doesn't block */
//SCCondSignal(&cond_pending);
/* signal the queue for the number of users */
if (tv->InShutdownHandler != NULL) {
tv->InShutdownHandler(tv);
}
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
if (tv->inq->q_type == 0)
SCCondSignal(&trans_q[tv->inq->id].cond_q);
@ -1206,6 +1245,8 @@ void TmThreadKillThreads(void) {
tv = tv->next;
}
}
return;
}
/**
@ -1274,7 +1315,8 @@ void TmThreadSetAOF(ThreadVars *tv, uint8_t aof)
void TmThreadInitMC(ThreadVars *tv)
{
if ( (tv->m = SCMalloc(sizeof(SCMutex))) == NULL) {
SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. Exiting...");
SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. "
"Exiting...");
exit(EXIT_FAILURE);
}
@ -1284,14 +1326,18 @@ void TmThreadInitMC(ThreadVars *tv)
}
if ( (tv->cond = SCMalloc(sizeof(SCCondT))) == NULL) {
SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. Exiting...");
SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. "
"Exiting...");
exit(0);
}
if (SCCondInit(tv->cond, NULL) != 0) {
printf("Error initializing the tv->cond condition variable\n");
SCLogError(SC_ERR_FATAL, "Error initializing the tv->cond condition "
"variable");
exit(0);
}
return;
}
/**
@ -1322,6 +1368,7 @@ void TmThreadTestThreadUnPaused(ThreadVars *tv)
void TmThreadContinue(ThreadVars *tv)
{
TmThreadsUnsetFlag(tv, THV_PAUSE);
return;
}
@ -1352,6 +1399,7 @@ void TmThreadContinueThreads()
void TmThreadPause(ThreadVars *tv)
{
TmThreadsSetFlag(tv, THV_PAUSE);
return;
}
@ -1397,6 +1445,7 @@ static void TmThreadRestartThread(ThreadVars *tv)
tv->restarted++;
SCLogInfo("thread \"%s\" restarted", tv->name);
return;
}
@ -1481,7 +1530,8 @@ TmEcode TmThreadWaitOnThreadInit(void)
}
SCLogInfo("all %"PRIu16" packet processing threads, %"PRIu16" management "
"threads initialized, engine started.", ppt_num, mgt_num);
"threads initialized, engine started.", ppt_num, mgt_num);
return TM_ECODE_OK;
}

Loading…
Cancel
Save