flow/manager: adaptive hash eviction timing

The flow manager scans the hash table in chunks based on the flow timeout
settings. In the default config this will lead to a full hash pass every
240 seconds. Under pressure, this will lead to a large amount of memory
still in use by flows waiting to be evicted, or evicted flows waiting to
be freed.

This patch implements new adaptive logic for the timing and amount of
work that is done by the flow manager. It takes the memcap budgets and
calculates the proportion of the memcap budgets in use. It takes the max
in-use percentage, and adapts the flow manager behavior based on that.

The memcaps considered are:
    flow, stream, stream-reassembly and app-layer-http

The percentage in use is inversely applied to the time the flow manager
takes for a full hash pass. In addition, it is also applied to the chunk
size and the sleep time.

Example: tcp.reassembly_memuse is at 90% of the memcap and normal flow
hash pass is 240s. Hash pass time will be:

    240 * (100 - 90) / 100 = 24s

Chunk size and sleep time will automatically be updated for this.

Adds various counters.

Bug: #4650.
Bug: #4808.
pull/7533/head
Victor Julien 4 years ago committed by Victor Julien
parent f50af12068
commit e9d2417e0f

@ -71,6 +71,8 @@
#include "output-flow.h" #include "output-flow.h"
#include "util-validate.h" #include "util-validate.h"
#include "runmode-unix-socket.h"
/* Run mode selected at suricata.c */ /* Run mode selected at suricata.c */
extern int run_mode; extern int run_mode;
@ -481,22 +483,34 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td,
return cnt; return cnt;
} }
static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, /** \internal
struct timeval *ts, * \brief handle timeout for a slice of hash rows
const uint32_t hash_min, const uint32_t hash_max, * If we wrap around we call FlowTimeoutHash twice */
FlowTimeoutCounters *counters, uint32_t iter, const uint32_t chunks) static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, struct timeval *ts,
const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
const uint32_t rows, uint32_t *pos)
{ {
const uint32_t rows = hash_max - hash_min; uint32_t start = 0;
const uint32_t chunk_size = rows / chunks; uint32_t end = 0;
uint32_t cnt = 0;
const uint32_t min = iter * chunk_size + hash_min; uint32_t rows_left = rows;
uint32_t max = min + chunk_size;
/* we start at beginning of hash at next iteration so let's check again:
* hash till the end */ start = hash_min + (*pos);
if (iter + 1 == chunks) { if (start >= hash_max) {
max = hash_max; start = hash_min;
}
end = start + rows_left;
if (end > hash_max) {
end = hash_max;
}
*pos = (end == hash_max) ? hash_min : end;
rows_left = rows_left - (end - start);
cnt += FlowTimeoutHash(td, ts, start, end, counters);
if (rows_left) {
goto again;
} }
const uint32_t cnt = FlowTimeoutHash(td, ts, min, max, counters);
return cnt; return cnt;
} }
@ -614,6 +628,9 @@ typedef struct FlowCounters_ {
uint16_t flow_bypassed_cnt_clo; uint16_t flow_bypassed_cnt_clo;
uint16_t flow_bypassed_pkts; uint16_t flow_bypassed_pkts;
uint16_t flow_bypassed_bytes; uint16_t flow_bypassed_bytes;
uint16_t memcap_pressure;
uint16_t memcap_pressure_max;
} FlowCounters; } FlowCounters;
typedef struct FlowManagerThreadData_ { typedef struct FlowManagerThreadData_ {
@ -648,6 +665,9 @@ static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t); fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t); fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t); fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
fc->memcap_pressure = StatsRegisterCounter("memcap_pressure", t);
fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap_pressure_max", t);
} }
static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data) static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
@ -710,6 +730,42 @@ static uint32_t FlowTimeoutsMin(void)
//#define FM_PROFILE //#define FM_PROFILE
/** \internal
 *  \brief Calculate per-work-unit sizing and sleep time for the flow manager.
 *
 *  Scales the time budget for a full hash pass inversely with the memcap
 *  pressure `mp`: e.g. at 90% pressure a 240s pass is compressed to 24s.
 *  The number of rows per work unit and the sleep time between work units
 *  are derived from that compressed budget.
 *
 *  \param pass_in_sec time budget (seconds) for a full hash pass at 0% pressure
 *  \param rows number of hash rows this flow manager instance covers
 *  \param mp memcap pressure in percent; values above 100 are clamped
 *  \param emergency true when the engine is in flow emergency mode
 *  \param[out] wu_sleep sleep time per work unit, in milliseconds
 *  \param[out] wu_rows number of hash rows to process per work unit (>= 1)
 */
