threads: helper function TmThreadsWaitForUnpause

The pattern of checking the pause flag, setting to paused then
waiting to unpause was done enough times to factor out into its own
function. This is also needed by library users who bring their own
packet acquisition threads.
pull/11948/head
Jason Ish 5 months ago committed by Victor Julien
parent 7d854bd97f
commit 3f8c3698db

@ -407,11 +407,7 @@ static void *StatsMgmtThread(void *arg)
TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING);
while (1) {
if (TmThreadsCheckFlag(tv_local, THV_PAUSE)) {
TmThreadsSetFlag(tv_local, THV_PAUSED);
TmThreadTestThreadUnPaused(tv_local);
TmThreadsUnsetFlag(tv_local, THV_PAUSED);
}
TmThreadsWaitForUnpause(tv_local);
struct timeval cur_timev;
gettimeofday(&cur_timev, NULL);
@ -489,11 +485,7 @@ static void *StatsWakeupThread(void *arg)
TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING);
while (1) {
if (TmThreadsCheckFlag(tv_local, THV_PAUSE)) {
TmThreadsSetFlag(tv_local, THV_PAUSED);
TmThreadTestThreadUnPaused(tv_local);
TmThreadsUnsetFlag(tv_local, THV_PAUSED);
}
TmThreadsWaitForUnpause(tv_local);
struct timeval cur_timev;
gettimeofday(&cur_timev, NULL);

@ -597,11 +597,7 @@ static TmEcode DetectLoader(ThreadVars *th_v, void *thread_data)
SCLogDebug("loader thread started");
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
TmThreadsSetFlag(th_v, THV_PAUSED);
TmThreadTestThreadUnPaused(th_v);
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}
TmThreadsWaitForUnpause(th_v);
/* see if we have tasks */

@ -96,11 +96,7 @@ static TmEcode BypassedFlowManager(ThreadVars *th_v, void *thread_data)
TmThreadsSetFlag(th_v, THV_RUNNING);
while (1) {
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
TmThreadsSetFlag(th_v, THV_PAUSED);
TmThreadTestThreadUnPaused(th_v);
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}
TmThreadsWaitForUnpause(th_v);
SCLogDebug("Dumping the table");
gettimeofday(&tv, NULL);
TIMEVAL_TO_TIMESPEC(&tv, &curtime);

@ -820,11 +820,7 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
TmThreadsSetFlag(th_v, THV_PAUSED);
TmThreadTestThreadUnPaused(th_v);
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}
TmThreadsWaitForUnpause(th_v);
bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);
@ -1085,11 +1081,7 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
TmThreadsSetFlag(th_v, THV_PAUSED);
TmThreadTestThreadUnPaused(th_v);
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}
TmThreadsWaitForUnpause(th_v);
SC_ATOMIC_ADD(flowrec_busy,1);
FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q);

@ -305,11 +305,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
TmThreadsSetFlag(tv, THV_PAUSED);
TmThreadTestThreadUnPaused(tv);
TmThreadsUnsetFlag(tv, THV_PAUSED);
}
TmThreadsWaitForUnpause(tv);
r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
@ -362,6 +358,15 @@ error:
return NULL;
}
void TmThreadsWaitForUnpause(ThreadVars *tv)
{
if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
TmThreadsSetFlag(tv, THV_PAUSED);
TmThreadTestThreadUnPaused(tv);
TmThreadsUnsetFlag(tv, THV_PAUSED);
}
}
static void *TmThreadsSlotVar(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
@ -442,11 +447,7 @@ static void *TmThreadsSlotVar(void *td)
s = (TmSlot *)tv->tm_slots;
while (run) {
if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
TmThreadsSetFlag(tv, THV_PAUSED);
TmThreadTestThreadUnPaused(tv);
TmThreadsUnsetFlag(tv, THV_PAUSED);
}
TmThreadsWaitForUnpause(tv);
/* input a packet */
p = tv->tmqh_in(tv);

@ -288,4 +288,7 @@ void TmThreadsGetMinimalTimestamp(struct timeval *ts);
uint16_t TmThreadsGetWorkerThreadMax(void);
bool TmThreadsTimeSubsysIsReady(void);
/** \brief Wait for a thread to become unpaused. */
void TmThreadsWaitForUnpause(ThreadVars *tv);
#endif /* SURICATA_TM_THREADS_H */

Loading…
Cancel
Save