threading: wait for flow housekeeping at shutdown

Flow house keeping can accumulate work that wasn't taken into account
during shutdown. This could lead to flows still in the flowworker
thread context when being it was freed, leading to missed work and
memory leaks.

This patch adds a new way of checking if a thread module is still
busy.

Bug: #6062.
pull/8858/head
Victor Julien 2 years ago
parent d333dffdcb
commit fa3f16ec75

@ -660,11 +660,32 @@ static void FlowWorkerExitPrintStats(ThreadVars *tv, void *data)
OutputLoggerExitPrintStats(tv, fw->output_thread);
}
static bool FlowWorkerIsBusy(ThreadVars *tv, void *flow_worker)
{
FlowWorkerThreadData *fw = flow_worker;
if (fw->pq.len)
return true;
if (fw->fls.work_queue.len)
return true;
if (tv->flow_queue) {
FQLOCK_LOCK(tv->flow_queue);
bool fq_done = (tv->flow_queue->qlen == 0);
FQLOCK_UNLOCK(tv->flow_queue);
if (!fq_done) {
return true;
}
}
return false;
}
void TmModuleFlowWorkerRegister (void)
{
tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy;
tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
tmm_modules[TMM_FLOWWORKER].ThreadExitPrintStats = FlowWorkerExitPrintStats;
tmm_modules[TMM_FLOWWORKER].cap_flags = 0;

@ -56,6 +56,12 @@ typedef struct TmModule_ {
/** terminates the capture loop in PktAcqLoop */
TmEcode (*PktAcqBreakLoop)(ThreadVars *, void *);
/** does a thread still have tasks to complete before it can be killed?
* \retval bool
* \param tv threadvars
* \param thread_data thread module thread data (e.g. FlowWorkerThreadData for FlowWorker) */
bool (*ThreadBusy)(ThreadVars *tv, void *thread_data);
TmEcode (*Management)(ThreadVars *, void *);
/** global Init/DeInit */

@ -1263,6 +1263,18 @@ static int TmThreadKillThread(ThreadVars *tv)
return 1;
}
static bool ThreadBusy(ThreadVars *tv)
{
for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
TmModule *tm = TmModuleGetById(s->tm_id);
if (tm && tm->ThreadBusy != NULL) {
if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
return true;
}
}
return false;
}
/** \internal
*
* \brief make sure that all packet threads are done processing their
@ -1298,28 +1310,23 @@ again:
SleepMsec(1);
goto again;
}
if (tv->flow_queue) {
FQLOCK_LOCK(tv->flow_queue);
bool fq_done = (tv->flow_queue->qlen == 0);
FQLOCK_UNLOCK(tv->flow_queue);
if (!fq_done) {
SCMutexUnlock(&tv_root_lock);
Packet *p = PacketGetFromAlloc();
if (p != NULL) {
p->flags |= PKT_PSEUDO_STREAM_END;
PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
PacketQueue *q = tv->stream_pq;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
}
if (ThreadBusy(tv)) {
SCMutexUnlock(&tv_root_lock);
/* don't sleep while holding a lock */
SleepMsec(1);
goto again;
Packet *p = PacketGetFromAlloc();
if (p != NULL) {
p->flags |= PKT_PSEUDO_STREAM_END;
PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
PacketQueue *q = tv->stream_pq;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
}
/* don't sleep while holding a lock */
SleepMsec(1);
goto again;
}
tv = tv->next;
}
@ -1387,28 +1394,23 @@ again:
goto again;
}
if (tv->flow_queue) {
FQLOCK_LOCK(tv->flow_queue);
bool fq_done = (tv->flow_queue->qlen == 0);
FQLOCK_UNLOCK(tv->flow_queue);
if (!fq_done) {
SCMutexUnlock(&tv_root_lock);
Packet *p = PacketGetFromAlloc();
if (p != NULL) {
p->flags |= PKT_PSEUDO_STREAM_END;
PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
PacketQueue *q = tv->stream_pq;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
}
if (ThreadBusy(tv)) {
SCMutexUnlock(&tv_root_lock);
/* don't sleep while holding a lock */
SleepMsec(1);
goto again;
Packet *p = PacketGetFromAlloc();
if (p != NULL) {
p->flags |= PKT_PSEUDO_STREAM_END;
PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
PacketQueue *q = tv->stream_pq;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
}
/* don't sleep while holding a lock */
SleepMsec(1);
goto again;
}
/* we found a receive TV. Send it a KILL_PKTACQ signal. */

Loading…
Cancel
Save