static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, const uint32_t mp,
        const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows)
{
    if (emergency) {
        /* emergency mode: scan the entire table each work unit, short sleep */
        *wu_rows = rows;
        *wu_sleep = 250;
        return;
    }
    /* clamp pressure to 100%: `100 - mp` would wrap around for mp > 100
     * (memuse can overshoot the memcap), which would minimize instead of
     * maximize the work done under extreme pressure */
    const uint32_t pressure = mp > 100 ? 100 : mp;
    /* fraction of the normal pass time we still allow ourselves: 1.0 at
     * 0% pressure down to 0.0 at 100% pressure */
    const float perc = (float)(100 - pressure) / (float)100;

    uint32_t full_pass_in_ms = pass_in_sec * 1000;
    full_pass_in_ms *= perc;
    /* lower bound so the pass time never collapses to 0 */
    if (full_pass_in_ms < 333)
        full_pass_in_ms = 333;

    /* shrink the work unit duration along with the pass budget */
    uint32_t work_unit_ms = 999 * perc;
    if (work_unit_ms < 250)
        work_unit_ms = 250;

    uint32_t wus_per_full_pass = full_pass_in_ms / work_unit_ms;
    if (wus_per_full_pass == 0) /* defensive: avoid division by zero below */
        wus_per_full_pass = 1;

    uint32_t rows_per_wu = rows / wus_per_full_pass;
    if (rows_per_wu == 0)
        rows_per_wu = 1;

    /* subtract the estimated row processing cost (est 1 usec per row) from
     * the sleep time so the overall pass budget is respected */
    const uint32_t rows_process_cost = rows_per_wu / 1000;
    int32_t sleep_per_wu = work_unit_ms - rows_process_cost;
    if (sleep_per_wu < 10)
        sleep_per_wu = 10;

    *wu_sleep = sleep_per_wu;
    *wu_rows = rows_per_wu;
}
/** \brief Thread that manages the flow table and times out flows. /** \brief Thread that manages the flow table and times out flows.
* *
* \param td ThreadVars casted to void ptr * \param td ThreadVars casted to void ptr
@ -724,7 +780,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
bool emerg = false; bool emerg = false;
bool prev_emerg = false; bool prev_emerg = false;
uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */ uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
uint32_t flow_last_sec = 0;
/* VJ leaving disabled for now, as hosts are only used by tags and the numbers /* VJ leaving disabled for now, as hosts are only used by tags and the numbers
* are really low. Might confuse ppl * are really low. Might confuse ppl
uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v); uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v);
@ -732,12 +787,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v); uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v);
*/ */
memset(&ts, 0, sizeof(ts)); memset(&ts, 0, sizeof(ts));
uint32_t hash_passes = 0;
#ifdef FM_PROFILE
uint32_t hash_row_checks = 0;
uint32_t hash_passes_chunks = 0;
#endif
uint32_t hash_full_passes = 0;
const uint32_t min_timeout = FlowTimeoutsMin(); const uint32_t min_timeout = FlowTimeoutsMin();
const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60; const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
@ -766,9 +815,19 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
memset(&startts, 0, sizeof(startts)); memset(&startts, 0, sizeof(startts));
gettimeofday(&startts, NULL); gettimeofday(&startts, NULL);
uint32_t hash_pass_iter = 0;
uint32_t emerg_over_cnt = 0; uint32_t emerg_over_cnt = 0;
uint64_t next_run_ms = 0; uint64_t next_run_ms = 0;
const uint32_t rows = ftd->max - ftd->min;
uint32_t pos = 0;
uint32_t rows_per_wu = 0;
uint64_t sleep_per_wu = 0;
uint32_t mp = MemcapsGetPressure() * 100;
if (ftd->instance == 0) {
StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
}
GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu);
while (1) while (1)
{ {
@ -805,7 +864,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
TimeGet(&ts); TimeGet(&ts);
SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec); SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
const uint64_t ts_ms = ts.tv_sec * 1000 + ts.tv_usec / 1000; const uint64_t ts_ms = ts.tv_sec * 1000 + ts.tv_usec / 1000;
const uint32_t rt = (uint32_t)ts.tv_sec;
const bool emerge_p = (emerg && !prev_emerg); const bool emerge_p = (emerg && !prev_emerg);
if (emerge_p) { if (emerge_p) {
next_run_ms = 0; next_run_ms = 0;
@ -822,7 +880,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
FlowSparePoolUpdate(sq_len); FlowSparePoolUpdate(sq_len);
} }
} }
const uint32_t secs_passed = rt - flow_last_sec;
/* try to time out flows */ /* try to time out flows */
FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
@ -830,34 +887,18 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
if (emerg) { if (emerg) {
/* in emergency mode, do a full pass of the hash table */ /* in emergency mode, do a full pass of the hash table */
FlowTimeoutHash(&ftd->timeout, &ts, ftd->min, ftd->max, &counters); FlowTimeoutHash(&ftd->timeout, &ts, ftd->min, ftd->max, &counters);
hash_passes++;
hash_full_passes++;
hash_passes++;
#ifdef FM_PROFILE
hash_passes_chunks += 1;
hash_row_checks += counters.rows_checked;
#endif
StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass); StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
} else { } else {
/* non-emergency mode: scan part of the hash */ SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
const uint32_t chunks = MIN(secs_passed, pass_in_sec); rows_per_wu);
for (uint32_t i = 0; i < chunks; i++) {
FlowTimeoutHashInChunks(&ftd->timeout, &ts, ftd->min, ftd->max, const uint32_t ppos = pos;
&counters, hash_pass_iter, pass_in_sec); FlowTimeoutHashInChunks(
hash_pass_iter++; &ftd->timeout, &ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);
if (hash_pass_iter == pass_in_sec) { if (ppos > pos) {
hash_pass_iter = 0; StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
hash_full_passes++;
StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
}
} }
hash_passes++;
#ifdef FM_PROFILE
hash_row_checks += counters.rows_checked;
hash_passes_chunks += chunks;
#endif
} }
flow_last_sec = rt;
/* /*
StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned); StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned);
@ -891,54 +932,58 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
* clear emergency bit if we have at least xx flows pruned. */ * clear emergency bit if we have at least xx flows pruned. */
uint32_t len = FlowSpareGetPoolSize(); uint32_t len = FlowSpareGetPoolSize();
StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)len); StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)len);
if (emerg == true) { if (emerg == true) {
SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32 SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
"flow_spare_q status: %"PRIu32"%% flows at the queue", "flow_spare_q status: %"PRIu32"%% flows at the queue",
len, flow_config.prealloc, len * 100 / flow_config.prealloc); len, flow_config.prealloc, len * 100 / flow_config.prealloc);
/* only if we have pruned this "emergency_recovery" percentage /* only if we have pruned this "emergency_recovery" percentage
* of flows, we will unset the emergency bit */ * of flows, we will unset the emergency bit */
if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) { if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
emerg_over_cnt++; emerg_over_cnt++;
} else { } else {
emerg_over_cnt = 0; emerg_over_cnt = 0;
}
if (emerg_over_cnt >= 30) {
SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
FlowTimeoutsReset();
emerg = false;
prev_emerg = false;
emerg_over_cnt = 0;
SCLogNotice("Flow emergency mode over, back to normal... unsetting"
" FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
"ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
"%% flows at the queue",
(uintmax_t)ts.tv_sec, (uintmax_t)ts.tv_usec,
len * 100 / flow_config.prealloc);
StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
}
} }
if (emerg_over_cnt >= 30) { /* update work units */
SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY); mp = MemcapsGetPressure() * 100;
FlowTimeoutsReset(); if (ftd->instance == 0) {
StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
emerg = false; StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
prev_emerg = false;
emerg_over_cnt = 0;
hash_pass_iter = 0;
SCLogNotice("Flow emergency mode over, back to normal... unsetting"
" FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", "
"ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32
"%% flows at the queue", (uintmax_t)ts.tv_sec,
(uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
} }
} GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu);
next_run_ms = ts_ms + 667;
if (emerg)
next_run_ms = ts_ms + 250;
}
if (flow_last_sec == 0) {
flow_last_sec = rt;
}
if (ftd->instance == 0 && next_run_ms = ts_ms + sleep_per_wu;
(other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec)) { }
DefragTimeoutHash(&ts); if (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec) {
//uint32_t hosts_pruned = if (ftd->instance == 0) {
HostTimeoutHash(&ts); DefragTimeoutHash(&ts);
IPPairTimeoutHash(&ts); // uint32_t hosts_pruned =
HttpRangeContainersTimeoutHash(&ts); HostTimeoutHash(&ts);
other_last_sec = (uint32_t)ts.tv_sec; IPPairTimeoutHash(&ts);
HttpRangeContainersTimeoutHash(&ts);
other_last_sec = (uint32_t)ts.tv_sec;
}
} }
#ifdef FM_PROFILE #ifdef FM_PROFILE
struct timeval run_endts; struct timeval run_endts;
@ -981,11 +1026,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
established_cnt, closing_cnt); established_cnt, closing_cnt);
#ifdef FM_PROFILE #ifdef FM_PROFILE
SCLogNotice("hash passes %u avg chunks %u full %u rows %u (rows/s %u)",
hash_passes, hash_passes_chunks / (hash_passes ? hash_passes : 1),
hash_full_passes, hash_row_checks,
hash_row_checks / ((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
gettimeofday(&endts, NULL); gettimeofday(&endts, NULL);
struct timeval total_run_time; struct timeval total_run_time;
timersub(&endts, &startts, &total_run_time); timersub(&endts, &startts, &total_run_time);

@ -82,8 +82,6 @@ const char *RunModeUnixSocketGetDefaultMode(void)
return "autofp"; return "autofp";
} }
#ifdef BUILD_UNIX_SOCKET
#define MEMCAPS_MAX 7 #define MEMCAPS_MAX 7
static MemcapCommand memcaps[MEMCAPS_MAX] = { static MemcapCommand memcaps[MEMCAPS_MAX] = {
{ {
@ -130,6 +128,24 @@ static MemcapCommand memcaps[MEMCAPS_MAX] = {
}, },
}; };
/** \brief Get the highest memcap "pressure" across the tracked memcap budgets.
 *
 *  Walks the first 4 entries of the memcaps[] command table and computes
 *  memuse/memcap for each budget that has a memcap configured, returning
 *  the maximum. NOTE(review): assumes indices 0-3 of memcaps[] are the
 *  flow, stream, stream-reassembly and app-layer-http entries -- verify
 *  against the table definition.
 *
 *  \retval percent highest in-use fraction (0.0 when no memcap is set;
 *          may exceed 1.0 if a memuse overshoots its memcap)
 */
float MemcapsGetPressure(void)
{
float percent = 0.0;
for (int i = 0; i < 4; i++) { // flow, stream, stream-reassembly, app-layer-http
uint64_t memcap = memcaps[i].GetFunc();
/* a memcap of 0 means "unlimited": skip it, it exerts no pressure */
if (memcap) {
uint64_t memuse = memcaps[i].GetMemuseFunc();
float p = (float)((double)memuse / (double)memcap);
// SCLogNotice("%s: memuse %"PRIu64", memcap %"PRIu64" => %f%%",
// memcaps[i].name, memuse, memcap, (p * 100));
percent = MAX(p, percent);
}
}
return percent;
}
#ifdef BUILD_UNIX_SOCKET
static int RunModeUnixSocketMaster(void); static int RunModeUnixSocketMaster(void);
static int unix_manager_pcap_task_running = 0; static int unix_manager_pcap_task_running = 0;
static int unix_manager_pcap_task_failed = 0; static int unix_manager_pcap_task_failed = 0;

@ -30,6 +30,8 @@ int RunModeUnixSocketIsActive(void);
TmEcode UnixSocketPcapFile(TmEcode tm, struct timespec *last_processed); TmEcode UnixSocketPcapFile(TmEcode tm, struct timespec *last_processed);
float MemcapsGetPressure(void);
#ifdef BUILD_UNIX_SOCKET #ifdef BUILD_UNIX_SOCKET
TmEcode UnixSocketDatasetAdd(json_t *cmd, json_t* answer, void *data); TmEcode UnixSocketDatasetAdd(json_t *cmd, json_t* answer, void *data);
TmEcode UnixSocketDatasetRemove(json_t *cmd, json_t* answer, void *data); TmEcode UnixSocketDatasetRemove(json_t *cmd, json_t* answer, void *data);

Loading…
Cancel
Save