diff --git a/src/runmode-pcap.c b/src/runmode-pcap.c index 43d9f81cfe..101c668781 100644 --- a/src/runmode-pcap.c +++ b/src/runmode-pcap.c @@ -55,6 +55,13 @@ void RunModeIdsPcapRegister(void) "Multi threaded pcap live mode", RunModeIdsPcapAuto); default_mode = "auto"; + RunModeRegisterNewRunMode(RUNMODE_PCAP_DEV, "autofp", + "Multi threaded pcap live mode. Packets from " + "each flow are assigned to a single detect thread, " + "unlike \"pcap_live_auto\" where packets from " + "the same flow can be processed by any detect " + "thread", + RunModeIdsPcapAutoFp); return; } @@ -478,3 +485,207 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx) return 0; } + +/** + * \brief RunModIdsPcapAutoFp set up the following thread packet handlers: + * - Receive thread (from pcap device) + * - Decode thread + * - Stream thread + * - Detect: If we have only 1 cpu, it will setup one Detect thread + * If we have more than one, it will setup num_cpus - 1 + * starting from the second cpu available. + * - Outputs thread + * By default the threads will use the first cpu available + * except the Detection threads if we have more than one cpu. + * + * \param de_ctx Pointer to the Detection Engine + * + * \retval 0 If all goes well. (If any problem is detected the engine will + * exit()). + */ +int RunModeIdsPcapAutoFp(DetectEngineCtx *de_ctx) +{ + char tname[18]; + char qname[12]; + uint16_t cpu = 0; + char queues[2048] = ""; + int thread; + int thread_max; + TmModule *tm_module = NULL; + + SCEnter(); + RunModeInitialize(); + + /* Available cpus */ + uint16_t ncpus = UtilCpuGetNumProcessorsOnline(); + int npcap = LiveGetDeviceCount(); + + /* start with cpu 1 so that if we're creating an odd number of detect + * threads we're not creating the most on CPU0. */ + if (ncpus > 0) + cpu = 1; + + thread_max = TmThreadGetNbThreads(DETECT_CPU_SET); + + /* Set up the queue needed for the detect threads */ + if (thread_max == 0) + thread_max = ncpus * threading_detect_ratio; + /* always create at least one thread */ + if (thread_max < 1) + thread_max = 1; + + for (thread = 0; thread < thread_max; thread++) { + if (strlen(queues) > 0) + strlcat(queues, ",", sizeof(queues)); + + snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1); + strlcat(queues, qname, sizeof(queues)); + } + SCLogDebug("queues %s", queues); + + TimeModeSetLive(); + + if (npcap == 1) { + char *pcap_dev = NULL; + if (ConfGet("pcap.single_pcap_dev", &pcap_dev) == 0) { + SCLogError(SC_ERR_RUNMODE, "Failed retrieving " + "pcap.single_pcap_dev from Conf"); + exit(EXIT_FAILURE); + } + SCLogDebug("pcap_dev %s", pcap_dev); + + char *pcap_devc = SCStrdup(pcap_dev); + /* create the threads */ + ThreadVars *tv_receivepcap = + TmThreadCreatePacketHandler("ReceivePcap", + "packetpool", "packetpool", + queues, "flow", + "pktacqloop"); + if (tv_receivepcap == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(EXIT_FAILURE); + } + tm_module = TmModuleGetByName("ReceivePcap"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName failed for ReceivePcap\n"); + exit(EXIT_FAILURE); + } + TmSlotSetFuncAppend(tv_receivepcap, tm_module, pcap_devc); + + tm_module = TmModuleGetByName("DecodePcap"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName DecodePcap failed\n"); + exit(EXIT_FAILURE); + } + TmSlotSetFuncAppend(tv_receivepcap, tm_module, NULL); + + TmThreadSetCPU(tv_receivepcap, RECEIVE_CPU_SET); + + if (TmThreadSpawn(tv_receivepcap) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(EXIT_FAILURE); + } + } else { + SCLogInfo("Using %d pcap device(s).", npcap); + + for (thread = 0; thread < npcap; thread++) { + char *pcap_dev = LiveGetDevice(thread); + if (pcap_dev == NULL) { + printf("Failed to lookup pcap dev %d\n", thread); + exit(EXIT_FAILURE); + } + SCLogDebug("pcap_dev %s", pcap_dev); + + snprintf(tname, sizeof(tname),"RecvPcap-%s", pcap_dev); + char *tnamec = SCStrdup(tname); + char *pcap_devc = SCStrdup(pcap_dev); + + /* create the threads */ + ThreadVars *tv_receivepcap = + TmThreadCreatePacketHandler(tnamec, + "packetpool", "packetpool", + queues, "flow", + "pktacqloop"); + if (tv_receivepcap == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(EXIT_FAILURE); + } + tm_module = TmModuleGetByName("ReceivePcap"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName failed for ReceivePcap\n"); + exit(EXIT_FAILURE); + } + TmSlotSetFuncAppend(tv_receivepcap, tm_module, pcap_devc); + + tm_module = TmModuleGetByName("DecodePcap"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName DecodePcap failed\n"); + exit(EXIT_FAILURE); + } + TmSlotSetFuncAppend(tv_receivepcap, tm_module, NULL); + + TmThreadSetCPU(tv_receivepcap, RECEIVE_CPU_SET); + + if (TmThreadSpawn(tv_receivepcap) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(EXIT_FAILURE); + } + } + } + + for (thread = 0; thread < thread_max; thread++) { + snprintf(tname, sizeof(tname), "Detect%"PRIu16, thread+1); + snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1); + + SCLogDebug("tname %s, qname %s", tname, qname); + + char *thread_name = SCStrdup(tname); + SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu); + + ThreadVars *tv_detect_ncpu = + TmThreadCreatePacketHandler(thread_name, + qname, "flow", + "packetpool", "packetpool", + "varslot"); + if (tv_detect_ncpu == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(EXIT_FAILURE); + } + tm_module = TmModuleGetByName("StreamTcp"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName StreamTcp failed\n"); + exit(EXIT_FAILURE); + } + TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL); + + tm_module = TmModuleGetByName("Detect"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName Detect failed\n"); + exit(EXIT_FAILURE); + } + TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, (void *)de_ctx); + + TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET); + + char *thread_group_name = SCStrdup("Detect"); + if (thread_group_name == NULL) { + printf("Error allocating memory\n"); + exit(EXIT_FAILURE); + } + tv_detect_ncpu->thread_group_name = thread_group_name; + + /* add outputs as well */ + SetupOutputs(tv_detect_ncpu); + + if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(EXIT_FAILURE); + } + + if ((cpu + 1) == ncpus) + cpu = 0; + else + cpu++; + } + return 0; +} diff --git a/src/runmode-pcap.h b/src/runmode-pcap.h index 0279ab0273..9e88c6d90f 100644 --- a/src/runmode-pcap.h +++ b/src/runmode-pcap.h @@ -25,6 +25,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *); int RunModeIdsPcapSingle(DetectEngineCtx *); +int RunModeIdsPcapAutoFp(DetectEngineCtx *de_ctx); void RunModeIdsPcapRegister(void); const char *RunModeIdsGetDefaultMode(void);