diff --git a/src/suricata.c b/src/suricata.c index 6313b01bab..db3373def0 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -2283,9 +2283,14 @@ void PostRunDeinit(const int runmode, struct timeval *start_time) /* handle graceful shutdown of the flow engine, it's helper * threads and the packet threads */ FlowDisableFlowManagerThread(); + /* disable capture */ TmThreadDisableReceiveThreads(); + /* tell packet threads to enter flow timeout loop */ + TmThreadDisablePacketThreads(THV_REQ_FLOW_LOOP, THV_FLOW_LOOP); + /* run cleanup on the flow hash */ FlowWorkToDoCleanup(); - TmThreadDisablePacketThreads(); + /* gracefully shut down packet threads */ + TmThreadDisablePacketThreads(THV_KILL, THV_RUNNING_DONE); SCPrintElapsedTime(start_time); FlowDisableFlowRecyclerThread(); diff --git a/src/threadvars.h b/src/threadvars.h index b7b39ad491..d645d0645e 100644 --- a/src/threadvars.h +++ b/src/threadvars.h @@ -45,7 +45,7 @@ struct TmSlot_; #define THV_DEINIT BIT_U32(7) #define THV_RUNNING_DONE BIT_U32(8) /** thread has completed running and is entering * the de-init phase */ -#define THV_KILL_PKTACQ BIT_U32(9) /**< flag thread to stop packet acq */ +#define THV_REQ_FLOW_LOOP BIT_U32(9) /**< request thread to enter flow timeout loop */ #define THV_FLOW_LOOP BIT_U32(10) /**< thread is in flow shutdown loop */ /** signal thread's capture method to create a fake packet to force through diff --git a/src/tm-threads.c b/src/tm-threads.c index d8246cb12b..ee4a3e750f 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -337,7 +337,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td) TmThreadsSetFlag(tv, THV_FAILED); run = false; } - if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) { + if (TmThreadsCheckFlag(tv, THV_REQ_FLOW_LOOP) || suricata_ctl_flags) { run = false; } if (r == TM_ECODE_DONE) { @@ -517,37 +517,16 @@ static void *TmThreadsSlotVar(void *td) TmThreadsHandleInjectedPackets(tv); } - if (TmThreadsCheckFlag(tv, THV_KILL)) { + if (TmThreadsCheckFlag(tv, THV_REQ_FLOW_LOOP)) { run = false; } - } /* while (run) */ - StatsSyncCounters(tv); - - TmThreadsSetFlag(tv, THV_RUNNING_DONE); - TmThreadWaitForFlag(tv, THV_DEINIT); - - PacketPoolDestroy(); - - s = (TmSlot *)tv->tm_slots; - - for ( ; s != NULL; s = s->slot_next) { - if (s->SlotThreadExitPrintStats != NULL) { - s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data)); - } - - if (s->SlotThreadDeinit != NULL) { - r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data)); - if (r != TM_ECODE_OK) { - TmThreadsSetFlag(tv, THV_CLOSED); - goto error; - } - } } + if (!SCTmThreadsSlotPktAcqLoopFinish(tv)) { + goto error; + } + StatsSyncCounters(tv); - SCLogDebug("%s ending", tv->name); - tv->stream_pq = NULL; - TmThreadsSetFlag(tv, THV_CLOSED); - pthread_exit((void *) 0); + pthread_exit(NULL); return NULL; error: @@ -1462,7 +1441,7 @@ again: if (tm && tm->PktAcqBreakLoop != NULL) { tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data)); } - TmThreadsSetFlag(tv, THV_KILL_PKTACQ); + TmThreadsSetFlag(tv, THV_REQ_FLOW_LOOP); if (tv->inq != NULL) { for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { @@ -1515,8 +1494,17 @@ static void TmThreadDebugValidateNoMorePackets(void) /** * \brief Disable all packet threads + * \param set flag to set + * \param check flag to check + * + * Support 2 stages in shutting down the packet threads: + * 1. set THV_REQ_FLOW_LOOP and wait for THV_FLOW_LOOP + * 2. set THV_KILL and wait for THV_RUNNING_DONE + * + * During step 1 the main loop is exited, and the flow loop logic is entered. + * During step 2, the flow loop logic is done and the thread closes. */ -void TmThreadDisablePacketThreads(void) +void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check) { struct timeval start_ts; struct timeval cur_ts; @@ -1540,7 +1528,7 @@ again: /* loop through the packet threads and kill them */ SCMutexLock(&tv_root_lock); for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) { - TmThreadsSetFlag(tv, THV_KILL); + TmThreadsSetFlag(tv, set); /* separate worker threads (autofp) will still wait at their * input queues. So nudge them here so they will observe the @@ -1554,7 +1542,8 @@ again: SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id); } - while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { + /* wait for it to reach the expected state */ + while (!TmThreadsCheckFlag(tv, check)) { SCMutexUnlock(&tv_root_lock); SleepMsec(1); diff --git a/src/tm-threads.h b/src/tm-threads.h index d4c8e898a5..83bc826826 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -122,7 +122,7 @@ void TmThreadWaitForFlag(ThreadVars *, uint32_t); TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot); -void TmThreadDisablePacketThreads(void); +void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check); void TmThreadDisableReceiveThreads(void); uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);