From 524af82b1a035135849dd5b10f13e051dfe6fa21 Mon Sep 17 00:00:00 2001 From: Anoop Saldanha Date: Thu, 11 Aug 2011 21:59:43 +0530 Subject: [PATCH] code cleanup in tm-threads.c --- src/tm-threads.c | 264 ++++++++++++++++++++++++++++------------------- 1 file changed, 157 insertions(+), 107 deletions(-) diff --git a/src/tm-threads.c b/src/tm-threads.c index 8020d0a212..b15baf5420 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -22,7 +22,7 @@ * \author Anoop Saldanha * \author Eric Leblond * - * Thread management functions + * Thread management functions. */ #include "suricata-common.h" @@ -69,36 +69,39 @@ ThreadVars *tv_root[TVT_MAX] = { NULL }; SCMutex tv_root_lock = PTHREAD_MUTEX_INITIALIZER; /* Action On Failure(AOF). Determines how the engine should behave when a - thread encounters a failure. Defaults to restart the failed thread */ + * thread encounters a failure. Defaults to restart the failed thread */ uint8_t tv_aof = THV_RESTART_THREAD; /** - * \brief Check if a thread flag is set + * \brief Check if a thread flag is set. * - * \retval 1 flag is set - * \retval 0 flag is not set + * \retval 1 flag is set. + * \retval 0 flag is not set. */ -int TmThreadsCheckFlag(ThreadVars *tv, uint8_t flag) { +int TmThreadsCheckFlag(ThreadVars *tv, uint8_t flag) +{ return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0; } /** - * \brief Set a thread flag + * \brief Set a thread flag. */ -void TmThreadsSetFlag(ThreadVars *tv, uint8_t flag) { +void TmThreadsSetFlag(ThreadVars *tv, uint8_t flag) +{ SC_ATOMIC_OR(tv->flags, flag); } /** - * \brief Unset a thread flag + * \brief Unset a thread flag. */ -void TmThreadsUnsetFlag(ThreadVars *tv, uint8_t flag) { +void TmThreadsUnsetFlag(ThreadVars *tv, uint8_t flag) +{ SC_ATOMIC_AND(tv->flags, ~flag); } /* 1 slot functions */ - -void *TmThreadsSlot1NoIn(void *td) { +void *TmThreadsSlot1NoIn(void *td) +{ ThreadVars *tv = (ThreadVars *)td; TmSlot *s = (TmSlot *)tv->tm_slots; Packet *p = NULL; @@ -128,7 +131,7 @@ void *TmThreadsSlot1NoIn(void *td) { TmThreadsSetFlag(tv, THV_INIT_DONE); - while(run) { + while (run) { TmThreadTestThreadUnPaused(tv); r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq, &s->slot_post_pq); @@ -145,9 +148,8 @@ void *TmThreadsSlot1NoIn(void *td) { /* handle pre queue */ while (s->slot_pre_pq.top != NULL) { Packet *extra_p = PacketDequeue(&s->slot_pre_pq); - if (extra_p != NULL) { + if (extra_p != NULL) tv->tmqh_out(tv, extra_p); - } } tv->tmqh_out(tv, p); @@ -157,16 +159,15 @@ void *TmThreadsSlot1NoIn(void *td) { /* handle post queue */ while (s->slot_post_pq.top != NULL) { Packet *extra_p = PacketDequeue(&s->slot_post_pq); - if (extra_p != NULL) { + if (extra_p != NULL) tv->tmqh_out(tv, extra_p); - } } if (TmThreadsCheckFlag(tv, THV_KILL)) { SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0); run = 0; } - } + } /* while (run) */ if (s->SlotThreadExitPrintStats != NULL) { s->SlotThreadExitPrintStats(tv, s->slot_data); @@ -184,7 +185,8 @@ void *TmThreadsSlot1NoIn(void *td) { pthread_exit((void *) 0); } -void *TmThreadsSlot1NoOut(void *td) { +void *TmThreadsSlot1NoOut(void *td) +{ ThreadVars *tv = (ThreadVars *)td; TmSlot *s = (TmSlot *)tv->tm_slots; Packet *p = NULL; @@ -214,12 +216,13 @@ void *TmThreadsSlot1NoOut(void *td) { TmThreadsSetFlag(tv, THV_INIT_DONE); - while(run) { + while (run) { TmThreadTestThreadUnPaused(tv); p = tv->tmqh_in(tv); - r = s->SlotFunc(tv, p, s->slot_data, /* no outqh no pq */NULL, /* no outqh no pq */NULL); + r = s->SlotFunc(tv, p, s->slot_data, /* no outqh no pq */ NULL, + /* no outqh no pq */ NULL); /* handle error */ if (r == TM_ECODE_FAILED) { TmqhOutputPacketpool(tv, p); @@ -231,7 +234,7 @@ void *TmThreadsSlot1NoOut(void *td) { SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0); run = 0; } - } + } /* while (run) */ if (s->SlotThreadExitPrintStats != NULL) { s->SlotThreadExitPrintStats(tv, s->slot_data); @@ -249,7 +252,8 @@ void *TmThreadsSlot1NoOut(void *td) { pthread_exit((void *) 0); } -void *TmThreadsSlot1NoInOut(void *td) { +void *TmThreadsSlot1NoInOut(void *td) +{ ThreadVars *tv = (ThreadVars *)td; TmSlot *s = (TmSlot *)tv->tm_slots; char run = 1; @@ -280,11 +284,10 @@ void *TmThreadsSlot1NoInOut(void *td) { TmThreadsSetFlag(tv, THV_INIT_DONE); - while(run) { + while (run) { TmThreadTestThreadUnPaused(tv); r = s->SlotFunc(tv, NULL, s->slot_data, /* no outqh, no pq */NULL, NULL); - //printf("%s: TmThreadsSlot1NoInNoOut: r %" PRId32 "\n", tv->name, r); /* handle error */ if (r == TM_ECODE_FAILED) { @@ -293,11 +296,10 @@ void *TmThreadsSlot1NoInOut(void *td) { } if (TmThreadsCheckFlag(tv, THV_KILL)) { - //printf("%s: TmThreadsSlot1NoInOut: KILL is set\n", tv->name); SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0); run = 0; } - } + } /* while (run) */ if (s->SlotThreadExitPrintStats != NULL) { s->SlotThreadExitPrintStats(tv, s->slot_data); @@ -311,12 +313,12 @@ void *TmThreadsSlot1NoInOut(void *td) { } } - //printf("TmThreadsSlot1NoInOut: %s ending\n", tv->name); TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); } -void *TmThreadsSlot1(void *td) { +void *TmThreadsSlot1(void *td) +{ ThreadVars *tv = (ThreadVars *)td; TmSlot *s = (TmSlot *)tv->tm_slots; Packet *p = NULL; @@ -347,16 +349,16 @@ void *TmThreadsSlot1(void *td) { memset(&s->slot_post_pq, 0, sizeof(PacketQueue)); TmThreadsSetFlag(tv, THV_INIT_DONE); - while(run) { + while (run) { TmThreadTestThreadUnPaused(tv); /* input a packet */ p = tv->tmqh_in(tv); if (p == NULL) { - //printf("%s: TmThreadsSlot1: p == NULL\n", tv->name); } else { - r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq, &s->slot_post_pq); + r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq, + &s->slot_post_pq); /* handle error */ if (r == TM_ECODE_FAILED) { TmqhReleasePacketsToPacketPool(&s->slot_pre_pq); @@ -387,11 +389,10 @@ void *TmThreadsSlot1(void *td) { } if (TmThreadsCheckFlag(tv, THV_KILL)) { - //printf("%s: TmThreadsSlot1: KILL is set\n", tv->name); SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0); run = 0; } - } + } /* while (run) */ if (s->SlotThreadExitPrintStats != NULL) { s->SlotThreadExitPrintStats(tv, s->slot_data); @@ -410,11 +411,14 @@ void *TmThreadsSlot1(void *td) { pthread_exit((void *) 0); } -/** \brief separate run function so we can call it recursively +/** + * \brief Separate run function so we can call it recursively. * - * \todo deal with post_pq for slots beyond the first + * \todo Deal with post_pq for slots beyond the first. */ -static inline TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot) { +static inline TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, + TmSlot *slot) +{ TmEcode r = TM_ECODE_OK; TmSlot *s = NULL; @@ -442,9 +446,8 @@ static inline TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *sl /* see if we need to process the packet */ if (s->slot_next != NULL) { r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next); - /* XXX handle error */ + /* \todo XXX handle error */ if (r == TM_ECODE_FAILED) { - //printf("TmThreadsSlotVarRun: recursive TmThreadsSlotVarRun returned 1\n"); TmqhReleasePacketsToPacketPool(&s->slot_pre_pq); TmqhReleasePacketsToPacketPool(&s->slot_post_pq); TmqhOutputPacketpool(tv, extra_p); @@ -455,17 +458,18 @@ static inline TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *sl tv->tmqh_out(tv, extra_p); } - /** \todo post pq */ + /** \todo XXX post pq */ } return TM_ECODE_OK; } /** - * \todo only the first "slot" currently makes the "post_pq" available - * to the thread module. + * \todo Only the first "slot" currently makes the "post_pq" available + * to the thread module. */ -void *TmThreadsSlotVar(void *td) { +void *TmThreadsSlotVar(void *td) +{ ThreadVars *tv = (ThreadVars *)td; TmSlot *s = (TmSlot *)tv->tm_slots; Packet *p = NULL; @@ -507,7 +511,7 @@ void *TmThreadsSlotVar(void *td) { s = (TmSlot *)tv->tm_slots; - while(run) { + while (run) { TmThreadTestThreadUnPaused(tv); /* input a packet */ @@ -548,7 +552,7 @@ void *TmThreadsSlotVar(void *td) { if (TmThreadsCheckFlag(tv, THV_KILL)) { run = 0; } - } + } /* while (run) */ SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0); s = (TmSlot *)tv->tm_slots; @@ -573,7 +577,7 @@ void *TmThreadsSlotVar(void *td) { } /** - * \brief We set the slot functions + * \brief We set the slot functions. * * \param tv Pointer to the TV to set the slot function for. * \param name Name of the slot variant. @@ -634,7 +638,6 @@ void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) if (slot == NULL) return; memset(slot, 0, sizeof(TmSlot)); - slot->SlotThreadInit = tm->ThreadInit; slot->slot_initdata = data; slot->SlotFunc = tm->Func; @@ -666,16 +669,19 @@ void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) #if !defined OS_WIN32 && !defined __OpenBSD__ static int SetCPUAffinitySet(cpu_set_t *cs) { #if defined OS_FREEBSD - int r = cpuset_setaffinity(CPU_LEVEL_WHICH,CPU_WHICH_TID,SCGetThreadIdLong(),sizeof(cpu_set_t),cs); + int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, + SCGetThreadIdLong(), sizeof(cpu_set_t),cs); #elif OS_DARWIN - int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY, (void*)cs, THREAD_AFFINITY_POLICY_COUNT); + int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY, + (void*)cs, THREAD_AFFINITY_POLICY_COUNT); #else pid_t tid = syscall(SYS_gettid); - int r = sched_setaffinity(tid,sizeof(cpu_set_t),cs); + int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs); #endif /* OS_FREEBSD */ if (r != 0) { - printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r, strerror(errno)); + printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r, + strerror(errno)); return -1; } @@ -684,13 +690,15 @@ static int SetCPUAffinitySet(cpu_set_t *cs) { #endif - /** - * \brief Set the thread affinity on the calling thread - * \param cpuid id of the core/cpu to setup the affinity - * \retval 0 if all goes well; -1 if something is wrong + * \brief Set the thread affinity on the calling thread. + * + * \param cpuid Id of the core/cpu to setup the affinity. + * + * \retval 0 If all goes well; -1 if something is wrong. */ -static int SetCPUAffinity(uint16_t cpuid) { +static int SetCPUAffinity(uint16_t cpuid) +{ #if !defined __OpenBSD__ int cpu = (int)cpuid; #endif @@ -703,16 +711,18 @@ static int SetCPUAffinity(uint16_t cpuid) { cpu_set_t cs; CPU_ZERO(&cs); - CPU_SET(cpu,&cs); + CPU_SET(cpu, &cs); #endif /* OS_WIN32 */ #ifdef OS_WIN32 int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs)); if (r != 0) { - printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r, strerror(errno)); - return -1; + printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r, + strerror(errno)); + return -1; } - SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32, SCGetThreadIdLong(), cpu); + SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32, + SCGetThreadIdLong(), cpu); return 0; #elif !defined __OpenBSD__ @@ -722,34 +732,42 @@ static int SetCPUAffinity(uint16_t cpuid) { /** - * \brief Set the thread options (thread priority) - * \param tv pointer to the ThreadVars to setup the thread priority - * \retval TM_ECOE_OK + * \brief Set the thread options (thread priority). + * + * \param tv Pointer to the ThreadVars to setup the thread priority. + * + * \retval TM_ECODE_OK. */ -TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio) { +TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio) +{ tv->thread_setup_flags |= THREAD_SET_PRIORITY; tv->thread_priority = prio; + return TM_ECODE_OK; } /** - * \brief Adjusting nice value for threads + * \brief Adjusting nice value for threads. */ void TmThreadSetPrio(ThreadVars *tv) { SCEnter(); #ifdef OS_WIN32 if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) { - SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for thread %s: %s", tv->name, strerror(errno)); + SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for " + "thread %s: %s", tv->name, strerror(errno)); } else { - SCLogDebug("Priority set to %"PRId32" for thread %s", tv->thread_priority, tv->name); + SCLogDebug("Priority set to %"PRId32" for thread %s", + tv->thread_priority, tv->name); } #else int ret = nice(tv->thread_priority); if (ret == -1) { - SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value for thread %s: %s", tv->name, strerror(errno)); + SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value " + "for thread %s: %s", tv->name, strerror(errno)); } else { - SCLogDebug("Nice value set to %"PRId32" for thread %s", tv->thread_priority, tv->name); + SCLogDebug("Nice value set to %"PRId32" for thread %s", + tv->thread_priority, tv->name); } #endif /* OS_WIN32 */ SCReturn; @@ -757,20 +775,26 @@ void TmThreadSetPrio(ThreadVars *tv) /** - * \brief Set the thread options (cpu affinity) - * \param tv pointer to the ThreadVars to setup the affinity - * \retval TM_ECOE_OK + * \brief Set the thread options (cpu affinity). + * + * \param tv pointer to the ThreadVars to setup the affinity. + * + * \retval TM_ECODE_OK. */ -TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu) { +TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu) +{ tv->thread_setup_flags |= THREAD_SET_AFFINITY; tv->cpu_affinity = cpu; + return TM_ECODE_OK; } -TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type) { - if (! threading_set_cpu_affinity) +TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type) +{ + if (!threading_set_cpu_affinity) return TM_ECODE_OK; + if (type > MAX_CPU_SET) { SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family"); return TM_ECODE_FAILED; @@ -778,6 +802,7 @@ TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type) { tv->thread_setup_flags |= THREAD_SET_AFFTYPE; tv->cpu_affinity = type; + return TM_ECODE_OK; } @@ -787,19 +812,25 @@ int TmThreadGetNbThreads(uint8_t type) SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family"); return 0; } + return thread_affinity[type].nb_threads; } /** - * \brief Set the thread options (cpu affinitythread) - * Priority should be already set by pthread_create - * \param tv pointer to the ThreadVars of the calling thread + * \brief Set the thread options (cpu affinitythread). + * Priority should be already set by pthread_create. + * + * \param tv pointer to the ThreadVars of the calling thread. */ -TmEcode TmThreadSetupOptions(ThreadVars *tv) { +TmEcode TmThreadSetupOptions(ThreadVars *tv) +{ if (tv->thread_setup_flags & THREAD_SET_AFFINITY) { - SCLogInfo("Setting affinity for \"%s\" Module to cpu/core %"PRIu16", thread id %lu", tv->name, tv->cpu_affinity, SCGetThreadIdLong()); + SCLogInfo("Setting affinity for \"%s\" Module to cpu/core " + "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity, + SCGetThreadIdLong()); SetCPUAffinity(tv->cpu_affinity); } + #if !defined OS_WIN32 && !defined __OpenBSD__ if (tv->thread_setup_flags & THREAD_SET_PRIORITY) TmThreadSetPrio(tv); @@ -813,13 +844,14 @@ TmEcode TmThreadSetupOptions(ThreadVars *tv) { tv->thread_priority = PRIO_LOW; } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) { tv->thread_priority = PRIO_MEDIUM; - } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) { + } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) { tv->thread_priority = PRIO_HIGH; } else { tv->thread_priority = taf->prio; } - SCLogInfo("Setting prio %d for \"%s\" Module to cpu/core %"PRIu16", thread id %lu", - tv->thread_priority, tv->name, cpu, SCGetThreadIdLong()); + SCLogInfo("Setting prio %d for \"%s\" Module to cpu/core " + "%"PRIu16", thread id %lu", tv->thread_priority, + tv->name, cpu, SCGetThreadIdLong()); } else { SetCPUAffinitySet(&taf->cpu_set); tv->thread_priority = taf->prio; @@ -827,6 +859,7 @@ TmEcode TmThreadSetupOptions(ThreadVars *tv) { TmThreadSetPrio(tv); } #endif + return TM_ECODE_OK; } @@ -872,13 +905,14 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name, tv->aof = THV_RESTART_THREAD; /* set the incoming queue */ - if (inq_name != NULL && strcmp(inq_name,"packetpool") != 0) { + if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) { SCLogDebug("inq_name \"%s\"", inq_name); tmq = TmqGetQueueByName(inq_name); if (tmq == NULL) { tmq = TmqCreateQueue(inq_name); - if (tmq == NULL) goto error; + if (tmq == NULL) + goto error; } SCLogDebug("tmq %p", tmq); @@ -890,7 +924,8 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name, SCLogDebug("inqh_name \"%s\"", inqh_name); tmqh = TmqhGetQueueHandlerByName(inqh_name); - if (tmqh == NULL) goto error; + if (tmqh == NULL) + goto error; tv->tmqh_in = tmqh->InHandler; tv->InShutdownHandler = tmqh->InShutdownHandler; @@ -902,11 +937,12 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name, SCLogDebug("outqh_name \"%s\"", outqh_name); tmqh = TmqhGetQueueHandlerByName(outqh_name); - if (tmqh == NULL) goto error; + if (tmqh == NULL) + goto error; tv->tmqh_out = tmqh->OutHandler; - if (outq_name != NULL && strcmp(outq_name,"packetpool") != 0) { + if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) { SCLogDebug("outq_name \"%s\"", outq_name); if (tmqh->OutHandlerCtxSetup != NULL) { @@ -916,7 +952,8 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name, tmq = TmqGetQueueByName(outq_name); if (tmq == NULL) { tmq = TmqCreateQueue(outq_name); - if (tmq == NULL) goto error; + if (tmq == NULL) + goto error; } SCLogDebug("tmq %p", tmq); @@ -935,6 +972,7 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name, TmThreadInitMC(tv); return tv; + error: printf("ERROR: failed to setup a thread.\n"); return NULL; @@ -1012,8 +1050,8 @@ void TmThreadAppend(ThreadVars *tv, int type) tv->next = NULL; tv->prev = NULL; - //printf("TmThreadAppend: thread \'%s\' is the first thread in the list.\n", tv->name); SCMutexUnlock(&tv_root_lock); + return; } @@ -1031,7 +1069,8 @@ void TmThreadAppend(ThreadVars *tv, int type) } SCMutexUnlock(&tv_root_lock); - //printf("TmThreadAppend: thread \'%s\' is added to the list.\n", tv->name); + + return; } /** @@ -1046,6 +1085,7 @@ void TmThreadRemove(ThreadVars *tv, int type) if (tv_root[type] == NULL) { SCMutexUnlock(&tv_root_lock); + return; } @@ -1088,11 +1128,12 @@ void TmThreadKillThread(ThreadVars *tv) if (tv->inq != NULL) { /* signal the queue for the number of users */ - if (tv->InShutdownHandler != NULL) { - tv->InShutdownHandler(tv); - } - for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) + if (tv->InShutdownHandler != NULL) { + tv->InShutdownHandler(tv); + } + for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { SCCondSignal(&trans_q[tv->inq->id].cond_q); + } /* to be sure, signal more */ while (1) { @@ -1100,11 +1141,12 @@ void TmThreadKillThread(ThreadVars *tv) break; } - if (tv->InShutdownHandler != NULL) { - tv->InShutdownHandler(tv); - } - for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) + if (tv->InShutdownHandler != NULL) { + tv->InShutdownHandler(tv); + } + for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { SCCondSignal(&trans_q[tv->inq->id].cond_q); + } usleep(100); } @@ -1132,7 +1174,6 @@ void TmThreadKillThreads(void) { for (i = 0; i < TVT_MAX; i++) { tv = tv_root[i]; - while (tv) { TmThreadsSetFlag(tv, THV_KILL); SCLogDebug("told thread %s to stop", tv->name); @@ -1140,16 +1181,14 @@ void TmThreadKillThreads(void) { if (tv->inq != NULL) { int i; - //printf("TmThreadKillThreads: (t->inq->reader_cnt + t->inq->writer_cnt) %" PRIu32 "\n", (t->inq->reader_cnt + t->inq->writer_cnt)); - /* make sure our packet pending counter doesn't block */ - //SCCondSignal(&cond_pending); /* signal the queue for the number of users */ if (tv->InShutdownHandler != NULL) { tv->InShutdownHandler(tv); } + for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { if (tv->inq->q_type == 0) SCCondSignal(&trans_q[tv->inq->id].cond_q); @@ -1206,6 +1245,8 @@ void TmThreadKillThreads(void) { tv = tv->next; } } + + return; } /** @@ -1274,7 +1315,8 @@ void TmThreadSetAOF(ThreadVars *tv, uint8_t aof) void TmThreadInitMC(ThreadVars *tv) { if ( (tv->m = SCMalloc(sizeof(SCMutex))) == NULL) { - SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. Exiting..."); + SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. " + "Exiting..."); exit(EXIT_FAILURE); } @@ -1284,14 +1326,18 @@ void TmThreadInitMC(ThreadVars *tv) } if ( (tv->cond = SCMalloc(sizeof(SCCondT))) == NULL) { - SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. Exiting..."); + SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. " + "Exiting..."); exit(0); } if (SCCondInit(tv->cond, NULL) != 0) { - printf("Error initializing the tv->cond condition variable\n"); + SCLogError(SC_ERR_FATAL, "Error initializing the tv->cond condition " + "variable"); exit(0); } + + return; } /** @@ -1322,6 +1368,7 @@ void TmThreadTestThreadUnPaused(ThreadVars *tv) void TmThreadContinue(ThreadVars *tv) { TmThreadsUnsetFlag(tv, THV_PAUSE); + return; } @@ -1352,6 +1399,7 @@ void TmThreadContinueThreads() void TmThreadPause(ThreadVars *tv) { TmThreadsSetFlag(tv, THV_PAUSE); + return; } @@ -1397,6 +1445,7 @@ static void TmThreadRestartThread(ThreadVars *tv) tv->restarted++; SCLogInfo("thread \"%s\" restarted", tv->name); + return; } @@ -1481,7 +1530,8 @@ TmEcode TmThreadWaitOnThreadInit(void) } SCLogInfo("all %"PRIu16" packet processing threads, %"PRIu16" management " - "threads initialized, engine started.", ppt_num, mgt_num); + "threads initialized, engine started.", ppt_num, mgt_num); + return TM_ECODE_OK; }