flow/worker: process injected flows more gradually

Worker threads are responsible for final processing of timed out flows.
These are selected by the Flow Manager and inserted into a per thread
queue. The Flow Worker then checks this queue after each packet. Due to
the burstiness of this process, the packet threads would sometimes process
a lot of these flows in the context of a single packet, leading to spike
in latency which might cause packet loss.

This patch changes the behavior to only process at max 2 flows per packet.
This way added processing cost is amortized over many packets.
pull/7957/head
Victor Julien 3 years ago
parent ce1bdcb474
commit f837146321

@ -168,11 +168,12 @@ static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *d
return 1;
}
/** \param[in] max_work Max flows to process. 0 if unlimited. */
static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
void *detect_thread, // TODO proper type?
FlowTimeoutCounters *counters,
FlowQueuePrivate *fq)
FlowTimeoutCounters *counters, FlowQueuePrivate *fq, const uint32_t max_work)
{
uint32_t i = 0;
Flow *f;
while ((f = FlowQueuePrivateGetFromTop(fq)) != NULL) {
FLOWLOCK_WRLOCK(f);
@ -204,11 +205,15 @@ static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
FlowClearMemory (f, f->protomap);
FLOWLOCK_UNLOCK(f);
if (fw->fls.spare_queue.len >= 200) { // TODO match to API? 200 = 2 * block size
FlowSparePoolReturnFlow(f);
} else {
FlowQueuePrivatePrependFlow(&fw->fls.spare_queue, f);
}
if (max_work != 0 && ++i == max_work)
break;
}
}
@ -459,9 +464,8 @@ static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv,
if (p->pkt_src == PKT_SRC_WIRE)
StatsSetUI64(tv, fw->cnt.flows_injected_max, (uint64_t)injected.len);
FlowTimeoutCounters counters = { 0, 0, };
CheckWorkQueue(tv, fw, detect_thread, &counters, &injected);
UpdateCounters(tv, fw, &counters);
/* move to local queue so we can process over the course of multiple packets */
FlowQueuePrivateAppendPrivate(&fw->fls.work_queue, &injected);
}
FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
}
@ -472,12 +476,16 @@ static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv,
static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv,
FlowWorkerThreadData *fw, Packet *p, void *detect_thread)
{
uint32_t max_work = 2;
if (PKT_IS_PSEUDOPKT(p))
max_work = 0;
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
if (fw->fls.work_queue.len) {
StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)fw->fls.work_queue.len);
FlowTimeoutCounters counters = { 0, 0, };
CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue);
CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue, max_work);
UpdateCounters(tv, fw, &counters);
}
FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
@ -592,7 +600,7 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
housekeeping:
/* take injected flows and process them */
/* take injected flows and add them to our local queue */
FlowWorkerProcessInjectedFlows(tv, fw, p, detect_thread);
/* process local work queue */

Loading…
Cancel
Save