threading: refactor CPU affinity code

Split the code into multiple functions for easier readability.
pull/13397/head
Lukas Sismis 11 months ago committed by Victor Julien
parent 35c86ce800
commit fbfced12f2

@ -82,7 +82,7 @@ static void AffinitySetupInit(void)
int i, j; int i, j;
int ncpu = UtilCpuGetNumProcessorsConfigured(); int ncpu = UtilCpuGetNumProcessorsConfigured();
SCLogDebug("Initialize affinity setup\n"); SCLogDebug("Initialize CPU affinity setup");
/* be conservative relatively to OS: use all cpus by default */ /* be conservative relatively to OS: use all cpus by default */
for (i = 0; i < MAX_CPU_SET; i++) { for (i = 0; i < MAX_CPU_SET; i++) {
cpu_set_t *cs = &thread_affinity[i].cpu_set; cpu_set_t *cs = &thread_affinity[i].cpu_set;
@ -138,10 +138,11 @@ void BuildCpusetWithCallback(
for (i = a; i<= b; i++) { for (i = a; i<= b; i++) {
Callback(i, data); Callback(i, data);
} }
if (stop) if (stop) {
break; break;
} }
} }
}
static void AffinityCallback(int i, void *data) static void AffinityCallback(int i, void *data)
{ {
@ -152,121 +153,218 @@ static void BuildCpuset(const char *name, SCConfNode *node, cpu_set_t *cpu)
{ {
BuildCpusetWithCallback(name, node, AffinityCallback, (void *) cpu); BuildCpusetWithCallback(name, node, AffinityCallback, (void *) cpu);
} }
#endif /* OS_WIN32 and __OpenBSD__ */
/** /**
* \brief Extract cpu affinity configuration from current config file * \brief Get the appropriate set name for a given affinity value.
*/ */
static const char *GetAffinitySetName(const char *val)
void AffinitySetupLoadFromConfig(void)
{ {
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun if (strcmp(val, "decode-cpu-set") == 0 || strcmp(val, "stream-cpu-set") == 0 ||
SCConfNode *root = SCConfGetNode("threading.cpu-affinity"); strcmp(val, "reject-cpu-set") == 0 || strcmp(val, "output-cpu-set") == 0) {
SCConfNode *affinity; return NULL;
if (thread_affinity_init_done == 0) {
AffinitySetupInit();
thread_affinity_init_done = 1;
} }
SCLogDebug("Load affinity from config\n"); return (strcmp(val, "detect-cpu-set") == 0) ? "worker-cpu-set" : val;
if (root == NULL) {
SCLogInfo("can't get cpu-affinity node");
return;
} }
TAILQ_FOREACH(affinity, &root->head, next) { /**
if (strcmp(affinity->val, "decode-cpu-set") == 0 || * \brief Set up CPU sets for the given affinity type.
strcmp(affinity->val, "stream-cpu-set") == 0 || */
strcmp(affinity->val, "reject-cpu-set") == 0 || static void SetupCpuSets(ThreadsAffinityType *taf, SCConfNode *affinity, const char *setname)
strcmp(affinity->val, "output-cpu-set") == 0) { {
continue; CPU_ZERO(&taf->cpu_set);
SCConfNode *cpu_node = SCConfNodeLookupChild(affinity->head.tqh_first, "cpu");
if (cpu_node != NULL) {
BuildCpuset(setname, cpu_node, &taf->cpu_set);
} else {
SCLogWarning("Unable to find 'cpu' node for set %s", setname);
}
} }
const char *setname = affinity->val; /**
if (strcmp(affinity->val, "detect-cpu-set") == 0) * \brief Build a priority CPU set for the given priority level.
setname = "worker-cpu-set"; */
static void BuildPriorityCpuset(ThreadsAffinityType *taf, SCConfNode *prio_node,
const char *priority, cpu_set_t *cpuset, const char *setname)
{
SCConfNode *node = SCConfNodeLookupChild(prio_node, priority);
if (node != NULL) {
BuildCpuset(setname, node, cpuset);
} else {
SCLogDebug("Unable to find '%s' priority for set %s", priority, setname);
}
}
ThreadsAffinityType *taf = GetAffinityTypeFromName(setname); /**
SCConfNode *node = NULL; * \brief Set up the default priority for the given affinity type.
SCConfNode *nprio = NULL; * \retval 0 on success, -1 on error
*/
static int SetupDefaultPriority(
ThreadsAffinityType *taf, SCConfNode *prio_node, const char *setname)
{
SCConfNode *default_node = SCConfNodeLookupChild(prio_node, "default");
if (default_node == NULL) {
return 0;
}
if (taf == NULL) { if (strcmp(default_node->val, "low") == 0) {
FatalError("unknown cpu-affinity type"); taf->prio = PRIO_LOW;
} else if (strcmp(default_node->val, "medium") == 0) {
taf->prio = PRIO_MEDIUM;
} else if (strcmp(default_node->val, "high") == 0) {
taf->prio = PRIO_HIGH;
} else { } else {
SCLogConfig("Found affinity definition for \"%s\"", setname); SCLogError("Unknown default CPU affinity priority: %s", default_node->val);
return -1;
} }
CPU_ZERO(&taf->cpu_set); SCLogConfig("Using default priority '%s' for set %s", default_node->val, setname);
node = SCConfNodeLookupChild(affinity->head.tqh_first, "cpu"); return 0;
if (node == NULL) {
SCLogInfo("unable to find 'cpu'");
} else {
BuildCpuset(setname, node, &taf->cpu_set);
} }
/**
* \brief Set up priority CPU sets for the given affinity type.
* \retval 0 on success, -1 on error
*/
static int SetupAffinityPriority(
ThreadsAffinityType *taf, SCConfNode *affinity, const char *setname)
{
CPU_ZERO(&taf->lowprio_cpu); CPU_ZERO(&taf->lowprio_cpu);
CPU_ZERO(&taf->medprio_cpu); CPU_ZERO(&taf->medprio_cpu);
CPU_ZERO(&taf->hiprio_cpu); CPU_ZERO(&taf->hiprio_cpu);
nprio = SCConfNodeLookupChild(affinity->head.tqh_first, "prio");
if (nprio != NULL) { SCConfNode *prio_node = SCConfNodeLookupChild(affinity->head.tqh_first, "prio");
node = SCConfNodeLookupChild(nprio, "low"); if (prio_node == NULL) {
if (node == NULL) { return 0;
SCLogDebug("unable to find 'low' prio using default value");
} else {
BuildCpuset(setname, node, &taf->lowprio_cpu);
} }
node = SCConfNodeLookupChild(nprio, "medium"); BuildPriorityCpuset(taf, prio_node, "low", &taf->lowprio_cpu, setname);
if (node == NULL) { BuildPriorityCpuset(taf, prio_node, "medium", &taf->medprio_cpu, setname);
SCLogDebug("unable to find 'medium' prio using default value"); BuildPriorityCpuset(taf, prio_node, "high", &taf->hiprio_cpu, setname);
} else { return SetupDefaultPriority(taf, prio_node, setname);
BuildCpuset(setname, node, &taf->medprio_cpu);
} }
node = SCConfNodeLookupChild(nprio, "high"); /**
if (node == NULL) { * \brief Set up CPU affinity mode for the given affinity type.
SCLogDebug("unable to find 'high' prio using default value"); * \retval 0 on success, -1 on error
} else { */
BuildCpuset(setname, node, &taf->hiprio_cpu); static int SetupAffinityMode(ThreadsAffinityType *taf, SCConfNode *affinity)
{
SCConfNode *mode_node = SCConfNodeLookupChild(affinity->head.tqh_first, "mode");
if (mode_node == NULL) {
return 0;
} }
node = SCConfNodeLookupChild(nprio, "default");
if (node != NULL) { if (strcmp(mode_node->val, "exclusive") == 0) {
if (!strcmp(node->val, "low")) { taf->mode_flag = EXCLUSIVE_AFFINITY;
taf->prio = PRIO_LOW; } else if (strcmp(mode_node->val, "balanced") == 0) {
} else if (!strcmp(node->val, "medium")) { taf->mode_flag = BALANCED_AFFINITY;
taf->prio = PRIO_MEDIUM;
} else if (!strcmp(node->val, "high")) {
taf->prio = PRIO_HIGH;
} else { } else {
FatalError("unknown cpu_affinity prio"); SCLogError("Unknown CPU affinity mode: %s", mode_node->val);
return -1;
} }
SCLogConfig("Using default prio '%s' for set '%s'", return 0;
node->val, setname);
} }
/**
* \brief Set up the number of threads for the given affinity type.
* \retval 0 on success, -1 on error
*/
static int SetupAffinityThreads(ThreadsAffinityType *taf, SCConfNode *affinity)
{
SCConfNode *threads_node = SCConfNodeLookupChild(affinity->head.tqh_first, "threads");
if (threads_node == NULL) {
return 0;
} }
node = SCConfNodeLookupChild(affinity->head.tqh_first, "mode"); if (StringParseUint32(&taf->nb_threads, 10, 0, threads_node->val) < 0 || taf->nb_threads == 0) {
if (node != NULL) { SCLogError("Invalid thread count: %s", threads_node->val);
if (!strcmp(node->val, "exclusive")) { return -1;
taf->mode_flag = EXCLUSIVE_AFFINITY;
} else if (!strcmp(node->val, "balanced")) {
taf->mode_flag = BALANCED_AFFINITY;
} else {
FatalError("unknown cpu_affinity node");
} }
return 0;
} }
node = SCConfNodeLookupChild(affinity->head.tqh_first, "threads"); static bool AllCPUsUsed(ThreadsAffinityType *taf)
if (node != NULL) { {
if (StringParseUint32(&taf->nb_threads, 10, 0, (const char *)node->val) < 0) { if (taf->lcpu < UtilCpuGetNumProcessorsOnline()) {
FatalError("invalid value for threads " return false;
"count: '%s'", }
node->val); return true;
}
static void ResetCPUs(ThreadsAffinityType *taf)
{
taf->lcpu = 0;
}
static uint16_t GetNextAvailableCPU(ThreadsAffinityType *taf)
{
uint16_t cpu = taf->lcpu;
int attempts = 0;
while (!CPU_ISSET(cpu, &taf->cpu_set) && attempts < 2) {
cpu = (cpu + 1) % UtilCpuGetNumProcessorsOnline();
if (cpu == 0)
attempts++;
}
taf->lcpu = cpu + 1;
if (attempts == 2) {
SCLogError(
"cpu_set does not contain available CPUs, CPU affinity configuration is invalid");
}
return cpu;
}
#endif /* OS_WIN32 and __OpenBSD__ */
/**
* \brief Extract CPU affinity configuration from current config file
*/
void AffinitySetupLoadFromConfig(void)
{
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
if (thread_affinity_init_done == 0) {
AffinitySetupInit();
thread_affinity_init_done = 1;
} }
if (! taf->nb_threads) {
FatalError("bad value for threads count"); SCLogDebug("Loading threading.cpu-affinity from config");
SCConfNode *root = SCConfGetNode("threading.cpu-affinity");
if (root == NULL) {
SCLogInfo("Cannot find threading.cpu-affinity node in config");
return;
} }
SCConfNode *affinity;
TAILQ_FOREACH(affinity, &root->head, next) {
const char *setname = GetAffinitySetName(affinity->val);
if (setname == NULL) {
continue;
}
ThreadsAffinityType *taf = GetAffinityTypeFromName(setname);
if (taf == NULL) {
SCLogError("Failed to allocate CPU affinity type: %s", setname);
continue;
}
SCLogConfig("Found CPU affinity definition for \"%s\"", setname);
SetupCpuSets(taf, affinity, setname);
if (SetupAffinityPriority(taf, affinity, setname) < 0) {
SCLogError("Failed to setup priority for CPU affinity type: %s", setname);
continue;
}
if (SetupAffinityMode(taf, affinity) < 0) {
SCLogError("Failed to setup mode for CPU affinity type: %s", setname);
continue;
}
if (SetupAffinityThreads(taf, affinity) < 0) {
SCLogError("Failed to setup threads for CPU affinity type: %s", setname);
continue;
} }
} }
#endif /* OS_WIN32 and __OpenBSD__ */ #endif /* OS_WIN32 and __OpenBSD__ */
@ -280,37 +378,32 @@ uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf)
{ {
uint16_t ncpu = 0; uint16_t ncpu = 0;
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
int iter = 0;
SCMutexLock(&taf->taf_mutex); SCMutexLock(&taf->taf_mutex);
ncpu = taf->lcpu; ncpu = GetNextAvailableCPU(taf);
while (!CPU_ISSET(ncpu, &taf->cpu_set) && iter < 2) {
ncpu++; if (AllCPUsUsed(taf)) {
if (ncpu >= UtilCpuGetNumProcessorsOnline()) { ResetCPUs(taf);
ncpu = 0;
iter++;
}
}
if (iter == 2) {
SCLogError("cpu_set does not contain "
"available cpus, cpu affinity conf is invalid");
} }
taf->lcpu = ncpu + 1;
if (taf->lcpu >= UtilCpuGetNumProcessorsOnline())
taf->lcpu = 0;
SCMutexUnlock(&taf->taf_mutex);
SCLogDebug("Setting affinity on CPU %d", ncpu); SCLogDebug("Setting affinity on CPU %d", ncpu);
SCMutexUnlock(&taf->taf_mutex);
#endif /* OS_WIN32 and __OpenBSD__ */ #endif /* OS_WIN32 and __OpenBSD__ */
return ncpu; return ncpu;
} }
/**
* \brief Return the total number of CPUs in a given affinity
* \retval the number of affined CPUs
*/
uint16_t UtilAffinityGetAffinedCPUNum(ThreadsAffinityType *taf) uint16_t UtilAffinityGetAffinedCPUNum(ThreadsAffinityType *taf)
{ {
uint16_t ncpu = 0; uint16_t ncpu = 0;
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
SCMutexLock(&taf->taf_mutex); SCMutexLock(&taf->taf_mutex);
for (int i = UtilCpuGetNumProcessorsOnline(); i >= 0; i--) for (int i = UtilCpuGetNumProcessorsOnline(); i >= 0; i--)
if (CPU_ISSET(i, &taf->cpu_set)) if (CPU_ISSET(i, &taf->cpu_set)) {
ncpu++; ncpu++;
}
SCMutexUnlock(&taf->taf_mutex); SCMutexUnlock(&taf->taf_mutex);
#endif #endif
return ncpu; return ncpu;
@ -336,8 +429,9 @@ uint16_t UtilAffinityCpusOverlap(ThreadsAffinityType *taf1, ThreadsAffinityType
SCMutexUnlock(&taf1->taf_mutex); SCMutexUnlock(&taf1->taf_mutex);
for (int i = UtilCpuGetNumProcessorsOnline(); i >= 0; i--) for (int i = UtilCpuGetNumProcessorsOnline(); i >= 0; i--)
if (CPU_ISSET(i, &tmpcset)) if (CPU_ISSET(i, &tmpcset)) {
return 1; return 1;
}
return 0; return 0;
} }

Loading…
Cancel
Save