From fbfced12f2eb469b7cbec40c33f9d461e4b40e7b Mon Sep 17 00:00:00 2001 From: Lukas Sismis Date: Thu, 2 Jan 2025 18:35:52 +0100 Subject: [PATCH] threading: refactor CPU affinity code Split the code into multiple functions for easier readability. --- src/util-affinity.c | 342 ++++++++++++++++++++++++++++---------------- 1 file changed, 218 insertions(+), 124 deletions(-) diff --git a/src/util-affinity.c b/src/util-affinity.c index bc12e634ab..2708d6f213 100644 --- a/src/util-affinity.c +++ b/src/util-affinity.c @@ -33,28 +33,28 @@ ThreadsAffinityType thread_affinity[MAX_CPU_SET] = { { - .name = "receive-cpu-set", - .mode_flag = EXCLUSIVE_AFFINITY, - .prio = PRIO_MEDIUM, - .lcpu = 0, + .name = "receive-cpu-set", + .mode_flag = EXCLUSIVE_AFFINITY, + .prio = PRIO_MEDIUM, + .lcpu = 0, }, { - .name = "worker-cpu-set", - .mode_flag = EXCLUSIVE_AFFINITY, - .prio = PRIO_MEDIUM, - .lcpu = 0, + .name = "worker-cpu-set", + .mode_flag = EXCLUSIVE_AFFINITY, + .prio = PRIO_MEDIUM, + .lcpu = 0, }, { - .name = "verdict-cpu-set", - .mode_flag = BALANCED_AFFINITY, - .prio = PRIO_MEDIUM, - .lcpu = 0, + .name = "verdict-cpu-set", + .mode_flag = BALANCED_AFFINITY, + .prio = PRIO_MEDIUM, + .lcpu = 0, }, { - .name = "management-cpu-set", - .mode_flag = BALANCED_AFFINITY, - .prio = PRIO_MEDIUM, - .lcpu = 0, + .name = "management-cpu-set", + .mode_flag = BALANCED_AFFINITY, + .prio = PRIO_MEDIUM, + .lcpu = 0, }, }; @@ -82,7 +82,7 @@ static void AffinitySetupInit(void) int i, j; int ncpu = UtilCpuGetNumProcessorsConfigured(); - SCLogDebug("Initialize affinity setup\n"); + SCLogDebug("Initialize CPU affinity setup"); /* be conservative relatively to OS: use all cpus by default */ for (i = 0; i < MAX_CPU_SET; i++) { cpu_set_t *cs = &thread_affinity[i].cpu_set; @@ -138,8 +138,9 @@ void BuildCpusetWithCallback( for (i = a; i<= b; i++) { Callback(i, data); } - if (stop) + if (stop) { break; + } } } @@ -152,121 +153,218 @@ static void BuildCpuset(const char *name, SCConfNode *node, cpu_set_t *cpu) { BuildCpusetWithCallback(name, node, AffinityCallback, (void *) cpu); } -#endif /* OS_WIN32 and __OpenBSD__ */ /** - * \brief Extract cpu affinity configuration from current config file + * \brief Get the appropriate set name for a given affinity value. + */ +static const char *GetAffinitySetName(const char *val) +{ + if (strcmp(val, "decode-cpu-set") == 0 || strcmp(val, "stream-cpu-set") == 0 || + strcmp(val, "reject-cpu-set") == 0 || strcmp(val, "output-cpu-set") == 0) { + return NULL; + } + + return (strcmp(val, "detect-cpu-set") == 0) ? "worker-cpu-set" : val; +} + +/** + * \brief Set up CPU sets for the given affinity type. + */ +static void SetupCpuSets(ThreadsAffinityType *taf, SCConfNode *affinity, const char *setname) +{ + CPU_ZERO(&taf->cpu_set); + + SCConfNode *cpu_node = SCConfNodeLookupChild(affinity->head.tqh_first, "cpu"); + if (cpu_node != NULL) { + BuildCpuset(setname, cpu_node, &taf->cpu_set); + } else { + SCLogWarning("Unable to find 'cpu' node for set %s", setname); + } +} + +/** + * \brief Build a priority CPU set for the given priority level. + */ +static void BuildPriorityCpuset(ThreadsAffinityType *taf, SCConfNode *prio_node, + const char *priority, cpu_set_t *cpuset, const char *setname) +{ + SCConfNode *node = SCConfNodeLookupChild(prio_node, priority); + if (node != NULL) { + BuildCpuset(setname, node, cpuset); + } else { + SCLogDebug("Unable to find '%s' priority for set %s", priority, setname); + } +} + +/** + * \brief Set up the default priority for the given affinity type. + * \retval 0 on success, -1 on error */ +static int SetupDefaultPriority( + ThreadsAffinityType *taf, SCConfNode *prio_node, const char *setname) +{ + SCConfNode *default_node = SCConfNodeLookupChild(prio_node, "default"); + if (default_node == NULL) { + return 0; + } + + if (strcmp(default_node->val, "low") == 0) { + taf->prio = PRIO_LOW; + } else if (strcmp(default_node->val, "medium") == 0) { + taf->prio = PRIO_MEDIUM; + } else if (strcmp(default_node->val, "high") == 0) { + taf->prio = PRIO_HIGH; + } else { + SCLogError("Unknown default CPU affinity priority: %s", default_node->val); + return -1; + } + + SCLogConfig("Using default priority '%s' for set %s", default_node->val, setname); + return 0; +} + +/** + * \brief Set up priority CPU sets for the given affinity type. + * \retval 0 on success, -1 on error + */ +static int SetupAffinityPriority( + ThreadsAffinityType *taf, SCConfNode *affinity, const char *setname) +{ + CPU_ZERO(&taf->lowprio_cpu); + CPU_ZERO(&taf->medprio_cpu); + CPU_ZERO(&taf->hiprio_cpu); + + SCConfNode *prio_node = SCConfNodeLookupChild(affinity->head.tqh_first, "prio"); + if (prio_node == NULL) { + return 0; + } + + BuildPriorityCpuset(taf, prio_node, "low", &taf->lowprio_cpu, setname); + BuildPriorityCpuset(taf, prio_node, "medium", &taf->medprio_cpu, setname); + BuildPriorityCpuset(taf, prio_node, "high", &taf->hiprio_cpu, setname); + return SetupDefaultPriority(taf, prio_node, setname); +} + +/** + * \brief Set up CPU affinity mode for the given affinity type. + * \retval 0 on success, -1 on error + */ +static int SetupAffinityMode(ThreadsAffinityType *taf, SCConfNode *affinity) +{ + SCConfNode *mode_node = SCConfNodeLookupChild(affinity->head.tqh_first, "mode"); + if (mode_node == NULL) { + return 0; + } + + if (strcmp(mode_node->val, "exclusive") == 0) { + taf->mode_flag = EXCLUSIVE_AFFINITY; + } else if (strcmp(mode_node->val, "balanced") == 0) { + taf->mode_flag = BALANCED_AFFINITY; + } else { + SCLogError("Unknown CPU affinity mode: %s", mode_node->val); + return -1; + } + return 0; +} + +/** + * \brief Set up the number of threads for the given affinity type. + * \retval 0 on success, -1 on error + */ +static int SetupAffinityThreads(ThreadsAffinityType *taf, SCConfNode *affinity) +{ + SCConfNode *threads_node = SCConfNodeLookupChild(affinity->head.tqh_first, "threads"); + if (threads_node == NULL) { + return 0; + } + + if (StringParseUint32(&taf->nb_threads, 10, 0, threads_node->val) < 0 || taf->nb_threads == 0) { + SCLogError("Invalid thread count: %s", threads_node->val); + return -1; + } + return 0; +} +static bool AllCPUsUsed(ThreadsAffinityType *taf) +{ + if (taf->lcpu < UtilCpuGetNumProcessorsOnline()) { + return false; + } + return true; +} + +static void ResetCPUs(ThreadsAffinityType *taf) +{ + taf->lcpu = 0; +} + +static uint16_t GetNextAvailableCPU(ThreadsAffinityType *taf) +{ + uint16_t cpu = taf->lcpu; + int attempts = 0; + + while (!CPU_ISSET(cpu, &taf->cpu_set) && attempts < 2) { + cpu = (cpu + 1) % UtilCpuGetNumProcessorsOnline(); + if (cpu == 0) + attempts++; + } + + taf->lcpu = cpu + 1; + + if (attempts == 2) { + SCLogError( + "cpu_set does not contain available CPUs, CPU affinity configuration is invalid"); + } + + return cpu; +} +#endif /* OS_WIN32 and __OpenBSD__ */ + +/** + * \brief Extract CPU affinity configuration from current config file + */ void AffinitySetupLoadFromConfig(void) { #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun - SCConfNode *root = SCConfGetNode("threading.cpu-affinity"); - SCConfNode *affinity; - if (thread_affinity_init_done == 0) { AffinitySetupInit(); thread_affinity_init_done = 1; } - SCLogDebug("Load affinity from config\n"); + SCLogDebug("Loading threading.cpu-affinity from config"); + SCConfNode *root = SCConfGetNode("threading.cpu-affinity"); if (root == NULL) { - SCLogInfo("can't get cpu-affinity node"); + SCLogInfo("Cannot find threading.cpu-affinity node in config"); return; } + SCConfNode *affinity; TAILQ_FOREACH(affinity, &root->head, next) { - if (strcmp(affinity->val, "decode-cpu-set") == 0 || - strcmp(affinity->val, "stream-cpu-set") == 0 || - strcmp(affinity->val, "reject-cpu-set") == 0 || - strcmp(affinity->val, "output-cpu-set") == 0) { + const char *setname = GetAffinitySetName(affinity->val); + if (setname == NULL) { continue; } - const char *setname = affinity->val; - if (strcmp(affinity->val, "detect-cpu-set") == 0) - setname = "worker-cpu-set"; - ThreadsAffinityType *taf = GetAffinityTypeFromName(setname); - SCConfNode *node = NULL; - SCConfNode *nprio = NULL; - if (taf == NULL) { - FatalError("unknown cpu-affinity type"); - } else { - SCLogConfig("Found affinity definition for \"%s\"", setname); - } - - CPU_ZERO(&taf->cpu_set); - node = SCConfNodeLookupChild(affinity->head.tqh_first, "cpu"); - if (node == NULL) { - SCLogInfo("unable to find 'cpu'"); - } else { - BuildCpuset(setname, node, &taf->cpu_set); + SCLogError("Failed to allocate CPU affinity type: %s", setname); + continue; } - CPU_ZERO(&taf->lowprio_cpu); - CPU_ZERO(&taf->medprio_cpu); - CPU_ZERO(&taf->hiprio_cpu); - nprio = SCConfNodeLookupChild(affinity->head.tqh_first, "prio"); - if (nprio != NULL) { - node = SCConfNodeLookupChild(nprio, "low"); - if (node == NULL) { - SCLogDebug("unable to find 'low' prio using default value"); - } else { - BuildCpuset(setname, node, &taf->lowprio_cpu); - } - - node = SCConfNodeLookupChild(nprio, "medium"); - if (node == NULL) { - SCLogDebug("unable to find 'medium' prio using default value"); - } else { - BuildCpuset(setname, node, &taf->medprio_cpu); - } + SCLogConfig("Found CPU affinity definition for \"%s\"", setname); - node = SCConfNodeLookupChild(nprio, "high"); - if (node == NULL) { - SCLogDebug("unable to find 'high' prio using default value"); - } else { - BuildCpuset(setname, node, &taf->hiprio_cpu); - } - node = SCConfNodeLookupChild(nprio, "default"); - if (node != NULL) { - if (!strcmp(node->val, "low")) { - taf->prio = PRIO_LOW; - } else if (!strcmp(node->val, "medium")) { - taf->prio = PRIO_MEDIUM; - } else if (!strcmp(node->val, "high")) { - taf->prio = PRIO_HIGH; - } else { - FatalError("unknown cpu_affinity prio"); - } - SCLogConfig("Using default prio '%s' for set '%s'", - node->val, setname); - } + SetupCpuSets(taf, affinity, setname); + if (SetupAffinityPriority(taf, affinity, setname) < 0) { + SCLogError("Failed to setup priority for CPU affinity type: %s", setname); + continue; } - - node = SCConfNodeLookupChild(affinity->head.tqh_first, "mode"); - if (node != NULL) { - if (!strcmp(node->val, "exclusive")) { - taf->mode_flag = EXCLUSIVE_AFFINITY; - } else if (!strcmp(node->val, "balanced")) { - taf->mode_flag = BALANCED_AFFINITY; - } else { - FatalError("unknown cpu_affinity node"); - } + if (SetupAffinityMode(taf, affinity) < 0) { + SCLogError("Failed to setup mode for CPU affinity type: %s", setname); + continue; } - - node = SCConfNodeLookupChild(affinity->head.tqh_first, "threads"); - if (node != NULL) { - if (StringParseUint32(&taf->nb_threads, 10, 0, (const char *)node->val) < 0) { - FatalError("invalid value for threads " - "count: '%s'", - node->val); - } - if (! taf->nb_threads) { - FatalError("bad value for threads count"); - } + if (SetupAffinityThreads(taf, affinity) < 0) { + SCLogError("Failed to setup threads for CPU affinity type: %s", setname); + continue; } } #endif /* OS_WIN32 and __OpenBSD__ */ @@ -280,37 +378,32 @@ uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf) { uint16_t ncpu = 0; #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun - int iter = 0; SCMutexLock(&taf->taf_mutex); - ncpu = taf->lcpu; - while (!CPU_ISSET(ncpu, &taf->cpu_set) && iter < 2) { - ncpu++; - if (ncpu >= UtilCpuGetNumProcessorsOnline()) { - ncpu = 0; - iter++; - } - } - if (iter == 2) { - SCLogError("cpu_set does not contain " - "available cpus, cpu affinity conf is invalid"); + ncpu = GetNextAvailableCPU(taf); + + if (AllCPUsUsed(taf)) { + ResetCPUs(taf); } - taf->lcpu = ncpu + 1; - if (taf->lcpu >= UtilCpuGetNumProcessorsOnline()) - taf->lcpu = 0; - SCMutexUnlock(&taf->taf_mutex); + SCLogDebug("Setting affinity on CPU %d", ncpu); + SCMutexUnlock(&taf->taf_mutex); #endif /* OS_WIN32 and __OpenBSD__ */ return ncpu; } +/** + * \brief Return the total number of CPUs in a given affinity + * \retval the number of affined CPUs + */ uint16_t UtilAffinityGetAffinedCPUNum(ThreadsAffinityType *taf) { uint16_t ncpu = 0; #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun SCMutexLock(&taf->taf_mutex); for (int i = UtilCpuGetNumProcessorsOnline(); i >= 0; i--) - if (CPU_ISSET(i, &taf->cpu_set)) + if (CPU_ISSET(i, &taf->cpu_set)) { ncpu++; + } SCMutexUnlock(&taf->taf_mutex); #endif return ncpu; @@ -336,8 +429,9 @@ uint16_t UtilAffinityCpusOverlap(ThreadsAffinityType *taf1, ThreadsAffinityType SCMutexUnlock(&taf1->taf_mutex); for (int i = UtilCpuGetNumProcessorsOnline(); i >= 0; i--) - if (CPU_ISSET(i, &tmpcset)) + if (CPU_ISSET(i, &tmpcset)) { return 1; + } return 0; }