flow: improve performance in emergency mode

When the flow engine enters emergency mode, 3 things happen:

1. a different set of (lower) timeout values is applied
2. the flow manager runs more often
3. worker threads go get a flow directly from the hash table

Testing showed that performance went down significantly in emergency
mode due to contention:

1. worker threads would fight each other over the hash access
2. flow manager would get in the way of workers

This patch changes the behavior in 2 ways:

1. It makes the flow manager slightly less aggressive. It will still
   try to run ~3 times per second, but no longer 10 times.

   This should reduce the contention. At the same time, checking flows
   many times per second does not make them time out any faster.

2. The 'get a used flow' logic now uses a single atomic operation per
   call. That one operation also reserves a slice of the hash for the
   calling worker.

   The worker also gives up much sooner (after about FLOW_GET_NEW_TRIES
   buckets, where the old limit was 25), to avoid the overhead of hash
   walking and of taking and releasing locks.
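
The slice reservation in item 2 can be sketched in isolation as below.
This is a minimal illustration, not the Suricata code: it uses C11
atomics instead of the SC_ATOMIC_* macros, and prune_idx, SLICE_SIZE
and reserve_slice() are hypothetical stand-ins for flow_prune_idx,
FLOW_GET_NEW_TRIES and the first line of FlowGetUsedFlow(). It assumes
the add returns the value from before the addition, which is what C11
atomic_fetch_add does; the diff does not show what SC_ATOMIC_ADD
returns.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical stand-in for FLOW_GET_NEW_TRIES. */
#define SLICE_SIZE 5u

/* Hypothetical stand-in for flow_prune_idx: a cursor shared by all
 * workers. Static storage, so it starts at zero. */
static atomic_uint_fast32_t prune_idx;

/* Reserve SLICE_SIZE consecutive buckets for the calling worker and
 * return the index of the first one.
 *
 * atomic_fetch_add returns the value held before the addition, so
 * concurrent callers each get a different start offset with a single
 * atomic operation and no further synchronization. */
static uint32_t reserve_slice(uint32_t hash_size)
{
    assert(hash_size > 0);
    uint32_t start = (uint32_t)atomic_fetch_add(&prune_idx, SLICE_SIZE);
    return start % hash_size;
}
```

With a 16 bucket table, successive calls hand out the start offsets
0, 5, 10, 15 and then wrap to 4. Slices can overlap again once the
cursor wraps, so the reservation spreads workers out; it does not
replace the per-bucket lock.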

These combined changes show much better 'under stress' behavior,
especially on multi-NUMA systems.
pull/5247/head
Victor Julien 5 years ago
parent 0da4dc0dea
commit 611c991f27

@@ -861,6 +861,8 @@ Flow *FlowGetExistingFlowFromHash(FlowKey *key, const uint32_t hash)
     return f;
 }
+#define FLOW_GET_NEW_TRIES 5
 /** \internal
  * \brief Get a flow from the hash directly.
  *
@@ -878,20 +880,17 @@ Flow *FlowGetExistingFlowFromHash(FlowKey *key, const uint32_t hash)
  */
 static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
 {
-    uint32_t idx = SC_ATOMIC_GET(flow_prune_idx) % flow_config.hash_size;
+    uint32_t idx = SC_ATOMIC_ADD(flow_prune_idx, FLOW_GET_NEW_TRIES) % flow_config.hash_size;
     uint32_t cnt = flow_config.hash_size;
     uint32_t tried = 0;
     while (cnt--) {
-        tried++;
+        if (tried++ > FLOW_GET_NEW_TRIES)
+            break;
         if (++idx >= flow_config.hash_size)
             idx = 0;
-        if (tried >= 25) {
-            (void) SC_ATOMIC_ADD(flow_prune_idx, (flow_config.hash_size - cnt));
-            break;
-        }
         FlowBucket *fb = &flow_hash[idx];
         if (FBLOCK_TRYLOCK(fb) != 0)
@@ -960,8 +959,6 @@ static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
         FlowUpdateState(f, FLOW_STATE_NEW);
         FLOWLOCK_UNLOCK(f);
-        (void) SC_ATOMIC_ADD(flow_prune_idx, (flow_config.hash_size - cnt));
         return f;
     }
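
The "give up quickly" walk in FlowGetUsedFlow() combines a try-lock
with a small cap on the number of buckets visited. A minimal model of
that pattern is sketched below. Everything in it is hypothetical and
only for illustration: Bucket, MAX_TRIES, find_reusable() and
bucket_unlock() are not Suricata names, an atomic_flag stands in for
the bucket lock behind FBLOCK_TRYLOCK, and a has_flow field replaces
the real checks on the flows in a bucket.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_TRIES 5u /* hypothetical cap on buckets visited per call */

typedef struct Bucket_ {
    atomic_flag busy; /* set while some thread holds the bucket */
    int has_flow;     /* non-zero if a flow could be taken from here */
} Bucket;

/* Release a bucket previously locked by find_reusable(). */
static void bucket_unlock(Bucket *b)
{
    atomic_flag_clear(&b->busy);
}

/* Visit at most MAX_TRIES buckets starting at 'start'. Buckets held by
 * another thread are skipped instead of waited on. Returns the index
 * of the first bucket that could be locked and holds a flow, or -1.
 * On success the bucket is left locked for the caller. */
static int find_reusable(Bucket *table, uint32_t size, uint32_t start)
{
    assert(table != NULL && size > 0);
    uint32_t idx = start % size;
    for (uint32_t tried = 0; tried < MAX_TRIES; tried++) {
        Bucket *b = &table[idx];
        idx = (idx + 1) % size;
        if (atomic_flag_test_and_set(&b->busy))
            continue; /* already held: move on without blocking */
        if (b->has_flow)
            return (int)(b - table); /* success, bucket stays locked */
        atomic_flag_clear(&b->busy); /* nothing usable: release it */
    }
    return -1; /* cap reached: give up and let the caller cope */
}
```

The cap bounds the cost of a failed lookup to a handful of lock
attempts, which is the trade-off the commit message describes: a
worker may miss a reusable flow further along, but it no longer spends
time walking the hash while other threads are doing the same.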

@@ -108,9 +108,9 @@ void FlowTimeoutsEmergency(void)
 /* 1 seconds */
 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
-/* 0.1 seconds */
+/* 0.3 seconds */
 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
-#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 100000
+#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000
 #define NEW_FLOW_COUNT_COND 10
 typedef struct FlowTimeoutCounters_ {
