detect: only breakloop threads that are lagging

Sleep after all threads have been checked.

Bug: #5969.
pull/8706/head
Victor Julien 3 years ago
parent 5e4cf182ab
commit 8a968faa04

@ -2112,40 +2112,6 @@ int DetectEngineInspectPktBufferGeneric(
}
}
/* nudge capture loops to wake up */
static void BreakCapture(void)
{
SCMutexLock(&tv_root_lock);
for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
continue;
}
/* find the correct slot */
for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
if (suricata_ctl_flags != 0) {
SCMutexUnlock(&tv_root_lock);
return;
}
TmModule *tm = TmModuleGetById(s->tm_id);
if (!(tm->flags & TM_FLAG_RECEIVE_TM)) {
continue;
}
/* signal capture method that we need a packet. */
TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
/* if the method supports it, BreakLoop. Otherwise we rely on
* the capture method's recv timeout */
if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
}
break;
}
}
SCMutexUnlock(&tv_root_lock);
}
/** \internal
* \brief inject a pseudo packet into each detect thread that doesn't use the
* new det_ctx yet
@ -2271,21 +2237,27 @@ static int DetectEngineReloadThreads(DetectEngineCtx *new_de_ctx)
InjectPackets(detect_tvs, new_det_ctx, no_of_detect_tvs);
/* loop waiting for detect threads to switch to the new det_ctx. Try to
* wake up capture if needed (break loop). */
uint32_t threads_done = 0;
retry:
for (i = 0; i < no_of_detect_tvs; i++) {
int break_out = 0;
if (suricata_ctl_flags != 0) {
threads_done = no_of_detect_tvs;
break;
}
usleep(1000);
while (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) != 1) {
if (suricata_ctl_flags != 0) {
break_out = 1;
break;
}
BreakCapture();
usleep(1000);
if (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) == 1) {
SCLogDebug("new_det_ctx - %p used by detect engine", new_det_ctx[i]);
threads_done++;
} else if (detect_tvs[i]->break_loop) {
TmThreadsCaptureBreakLoop(detect_tvs[i]);
}
if (break_out)
break;
SCLogDebug("new_det_ctx - %p used by detect engine", new_det_ctx[i]);
}
if (threads_done < no_of_detect_tvs) {
threads_done = 0;
SleepMsec(250);
goto retry;
}
/* this is to make sure that if someone initiated shutdown during a live

@ -2272,23 +2272,6 @@ uint16_t TmThreadsGetWorkerThreadMax(void)
return (uint16_t)thread_max;
}
static inline void ThreadBreakLoop(ThreadVars *tv)
{
if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
return;
}
/* find the correct slot */
TmSlot *s = tv->tm_slots;
TmModule *tm = TmModuleGetById(s->tm_id);
if (tm->flags & TM_FLAG_RECEIVE_TM) {
/* if the method supports it, BreakLoop. Otherwise we rely on
* the capture method's recv timeout */
if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
}
}
}
/** \brief inject a flow into a threads flow queue
*/
void TmThreadsInjectFlowById(Flow *f, const int id)
@ -2308,6 +2291,6 @@ void TmThreadsInjectFlowById(Flow *f, const int id)
if (tv->inq != NULL) {
SCCondSignal(&tv->inq->pq->cond_q);
} else if (tv->break_loop) {
ThreadBreakLoop(tv);
TmThreadsCaptureBreakLoop(tv);
}
}

@ -253,6 +253,27 @@ static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
tv->tmqh_out(tv, p);
}
static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv)
{
if (unlikely(!tv->break_loop))
return;
if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
return;
}
/* find the correct slot */
TmSlot *s = tv->tm_slots;
TmModule *tm = TmModuleGetById(s->tm_id);
if (tm->flags & TM_FLAG_RECEIVE_TM) {
/* if the method supports it, BreakLoop. Otherwise we rely on
* the capture method's recv timeout */
if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
}
TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
}
}
void TmThreadsListThreads(void);
int TmThreadsRegisterThread(ThreadVars *tv, const int type);
void TmThreadsUnregisterThread(const int id);

Loading…
Cancel
Save