From b753ecce508340cd58cf7cdecce955c6864a4705 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sun, 24 Jul 2011 12:52:46 +0200 Subject: [PATCH] Implement a pkt acq loop infra with support for pcap-file. --- src/runmode-pcap-file.c | 211 +++++++++++++++------------------------- src/runmode-pcap-file.h | 2 +- src/source-pcap-file.c | 129 ++++++++++-------------- src/tm-modules.h | 2 + src/tm-threads.c | 127 ++++++++++++++++++++++++ src/tm-threads.h | 4 + src/util-mpm-b2g-cuda.c | 2 +- 7 files changed, 260 insertions(+), 217 deletions(-) diff --git a/src/runmode-pcap-file.c b/src/runmode-pcap-file.c index 2ab32ece3f..a1188c2291 100644 --- a/src/runmode-pcap-file.c +++ b/src/runmode-pcap-file.c @@ -49,7 +49,7 @@ void RunModeFilePcapRegister(void) { RunModeRegisterNewRunMode(RUNMODE_PCAP_FILE, "single", "Single threaded pcap file mode", - RunModeFilePcap2); + RunModeFilePcapSingle); RunModeRegisterNewRunMode(RUNMODE_PCAP_FILE, "auto", "Multi threaded pcap file mode", RunModeFilePcapAuto); @@ -68,7 +68,7 @@ void RunModeFilePcapRegister(void) /** * \brief Single thread version of the Pcap file processing. */ -int RunModeFilePcap2(DetectEngineCtx *de_ctx) +int RunModeFilePcapSingle(DetectEngineCtx *de_ctx) { char *file = NULL; if (ConfGet("pcap_file.file", &file) == 0) { @@ -81,8 +81,8 @@ int RunModeFilePcap2(DetectEngineCtx *de_ctx) /* create the threads */ ThreadVars *tv = TmThreadCreatePacketHandler("PcapFile", "packetpool", "packetpool", - "packetpool","packetpool", - "varslot"); + "packetpool", "packetpool", + "pktacqloop"); if (tv == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); @@ -148,7 +148,8 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx) SCEnter(); char tname[16]; uint16_t cpu = 0; - + TmModule *tm_module; + int cuda = 0; RunModeInitialize(); /* Available cpus */ @@ -163,166 +164,121 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx) TimeModeSetOffline(); - /* create the threads */ - ThreadVars *tv_receivepcap = - TmThreadCreatePacketHandler("ReceivePcapFile", - "packetpool", "packetpool", - "pickup-queue", "simple", - "1slot"); - if (tv_receivepcap == NULL) { - printf("ERROR: TmThreadsCreate failed\n"); - exit(EXIT_FAILURE); - } - TmModule *tm_module = TmModuleGetByName("ReceivePcapFile"); - if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName failed for ReceivePcap\n"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv_receivepcap, tm_module, file); - - TmThreadSetCPU(tv_receivepcap, RECEIVE_CPU_SET); - - if (TmThreadSpawn(tv_receivepcap) != TM_ECODE_OK) { - printf("ERROR: TmThreadSpawn failed\n"); - exit(EXIT_FAILURE); - } - #if defined(__SC_CUDA_SUPPORT__) if (PatternMatchDefaultMatcher() == MPM_B2G_CUDA) { - ThreadVars *tv_decode1 = - TmThreadCreatePacketHandler("Decode", - "pickup-queue", "simple", - "decode-queue1", "simple", - "1slot"); - if (tv_decode1 == NULL) { - printf("ERROR: TmThreadsCreate failed for Decode1\n"); + cuda = 1; + } +#endif + + if (cuda == 0) { + /* create the threads */ + ThreadVars *tv_receivepcap = + TmThreadCreatePacketHandler("ReceivePcapFile", + "packetpool", "packetpool", + "detect-queue1", "simple", + "pktacqloop"); + if (tv_receivepcap == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); } - tm_module = TmModuleGetByName("DecodePcapFile"); + tm_module = TmModuleGetByName("ReceivePcapFile"); if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName DecodePcap failed\n"); + printf("ERROR: TmModuleGetByName failed for ReceivePcap\n"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv_decode1, tm_module, NULL); + TmSlotSetFuncAppend(tv_receivepcap, tm_module, file); - TmThreadSetCPU(tv_decode1, DECODE_CPU_SET); + TmThreadSetCPU(tv_receivepcap, RECEIVE_CPU_SET); - if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) { - printf("ERROR: TmThreadSpawn failed\n"); - exit(EXIT_FAILURE); - } - - ThreadVars *tv_cuda_PB = - TmThreadCreate("CUDA_PB", - "decode-queue1", "simple", - "cuda-pb-queue1", "simple", - "custom", SCCudaPBTmThreadsSlot1, 0); - if (tv_cuda_PB == NULL) { - printf("ERROR: TmThreadsCreate failed for CUDA_PB\n"); + tm_module = TmModuleGetByName("DecodePcapFile"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName DecodePcap failed\n"); exit(EXIT_FAILURE); } - tv_cuda_PB->type = TVT_PPT; + TmSlotSetFuncAppend(tv_receivepcap, tm_module, NULL); - tm_module = TmModuleGetByName("CudaPacketBatcher"); + tm_module = TmModuleGetByName("StreamTcp"); if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName CudaPacketBatcher failed\n"); + printf("ERROR: TmModuleGetByName StreamTcp failed\n"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv_cuda_PB, tm_module, (void *)de_ctx); + TmSlotSetFuncAppend(tv_receivepcap, tm_module, (void *)de_ctx); - TmThreadSetCPU(tv_cuda_PB, DETECT_CPU_SET); + TmThreadSetCPU(tv_receivepcap, DECODE_CPU_SET); - if (TmThreadSpawn(tv_cuda_PB) != TM_ECODE_OK) { + if (TmThreadSpawn(tv_receivepcap) != TM_ECODE_OK) { printf("ERROR: TmThreadSpawn failed\n"); exit(EXIT_FAILURE); } - - ThreadVars *tv_stream1 = - TmThreadCreatePacketHandler("Stream1", - "cuda-pb-queue1", "simple", - "stream-queue1", "simple", - "1slot"); - if (tv_stream1 == NULL) { - printf("ERROR: TmThreadsCreate failed for Stream1\n"); +#if defined(__SC_CUDA_SUPPORT__) + } else { + /* create the threads */ + ThreadVars *tv_receivepcap = + TmThreadCreatePacketHandler("ReceivePcapFile", + "packetpool", "packetpool", + "stream-queue1", "simple", + "pktacqloop"); + if (tv_receivepcap == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); } - tm_module = TmModuleGetByName("StreamTcp"); + tm_module = TmModuleGetByName("ReceivePcapFile"); if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName StreamTcp failed\n"); + printf("ERROR: TmModuleGetByName failed for ReceivePcap\n"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv_stream1, tm_module, NULL); + TmSlotSetFuncAppend(tv_receivepcap, tm_module, file); - TmThreadSetCPU(tv_stream1, STREAM_CPU_SET); + TmThreadSetCPU(tv_receivepcap, RECEIVE_CPU_SET); - if (TmThreadSpawn(tv_stream1) != TM_ECODE_OK) { - printf("ERROR: TmThreadSpawn failed\n"); - exit(EXIT_FAILURE); - } - } else { - ThreadVars *tv_decode1 = - TmThreadCreatePacketHandler("Decode & Stream", - "pickup-queue", "simple", - "stream-queue1", "simple", - "varslot"); - if (tv_decode1 == NULL) { - printf("ERROR: TmThreadsCreate failed for Decode1\n"); - exit(EXIT_FAILURE); - } tm_module = TmModuleGetByName("DecodePcapFile"); if (tm_module == NULL) { printf("ERROR: TmModuleGetByName DecodePcap failed\n"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv_decode1, tm_module, NULL); + TmSlotSetFuncAppend(tv_receivepcap, tm_module, NULL); tm_module = TmModuleGetByName("StreamTcp"); if (tm_module == NULL) { printf("ERROR: TmModuleGetByName StreamTcp failed\n"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv_decode1, tm_module, NULL); + TmSlotSetFuncAppend(tv_receivepcap, tm_module, NULL); - TmThreadSetCPU(tv_decode1, DECODE_CPU_SET); + TmThreadSetCPU(tv_receivepcap, DECODE_CPU_SET); - if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) { + if (TmThreadSpawn(tv_receivepcap) != TM_ECODE_OK) { printf("ERROR: TmThreadSpawn failed\n"); exit(EXIT_FAILURE); } - } -#else - ThreadVars *tv_decode1 = - TmThreadCreatePacketHandler("Decode & Stream", - "pickup-queue", "simple", - "stream-queue1", "simple", - "varslot"); - if (tv_decode1 == NULL) { - printf("ERROR: TmThreadsCreate failed for Decode1\n"); - exit(EXIT_FAILURE); - } - tm_module = TmModuleGetByName("DecodePcapFile"); - if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName DecodePcap failed\n"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv_decode1, tm_module, NULL); + ThreadVars *tv_cuda_PB = + TmThreadCreate("CUDA_PB", + "stream-queue1", "simple", + "detect-queue1", "simple", + "custom", SCCudaPBTmThreadsSlot1, 0); + if (tv_cuda_PB == NULL) { + printf("ERROR: TmThreadsCreate failed for CUDA_PB\n"); + exit(EXIT_FAILURE); + } + tv_cuda_PB->type = TVT_PPT; - tm_module = TmModuleGetByName("StreamTcp"); - if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName StreamTcp failed\n"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv_decode1, tm_module, NULL); + tm_module = TmModuleGetByName("CudaPacketBatcher"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName CudaPacketBatcher failed\n"); + exit(EXIT_FAILURE); + } + TmSlotSetFuncAppend(tv_cuda_PB, tm_module, (void *)de_ctx); - TmThreadSetCPU(tv_decode1, DECODE_CPU_SET); + TmThreadSetCPU(tv_cuda_PB, DETECT_CPU_SET); - if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) { - printf("ERROR: TmThreadSpawn failed\n"); - exit(EXIT_FAILURE); + if (TmThreadSpawn(tv_cuda_PB) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(EXIT_FAILURE); + } +#endif } -#endif /* start with cpu 1 so that if we're creating an odd number of detect * threads we're not creating the most on CPU0. */ if (ncpus > 0) @@ -344,7 +300,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx) ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name, - "stream-queue1", "simple", + "detect-queue1", "simple", "alert-queue1", "simple", "1slot"); if (tv_detect_ncpu == NULL) { @@ -450,7 +406,7 @@ int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx) snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1); strlcat(queues, qname, sizeof(queues)); } - printf("queues %s\n", queues); + SCLogDebug("queues %s", queues); char *file = NULL; if (ConfGet("pcap_file.file", &file) == 0) { @@ -466,7 +422,7 @@ int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx) TmThreadCreatePacketHandler("ReceivePcapFile", "packetpool", "packetpool", queues, "flow", - "varslot"); + "pktacqloop"); if (tv_receivepcap == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); @@ -500,7 +456,7 @@ int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx) snprintf(tname, sizeof(tname), "Detect%"PRIu16, thread+1); snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1); - printf("tname %s, qname %s\n", tname, qname); + SCLogDebug("tname %s, qname %s", tname, qname); char *thread_name = SCStrdup(tname); SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu); @@ -561,20 +517,5 @@ int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx) else cpu++; } -/* - ThreadVars *tv_outputs = TmThreadCreatePacketHandler("Outputs", - "alert-queue1", "simple", "packetpool", "packetpool", "varslot"); - - if (threading_set_cpu_affinity) { - TmThreadSetCPUAffinity(tv_outputs, 0); - if (ncpus > 1) - TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM); - } - - if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) { - printf("ERROR: TmThreadSpawn failed\n"); - exit(EXIT_FAILURE); - } -*/ return 0; } diff --git a/src/runmode-pcap-file.h b/src/runmode-pcap-file.h index f75e257ff4..5dbd3a9c26 100644 --- a/src/runmode-pcap-file.h +++ b/src/runmode-pcap-file.h @@ -23,7 +23,7 @@ #ifndef __RUNMODE_PCAP_FILE_H__ #define __RUNMODE_PCAP_FILE_H__ -int RunModeFilePcap2(DetectEngineCtx *); +int RunModeFilePcapSingle(DetectEngineCtx *); int RunModeFilePcapAuto(DetectEngineCtx *); int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx); void RunModeFilePcapRegister(void); diff --git a/src/source-pcap-file.c b/src/source-pcap-file.c index ff6bf2745a..6f3b53ae63 100644 --- a/src/source-pcap-file.c +++ b/src/source-pcap-file.c @@ -44,11 +44,13 @@ #include "util-error.h" #include "util-privs.h" #include "tmqh-packetpool.h" +#include "tm-threads.h" +#include "util-optimize.h" extern uint8_t suricata_ctl_flags; extern int max_pending_packets; -static int pcap_max_read_packets = 0; +//static int pcap_max_read_packets = 0; typedef struct PcapFileGlobalVars_ { pcap_t *pcap_handle; @@ -59,28 +61,25 @@ typedef struct PcapFileGlobalVars_ { } PcapFileGlobalVars; /** max packets < 65536 */ -#define PCAP_FILE_MAX_PKTS 256 +//#define PCAP_FILE_MAX_PKTS 256 typedef struct PcapFileThreadVars_ { /* counters */ uint32_t pkts; uint64_t bytes; - uint32_t errs; ThreadVars *tv; - - Packet *in_p; - - Packet *array[PCAP_FILE_MAX_PKTS]; - uint16_t array_idx; + TmSlot *slot; uint8_t done; + uint32_t errs; } PcapFileThreadVars; static PcapFileGlobalVars pcap_g; -TmEcode ReceivePcapFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); +TmEcode ReceivePcapFileLoop(ThreadVars *, void *, void *); + TmEcode ReceivePcapFileThreadInit(ThreadVars *, void *, void **); void ReceivePcapFileThreadExitStats(ThreadVars *, void *); TmEcode ReceivePcapFileThreadDeinit(ThreadVars *, void *); @@ -93,7 +92,8 @@ void TmModuleReceivePcapFileRegister (void) { tmm_modules[TMM_RECEIVEPCAPFILE].name = "ReceivePcapFile"; tmm_modules[TMM_RECEIVEPCAPFILE].ThreadInit = ReceivePcapFileThreadInit; - tmm_modules[TMM_RECEIVEPCAPFILE].Func = ReceivePcapFile; + tmm_modules[TMM_RECEIVEPCAPFILE].Func = NULL; + tmm_modules[TMM_RECEIVEPCAPFILE].PktAcqLoop = ReceivePcapFileLoop; tmm_modules[TMM_RECEIVEPCAPFILE].ThreadExitPrintStats = ReceivePcapFileThreadExitStats; tmm_modules[TMM_RECEIVEPCAPFILE].ThreadDeinit = NULL; tmm_modules[TMM_RECEIVEPCAPFILE].RegisterTests = NULL; @@ -110,19 +110,13 @@ void TmModuleDecodePcapFileRegister (void) { tmm_modules[TMM_DECODEPCAPFILE].cap_flags = 0; } -void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) { +void PcapFileCallbackLoop(char *user, struct pcap_pkthdr *h, u_char *pkt) { SCEnter(); PcapFileThreadVars *ptv = (PcapFileThreadVars *)user; + Packet *p = PacketGetFromQueueOrAlloc(); - Packet *p = NULL; - if (ptv->array_idx == 0) { - p = ptv->in_p; - } else { - p = PacketGetFromQueueOrAlloc(); - } - - if (p == NULL) { + if (unlikely(p == NULL)) { SCReturn; } @@ -134,77 +128,57 @@ void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) { ptv->pkts++; ptv->bytes += h->caplen; - if (PacketCopyData(p, pkt, h->caplen)) + if (unlikely(PacketCopyData(p, pkt, h->caplen))) SCReturn; - //printf("PcapFileCallback: p->pktlen: %" PRIu32 " (pkt %02x, p->pkt %02x)\n", GET_PKT_LEN(p), *pkt, *GET_PKT_DATA(p)); - /* store the packet in our array */ - ptv->array[ptv->array_idx] = p; - ptv->array_idx++; + TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p); SCReturn; } /** - * \brief Main PCAP file reading function + * \brief Main PCAP file reading Loop function */ -TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { - SCEnter(); +TmEcode ReceivePcapFileLoop(ThreadVars *tv, void *data, void *slot) { uint16_t packet_q_len = 0; - PcapFileThreadVars *ptv = (PcapFileThreadVars *)data; + TmSlot *s = (TmSlot *)slot; + ptv->slot = s->slot_next; + int r; - if (ptv->done == 1 || suricata_ctl_flags & SURICATA_STOP || - suricata_ctl_flags & SURICATA_KILL) { - SCReturnInt(TM_ECODE_FAILED); - } - - if (postpq == NULL) { - pcap_max_read_packets = 1; - } - - ptv->array_idx = 0; - ptv->in_p = p; + SCEnter(); - /* make sure we have at least one packet in the packet pool, to prevent - * us from alloc'ing packets at line rate */ - do { - packet_q_len = PacketPoolSize(); - if (packet_q_len == 0) { - PacketPoolWait(); + while (1) { + if (suricata_ctl_flags & SURICATA_STOP || + suricata_ctl_flags & SURICATA_KILL) + { + SCReturnInt(TM_ECODE_FAILED); } - } while (packet_q_len == 0); - - /* Right now we just support reading packets one at a time. */ - int r = pcap_dispatch(pcap_g.pcap_handle, (pcap_max_read_packets < packet_q_len) ? pcap_max_read_packets : packet_q_len, - (pcap_handler)PcapFileCallback, (u_char *)ptv); - if (r < 0) { - SCLogError(SC_ERR_PCAP_DISPATCH, "error code %" PRId32 " %s", - r, pcap_geterr(pcap_g.pcap_handle)); - - /* in the error state we just kill the engine */ - EngineKill(); - SCReturnInt(TM_ECODE_FAILED); - } else if (r == 0) { - SCLogInfo("pcap file end of file reached (pcap err code %" PRId32 ")", r); - - EngineStop(); - ptv->done = 1; - /* fall through */ - } - - uint16_t cnt = 0; - for (cnt = 0; cnt < ptv->array_idx; cnt++) { - Packet *pp = ptv->array[cnt]; - - pcap_g.cnt++; + /* make sure we have at least one packet in the packet pool, to prevent + * us from alloc'ing packets at line rate */ + do { + packet_q_len = PacketPoolSize(); + if (unlikely(packet_q_len == 0)) { + PacketPoolWait(); + } + } while (packet_q_len == 0); + + /* Right now we just support reading packets one at a time. */ + r = pcap_dispatch(pcap_g.pcap_handle, (int)packet_q_len, + (pcap_handler)PcapFileCallbackLoop, (u_char *)ptv); + if (unlikely(r < 0)) { + SCLogError(SC_ERR_PCAP_DISPATCH, "error code %" PRId32 " %s", + r, pcap_geterr(pcap_g.pcap_handle)); + + /* in the error state we just kill the engine */ + EngineKill(); + SCReturnInt(TM_ECODE_FAILED); + } else if (unlikely(r == 0)) { + SCLogInfo("pcap file end of file reached (pcap err code %" PRId32 ")", r); - if (cnt > 0) { - pp->pcap_cnt = pcap_g.cnt; - PacketEnqueue(postpq, pp); - } else { - pp->pcap_cnt = pcap_g.cnt; + EngineStop(); + break; } } @@ -219,11 +193,6 @@ TmEcode ReceivePcapFileThreadInit(ThreadVars *tv, void *initdata, void **data) { SCReturnInt(TM_ECODE_FAILED); } - /* use max_pending_packets as pcap read size unless it's bigger than - * our size limit */ - pcap_max_read_packets = (PCAP_FILE_MAX_PKTS < max_pending_packets) ? - PCAP_FILE_MAX_PKTS : max_pending_packets; - SCLogInfo("reading pcap file %s", (char *)initdata); PcapFileThreadVars *ptv = SCMalloc(sizeof(PcapFileThreadVars)); diff --git a/src/tm-modules.h b/src/tm-modules.h index e77bc0e2a1..07a7ce594c 100644 --- a/src/tm-modules.h +++ b/src/tm-modules.h @@ -43,6 +43,8 @@ typedef struct TmModule_ { /** the packet processing function */ TmEcode (*Func)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); + TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *); + void (*RegisterTests)(void); uint8_t cap_flags; /**< Flags to indicate the capability requierment of diff --git a/src/tm-threads.c b/src/tm-threads.c index 6437a4edd9..e9563d051b 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -41,6 +41,7 @@ #include #include "util-privs.h" #include "util-cpu.h" +#include "util-optimize.h" #ifdef OS_FREEBSD #include @@ -464,6 +465,129 @@ static inline TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, return TM_ECODE_OK; } +/* + + pcap/nfq + + pkt read + callback + process_pkt + + pfring + + pkt read + process_pkt + + slot: + setup + + pkt_ack_loop(tv, slot_data) + + deinit + + process_pkt: + while(s) + run s; + queue; + + */ + + +/** + * \brief Process the rest of the functions (if any) and queue. + */ +TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p) { + TmEcode r; + + if (likely(p != NULL)) { + if (s != NULL ) { + /* run the thread module(s) */ + r = TmThreadsSlotVarRun(tv, p, s); + if (unlikely(r == TM_ECODE_FAILED)) { + TmqhOutputPacketpool(tv, p); + TmThreadsSetFlag(tv, THV_FAILED); + return TM_ECODE_FAILED; + } + } + + /* output the packet */ + tv->tmqh_out(tv, p); + } + + return TM_ECODE_OK; +} + +void *TmThreadsSlotPktAcqLoop(void *td) { + ThreadVars *tv = (ThreadVars *)td; + TmSlot *s = tv->tm_slots; + char run = 1; + TmEcode r = TM_ECODE_OK; + TmSlot *slot = NULL; + + /* Set the thread name */ + SCSetThreadName(tv->name); + + /* Drop the capabilities for this thread */ + SCDropCaps(tv); + + if (tv->thread_setup_flags != 0) + TmThreadSetupOptions(tv); + + /* check if we are setup properly */ + if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { + EngineKill(); + + TmThreadsSetFlag(tv, THV_CLOSED); + pthread_exit((void *) -1); + } + + for (slot = s; slot != NULL; slot = slot->slot_next) { + if (slot->SlotThreadInit != NULL) { + r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot->slot_data); + if (r != TM_ECODE_OK) { + EngineKill(); + + TmThreadsSetFlag(tv, THV_CLOSED); + pthread_exit((void *) -1); + } + } + memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue)); + memset(&slot->slot_post_pq, 0, sizeof(PacketQueue)); + } + + TmThreadsSetFlag(tv, THV_INIT_DONE); + + while(run) { + TmThreadTestThreadUnPaused(tv); + + r = s->PktAcqLoop(tv, s->slot_data, s); + + if (r == TM_ECODE_FAILED || TmThreadsCheckFlag(tv, THV_KILL)) { + run = 0; + } + } + SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0); + + for (slot = s; slot != NULL; slot = slot->slot_next) { + if (slot->SlotThreadExitPrintStats != NULL) { + slot->SlotThreadExitPrintStats(tv, slot->slot_data); + } + + if (slot->SlotThreadDeinit != NULL) { + r = slot->SlotThreadDeinit(tv, slot->slot_data); + if (r != TM_ECODE_OK) { + TmThreadsSetFlag(tv, THV_CLOSED); + pthread_exit((void *) -1); + } + } + } + + SCLogDebug("%s ending", tv->name); + TmThreadsSetFlag(tv, THV_CLOSED); + pthread_exit((void *) 0); +} + + /** * \todo Only the first "slot" currently makes the "post_pq" available * to the thread module. @@ -608,6 +732,8 @@ TmEcode TmThreadSetSlots(ThreadVars *tv, char *name, void *(*fn_p)(void *)) tv->tm_func = TmThreadsSlot1NoInOut; } else if (strcmp(name, "varslot") == 0) { tv->tm_func = TmThreadsSlotVar; + } else if (strcmp(name, "pktacqloop") == 0) { + tv->tm_func = TmThreadsSlotPktAcqLoop; } else if (strcmp(name, "custom") == 0) { if (fn_p == NULL) goto error; @@ -641,6 +767,7 @@ void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) slot->SlotThreadInit = tm->ThreadInit; slot->slot_initdata = data; slot->SlotFunc = tm->Func; + slot->PktAcqLoop = tm->PktAcqLoop; slot->SlotThreadExitPrintStats = tm->ThreadExitPrintStats; slot->SlotThreadDeinit = tm->ThreadDeinit; diff --git a/src/tm-threads.h b/src/tm-threads.h index 8bbcc11573..eab558a38c 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -39,6 +39,8 @@ typedef struct TmSlot_ { TmEcode (*SlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); + TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *); + TmEcode (*SlotThreadInit)(ThreadVars *, void *, void **); void (*SlotThreadExitPrintStats)(ThreadVars *, void *); TmEcode (*SlotThreadDeinit)(ThreadVars *, void *); @@ -83,6 +85,8 @@ void TmThreadKillThreads(void); void TmThreadAppend(ThreadVars *, int); void TmThreadRemove(ThreadVars *, int); +TmEcode TmThreadsSlotProcessPkt(ThreadVars *, TmSlot *, Packet *); + TmEcode TmThreadSetCPUAffinity(ThreadVars *, uint16_t); TmEcode TmThreadSetThreadPriority(ThreadVars *, int); TmEcode TmThreadSetCPU(ThreadVars *, uint8_t); diff --git a/src/util-mpm-b2g-cuda.c b/src/util-mpm-b2g-cuda.c index 0451cec83f..70caa4088a 100644 --- a/src/util-mpm-b2g-cuda.c +++ b/src/util-mpm-b2g-cuda.c @@ -2332,7 +2332,7 @@ int B2gCudaStartDispatcherThreadRC(const char *name) "ERROR: TmModuleGetByName failed for Cuda_Mpm_B2g_RC"); exit(EXIT_FAILURE); } - Tm1SlotSetFunc(tv_CMB2_RC, tm_module, data); + TmSlotSetFuncAppend(tv_CMB2_RC, tm_module, data); if (TmThreadSpawn(tv_CMB2_RC) != TM_ECODE_OK) { SCLogError(SC_ERR_TM_THREADS_ERROR, "ERROR: TmThreadSpawn failed");