From f837146321fce199f7c90e30485da9ac2fdc5080 Mon Sep 17 00:00:00 2001
From: Victor Julien
Date: Mon, 26 Sep 2022 09:54:37 +0200
Subject: [PATCH] flow/worker: process injected flows more gradually

Worker threads are responsible for the final processing of timed out
flows. These are selected by the Flow Manager and inserted into a
per-thread queue. The Flow Worker then checks this queue after each
packet.

Due to the burstiness of this process, the packet threads would
sometimes process a lot of these flows in the context of a single
packet, leading to a spike in latency which might cause packet loss.

This patch changes the behavior to process at most 2 flows per packet.
This way the added processing cost is amortized over many packets.
---
 src/flow-worker.c | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/src/flow-worker.c b/src/flow-worker.c
index 5a4bc81000..a14b4c3523 100644
--- a/src/flow-worker.c
+++ b/src/flow-worker.c
@@ -168,11 +168,12 @@ static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *d
     return 1;
 }
 
+/** \param[in] max_work Max flows to process. 0 if unlimited. */
 static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
         void *detect_thread, // TODO proper type?
-        FlowTimeoutCounters *counters,
-        FlowQueuePrivate *fq)
+        FlowTimeoutCounters *counters, FlowQueuePrivate *fq, const uint32_t max_work)
 {
+    uint32_t i = 0;
     Flow *f;
     while ((f = FlowQueuePrivateGetFromTop(fq)) != NULL) {
         FLOWLOCK_WRLOCK(f);
@@ -204,11 +205,15 @@ static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
 
         FlowClearMemory (f, f->protomap);
         FLOWLOCK_UNLOCK(f);
+
         if (fw->fls.spare_queue.len >= 200) { // TODO match to API? 200 = 2 * block size
             FlowSparePoolReturnFlow(f);
         } else {
             FlowQueuePrivatePrependFlow(&fw->fls.spare_queue, f);
         }
+
+        if (max_work != 0 && ++i == max_work)
+            break;
     }
 }
 
@@ -459,9 +464,8 @@ static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv,
         if (p->pkt_src == PKT_SRC_WIRE)
             StatsSetUI64(tv, fw->cnt.flows_injected_max, (uint64_t)injected.len);
 
-        FlowTimeoutCounters counters = { 0, 0, };
-        CheckWorkQueue(tv, fw, detect_thread, &counters, &injected);
-        UpdateCounters(tv, fw, &counters);
+        /* move to local queue so we can process over the course of multiple packets */
+        FlowQueuePrivateAppendPrivate(&fw->fls.work_queue, &injected);
     }
     FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
 }
@@ -472,12 +476,16 @@ static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv,
 static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv,
         FlowWorkerThreadData *fw, Packet *p, void *detect_thread)
 {
+    uint32_t max_work = 2;
+    if (PKT_IS_PSEUDOPKT(p))
+        max_work = 0;
+
     FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
     if (fw->fls.work_queue.len) {
         StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)fw->fls.work_queue.len);
 
         FlowTimeoutCounters counters = { 0, 0, };
-        CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue);
+        CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue, max_work);
         UpdateCounters(tv, fw, &counters);
     }
     FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
@@ -592,7 +600,7 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
 
 housekeeping:
 
-    /* take injected flows and process them */
+    /* take injected flows and add them to our local queue */
     FlowWorkerProcessInjectedFlows(tv, fw, p, detect_thread);
 
     /* process local work queue */
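
For readers who want the batching idea from the commit message in isolation, here is a minimal, self-contained sketch of the capped drain pattern. It is not Suricata code: work_item_t, work_queue_t, queue_pop() and handle_item() are hypothetical stand-ins for Flow, FlowQueuePrivate and the per-flow finalization; only the max_work counting and early break mirror the patched CheckWorkQueue().

/* Illustrative sketch only, not part of the patch above. It shows the
 * "max_work" amortization pattern described in the commit message, reduced
 * to a standalone queue drain with hypothetical types and helpers. */
#include <stddef.h>
#include <stdint.h>

typedef struct work_item {
    struct work_item *next;
} work_item_t;

typedef struct work_queue {
    work_item_t *top;
    uint32_t len;
} work_queue_t;

static work_item_t *queue_pop(work_queue_t *q)
{
    work_item_t *it = q->top;
    if (it != NULL) {
        q->top = it->next;
        q->len--;
    }
    return it;
}

static void handle_item(work_item_t *it)
{
    (void)it; /* stand-in for the per-flow finalization work */
}

/* Drain at most `max_work` items per call; 0 means unlimited. Whatever is
 * left stays queued and is picked up on a later call, so the cost is spread
 * over many packets instead of landing on a single one. */
static void drain_queue(work_queue_t *q, const uint32_t max_work)
{
    uint32_t i = 0;
    work_item_t *it;
    while ((it = queue_pop(q)) != NULL) {
        handle_item(it);
        if (max_work != 0 && ++i == max_work)
            break;
    }
}

Calling drain_queue(q, 2) from the per-packet path gives the behavior the patch aims for: two queued items per ordinary packet, and unlimited draining (max_work = 0) for pseudo packets, where latency is not a concern.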