detect: only breakloop threads that are lagging

Sleep after all threads have been checked.

Bug: #5969.
(cherry picked from commit 8a968faa04)
pull/8721/head
Victor Julien 2 years ago
parent 6658300c84
commit 6d8b50b748

@ -1791,40 +1791,6 @@ int DetectEngineInspectPktBufferGeneric(
}
}
/* nudge capture loops to wake up */
static void BreakCapture(void)
{
SCMutexLock(&tv_root_lock);
for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
continue;
}
/* find the correct slot */
for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
if (suricata_ctl_flags != 0) {
SCMutexUnlock(&tv_root_lock);
return;
}
TmModule *tm = TmModuleGetById(s->tm_id);
if (!(tm->flags & TM_FLAG_RECEIVE_TM)) {
continue;
}
/* signal capture method that we need a packet. */
TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
/* if the method supports it, BreakLoop. Otherwise we rely on
* the capture method's recv timeout */
if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
}
break;
}
}
SCMutexUnlock(&tv_root_lock);
}
/** \internal
* \brief inject a pseudo packet into each detect thread that doesn't use the
* new det_ctx yet
@ -1950,21 +1916,27 @@ static int DetectEngineReloadThreads(DetectEngineCtx *new_de_ctx)
InjectPackets(detect_tvs, new_det_ctx, no_of_detect_tvs);
/* loop waiting for detect threads to switch to the new det_ctx. Try to
* wake up capture if needed (break loop). */
uint32_t threads_done = 0;
retry:
for (i = 0; i < no_of_detect_tvs; i++) {
int break_out = 0;
if (suricata_ctl_flags != 0) {
threads_done = no_of_detect_tvs;
break;
}
usleep(1000);
while (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) != 1) {
if (suricata_ctl_flags != 0) {
break_out = 1;
break;
}
BreakCapture();
usleep(1000);
if (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) == 1) {
SCLogDebug("new_det_ctx - %p used by detect engine", new_det_ctx[i]);
threads_done++;
} else if (detect_tvs[i]->break_loop) {
TmThreadsCaptureBreakLoop(detect_tvs[i]);
}
if (break_out)
break;
SCLogDebug("new_det_ctx - %p used by detect engine", new_det_ctx[i]);
}
if (threads_done < no_of_detect_tvs) {
threads_done = 0;
SleepMsec(250);
goto retry;
}
/* this is to make sure that if someone initiated shutdown during a live

@ -2307,23 +2307,6 @@ uint16_t TmThreadsGetWorkerThreadMax(void)
return thread_max;
}
static inline void ThreadBreakLoop(ThreadVars *tv)
{
if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
return;
}
/* find the correct slot */
TmSlot *s = tv->tm_slots;
TmModule *tm = TmModuleGetById(s->tm_id);
if (tm->flags & TM_FLAG_RECEIVE_TM) {
/* if the method supports it, BreakLoop. Otherwise we rely on
* the capture method's recv timeout */
if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
}
}
}
/**
* \retval r 1 if packet was accepted, 0 otherwise
* \note if packet was not accepted, it's still the responsibility
@ -2353,7 +2336,7 @@ int TmThreadsInjectPacketsById(Packet **packets, const int id)
if (tv->inq != NULL) {
SCCondSignal(&tv->inq->pq->cond_q);
} else if (tv->break_loop) {
ThreadBreakLoop(tv);
TmThreadsCaptureBreakLoop(tv);
}
return 1;
}
@ -2377,6 +2360,6 @@ void TmThreadsInjectFlowById(Flow *f, const int id)
if (tv->inq != NULL) {
SCCondSignal(&tv->inq->pq->cond_q);
} else if (tv->break_loop) {
ThreadBreakLoop(tv);
TmThreadsCaptureBreakLoop(tv);
}
}

@ -254,6 +254,27 @@ static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
tv->tmqh_out(tv, p);
}
static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv)
{
if (unlikely(!tv->break_loop))
return;
if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
return;
}
/* find the correct slot */
TmSlot *s = tv->tm_slots;
TmModule *tm = TmModuleGetById(s->tm_id);
if (tm->flags & TM_FLAG_RECEIVE_TM) {
/* if the method supports it, BreakLoop. Otherwise we rely on
* the capture method's recv timeout */
if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
}
TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
}
}
void TmThreadsListThreads(void);
int TmThreadsRegisterThread(ThreadVars *tv, const int type);
void TmThreadsUnregisterThread(const int id);

Loading…
Cancel
Save