diff --git a/src/flow-worker.c b/src/flow-worker.c index aad47df982..9ecfe65f29 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -660,11 +660,32 @@ static void FlowWorkerExitPrintStats(ThreadVars *tv, void *data) OutputLoggerExitPrintStats(tv, fw->output_thread); } +static bool FlowWorkerIsBusy(ThreadVars *tv, void *flow_worker) +{ + FlowWorkerThreadData *fw = flow_worker; + if (fw->pq.len) + return true; + if (fw->fls.work_queue.len) + return true; + + if (tv->flow_queue) { + FQLOCK_LOCK(tv->flow_queue); + bool fq_done = (tv->flow_queue->qlen == 0); + FQLOCK_UNLOCK(tv->flow_queue); + if (!fq_done) { + return true; + } + } + + return false; +} + void TmModuleFlowWorkerRegister (void) { tmm_modules[TMM_FLOWWORKER].name = "FlowWorker"; tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit; tmm_modules[TMM_FLOWWORKER].Func = FlowWorker; + tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy; tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit; tmm_modules[TMM_FLOWWORKER].ThreadExitPrintStats = FlowWorkerExitPrintStats; tmm_modules[TMM_FLOWWORKER].cap_flags = 0; diff --git a/src/tm-modules.h b/src/tm-modules.h index 3e77db637f..4642ff46a6 100644 --- a/src/tm-modules.h +++ b/src/tm-modules.h @@ -56,6 +56,12 @@ typedef struct TmModule_ { /** terminates the capture loop in PktAcqLoop */ TmEcode (*PktAcqBreakLoop)(ThreadVars *, void *); + /** does a thread still have tasks to complete before it can be killed? + * \retval bool + * \param tv threadvars + * \param thread_data thread module thread data (e.g. FlowWorkerThreadData for FlowWorker) */ + bool (*ThreadBusy)(ThreadVars *tv, void *thread_data); + TmEcode (*Management)(ThreadVars *, void *); /** global Init/DeInit */ diff --git a/src/tm-threads.c b/src/tm-threads.c index c3f73d6666..99808ad2ca 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1263,6 +1263,18 @@ static int TmThreadKillThread(ThreadVars *tv) return 1; } +static bool ThreadBusy(ThreadVars *tv) +{ + for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) { + TmModule *tm = TmModuleGetById(s->tm_id); + if (tm && tm->ThreadBusy != NULL) { + if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data))) + return true; + } + } + return false; +} + /** \internal * * \brief make sure that all packet threads are done processing their @@ -1298,28 +1310,23 @@ again: SleepMsec(1); goto again; } - if (tv->flow_queue) { - FQLOCK_LOCK(tv->flow_queue); - bool fq_done = (tv->flow_queue->qlen == 0); - FQLOCK_UNLOCK(tv->flow_queue); - if (!fq_done) { - SCMutexUnlock(&tv_root_lock); - - Packet *p = PacketGetFromAlloc(); - if (p != NULL) { - p->flags |= PKT_PSEUDO_STREAM_END; - PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH); - PacketQueue *q = tv->stream_pq; - SCMutexLock(&q->mutex_q); - PacketEnqueue(q, p); - SCCondSignal(&q->cond_q); - SCMutexUnlock(&q->mutex_q); - } + if (ThreadBusy(tv)) { + SCMutexUnlock(&tv_root_lock); - /* don't sleep while holding a lock */ - SleepMsec(1); - goto again; + Packet *p = PacketGetFromAlloc(); + if (p != NULL) { + p->flags |= PKT_PSEUDO_STREAM_END; + PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH); + PacketQueue *q = tv->stream_pq; + SCMutexLock(&q->mutex_q); + PacketEnqueue(q, p); + SCCondSignal(&q->cond_q); + SCMutexUnlock(&q->mutex_q); } + + /* don't sleep while holding a lock */ + SleepMsec(1); + goto again; } tv = tv->next; } @@ -1387,28 +1394,23 @@ again: goto again; } - if (tv->flow_queue) { - FQLOCK_LOCK(tv->flow_queue); - bool fq_done = (tv->flow_queue->qlen == 0); - FQLOCK_UNLOCK(tv->flow_queue); - if (!fq_done) { - SCMutexUnlock(&tv_root_lock); - - Packet *p = PacketGetFromAlloc(); - if (p != NULL) { - p->flags |= PKT_PSEUDO_STREAM_END; - PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH); - PacketQueue *q = tv->stream_pq; - SCMutexLock(&q->mutex_q); - PacketEnqueue(q, p); - SCCondSignal(&q->cond_q); - SCMutexUnlock(&q->mutex_q); - } + if (ThreadBusy(tv)) { + SCMutexUnlock(&tv_root_lock); - /* don't sleep while holding a lock */ - SleepMsec(1); - goto again; + Packet *p = PacketGetFromAlloc(); + if (p != NULL) { + p->flags |= PKT_PSEUDO_STREAM_END; + PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH); + PacketQueue *q = tv->stream_pq; + SCMutexLock(&q->mutex_q); + PacketEnqueue(q, p); + SCCondSignal(&q->cond_q); + SCMutexUnlock(&q->mutex_q); } + + /* don't sleep while holding a lock */ + SleepMsec(1); + goto again; } /* we found a receive TV. Send it a KILL_PKTACQ signal. */