Implement a pkt acq loop infra with support for pcap-file.

remotes/origin/master-1.1.x
Victor Julien 14 years ago
parent 975ebf2e4f
commit b753ecce50

@ -49,7 +49,7 @@ void RunModeFilePcapRegister(void)
{
RunModeRegisterNewRunMode(RUNMODE_PCAP_FILE, "single",
"Single threaded pcap file mode",
RunModeFilePcap2);
RunModeFilePcapSingle);
RunModeRegisterNewRunMode(RUNMODE_PCAP_FILE, "auto",
"Multi threaded pcap file mode",
RunModeFilePcapAuto);
@ -68,7 +68,7 @@ void RunModeFilePcapRegister(void)
/**
* \brief Single thread version of the Pcap file processing.
*/
int RunModeFilePcap2(DetectEngineCtx *de_ctx)
int RunModeFilePcapSingle(DetectEngineCtx *de_ctx)
{
char *file = NULL;
if (ConfGet("pcap_file.file", &file) == 0) {
@ -81,8 +81,8 @@ int RunModeFilePcap2(DetectEngineCtx *de_ctx)
/* create the threads */
ThreadVars *tv = TmThreadCreatePacketHandler("PcapFile",
"packetpool", "packetpool",
"packetpool","packetpool",
"varslot");
"packetpool", "packetpool",
"pktacqloop");
if (tv == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -148,7 +148,8 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx)
SCEnter();
char tname[16];
uint16_t cpu = 0;
TmModule *tm_module;
int cuda = 0;
RunModeInitialize();
/* Available cpus */
@ -163,166 +164,121 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx)
TimeModeSetOffline();
/* create the threads */
ThreadVars *tv_receivepcap =
TmThreadCreatePacketHandler("ReceivePcapFile",
"packetpool", "packetpool",
"pickup-queue", "simple",
"1slot");
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
TmModule *tm_module = TmModuleGetByName("ReceivePcapFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed for ReceivePcap\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_receivepcap, tm_module, file);
TmThreadSetCPU(tv_receivepcap, RECEIVE_CPU_SET);
if (TmThreadSpawn(tv_receivepcap) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
#if defined(__SC_CUDA_SUPPORT__)
if (PatternMatchDefaultMatcher() == MPM_B2G_CUDA) {
ThreadVars *tv_decode1 =
TmThreadCreatePacketHandler("Decode",
"pickup-queue", "simple",
"decode-queue1", "simple",
"1slot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
cuda = 1;
}
#endif
if (cuda == 0) {
/* create the threads */
ThreadVars *tv_receivepcap =
TmThreadCreatePacketHandler("ReceivePcapFile",
"packetpool", "packetpool",
"detect-queue1", "simple",
"pktacqloop");
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("DecodePcapFile");
tm_module = TmModuleGetByName("ReceivePcapFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodePcap failed\n");
printf("ERROR: TmModuleGetByName failed for ReceivePcap\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
TmSlotSetFuncAppend(tv_receivepcap, tm_module, file);
TmThreadSetCPU(tv_decode1, DECODE_CPU_SET);
TmThreadSetCPU(tv_receivepcap, RECEIVE_CPU_SET);
if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_cuda_PB =
TmThreadCreate("CUDA_PB",
"decode-queue1", "simple",
"cuda-pb-queue1", "simple",
"custom", SCCudaPBTmThreadsSlot1, 0);
if (tv_cuda_PB == NULL) {
printf("ERROR: TmThreadsCreate failed for CUDA_PB\n");
tm_module = TmModuleGetByName("DecodePcapFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodePcap failed\n");
exit(EXIT_FAILURE);
}
tv_cuda_PB->type = TVT_PPT;
TmSlotSetFuncAppend(tv_receivepcap, tm_module, NULL);
tm_module = TmModuleGetByName("CudaPacketBatcher");
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName CudaPacketBatcher failed\n");
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_cuda_PB, tm_module, (void *)de_ctx);
TmSlotSetFuncAppend(tv_receivepcap, tm_module, (void *)de_ctx);
TmThreadSetCPU(tv_cuda_PB, DETECT_CPU_SET);
TmThreadSetCPU(tv_receivepcap, DECODE_CPU_SET);
if (TmThreadSpawn(tv_cuda_PB) != TM_ECODE_OK) {
if (TmThreadSpawn(tv_receivepcap) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_stream1 =
TmThreadCreatePacketHandler("Stream1",
"cuda-pb-queue1", "simple",
"stream-queue1", "simple",
"1slot");
if (tv_stream1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream1\n");
#if defined(__SC_CUDA_SUPPORT__)
} else {
/* create the threads */
ThreadVars *tv_receivepcap =
TmThreadCreatePacketHandler("ReceivePcapFile",
"packetpool", "packetpool",
"stream-queue1", "simple",
"pktacqloop");
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("StreamTcp");
tm_module = TmModuleGetByName("ReceivePcapFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
printf("ERROR: TmModuleGetByName failed for ReceivePcap\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_stream1, tm_module, NULL);
TmSlotSetFuncAppend(tv_receivepcap, tm_module, file);
TmThreadSetCPU(tv_stream1, STREAM_CPU_SET);
TmThreadSetCPU(tv_receivepcap, RECEIVE_CPU_SET);
if (TmThreadSpawn(tv_stream1) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
} else {
ThreadVars *tv_decode1 =
TmThreadCreatePacketHandler("Decode & Stream",
"pickup-queue", "simple",
"stream-queue1", "simple",
"varslot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("DecodePcapFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodePcap failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
TmSlotSetFuncAppend(tv_receivepcap, tm_module, NULL);
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
TmSlotSetFuncAppend(tv_receivepcap, tm_module, NULL);
TmThreadSetCPU(tv_decode1, DECODE_CPU_SET);
TmThreadSetCPU(tv_receivepcap, DECODE_CPU_SET);
if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) {
if (TmThreadSpawn(tv_receivepcap) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
}
#else
ThreadVars *tv_decode1 =
TmThreadCreatePacketHandler("Decode & Stream",
"pickup-queue", "simple",
"stream-queue1", "simple",
"varslot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("DecodePcapFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodePcap failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
ThreadVars *tv_cuda_PB =
TmThreadCreate("CUDA_PB",
"stream-queue1", "simple",
"detect-queue1", "simple",
"custom", SCCudaPBTmThreadsSlot1, 0);
if (tv_cuda_PB == NULL) {
printf("ERROR: TmThreadsCreate failed for CUDA_PB\n");
exit(EXIT_FAILURE);
}
tv_cuda_PB->type = TVT_PPT;
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
tm_module = TmModuleGetByName("CudaPacketBatcher");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName CudaPacketBatcher failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_cuda_PB, tm_module, (void *)de_ctx);
TmThreadSetCPU(tv_decode1, DECODE_CPU_SET);
TmThreadSetCPU(tv_cuda_PB, DETECT_CPU_SET);
if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
if (TmThreadSpawn(tv_cuda_PB) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
#endif
}
#endif
/* start with cpu 1 so that if we're creating an odd number of detect
* threads we're not creating the most on CPU0. */
if (ncpus > 0)
@ -344,7 +300,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx)
ThreadVars *tv_detect_ncpu =
TmThreadCreatePacketHandler(thread_name,
"stream-queue1", "simple",
"detect-queue1", "simple",
"alert-queue1", "simple",
"1slot");
if (tv_detect_ncpu == NULL) {
@ -450,7 +406,7 @@ int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx)
snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1);
strlcat(queues, qname, sizeof(queues));
}
printf("queues %s\n", queues);
SCLogDebug("queues %s", queues);
char *file = NULL;
if (ConfGet("pcap_file.file", &file) == 0) {
@ -466,7 +422,7 @@ int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx)
TmThreadCreatePacketHandler("ReceivePcapFile",
"packetpool", "packetpool",
queues, "flow",
"varslot");
"pktacqloop");
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -500,7 +456,7 @@ int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx)
snprintf(tname, sizeof(tname), "Detect%"PRIu16, thread+1);
snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1);
printf("tname %s, qname %s\n", tname, qname);
SCLogDebug("tname %s, qname %s", tname, qname);
char *thread_name = SCStrdup(tname);
SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
@ -561,20 +517,5 @@ int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx)
else
cpu++;
}
/*
ThreadVars *tv_outputs = TmThreadCreatePacketHandler("Outputs",
"alert-queue1", "simple", "packetpool", "packetpool", "varslot");
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_outputs, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
*/
return 0;
}

@ -23,7 +23,7 @@
#ifndef __RUNMODE_PCAP_FILE_H__
#define __RUNMODE_PCAP_FILE_H__
int RunModeFilePcap2(DetectEngineCtx *);
int RunModeFilePcapSingle(DetectEngineCtx *);
int RunModeFilePcapAuto(DetectEngineCtx *);
int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx);
void RunModeFilePcapRegister(void);

@ -44,11 +44,13 @@
#include "util-error.h"
#include "util-privs.h"
#include "tmqh-packetpool.h"
#include "tm-threads.h"
#include "util-optimize.h"
extern uint8_t suricata_ctl_flags;
extern int max_pending_packets;
static int pcap_max_read_packets = 0;
//static int pcap_max_read_packets = 0;
typedef struct PcapFileGlobalVars_ {
pcap_t *pcap_handle;
@ -59,28 +61,25 @@ typedef struct PcapFileGlobalVars_ {
} PcapFileGlobalVars;
/** max packets < 65536 */
#define PCAP_FILE_MAX_PKTS 256
//#define PCAP_FILE_MAX_PKTS 256
typedef struct PcapFileThreadVars_
{
/* counters */
uint32_t pkts;
uint64_t bytes;
uint32_t errs;
ThreadVars *tv;
Packet *in_p;
Packet *array[PCAP_FILE_MAX_PKTS];
uint16_t array_idx;
TmSlot *slot;
uint8_t done;
uint32_t errs;
} PcapFileThreadVars;
static PcapFileGlobalVars pcap_g;
TmEcode ReceivePcapFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode ReceivePcapFileLoop(ThreadVars *, void *, void *);
TmEcode ReceivePcapFileThreadInit(ThreadVars *, void *, void **);
void ReceivePcapFileThreadExitStats(ThreadVars *, void *);
TmEcode ReceivePcapFileThreadDeinit(ThreadVars *, void *);
@ -93,7 +92,8 @@ void TmModuleReceivePcapFileRegister (void) {
tmm_modules[TMM_RECEIVEPCAPFILE].name = "ReceivePcapFile";
tmm_modules[TMM_RECEIVEPCAPFILE].ThreadInit = ReceivePcapFileThreadInit;
tmm_modules[TMM_RECEIVEPCAPFILE].Func = ReceivePcapFile;
tmm_modules[TMM_RECEIVEPCAPFILE].Func = NULL;
tmm_modules[TMM_RECEIVEPCAPFILE].PktAcqLoop = ReceivePcapFileLoop;
tmm_modules[TMM_RECEIVEPCAPFILE].ThreadExitPrintStats = ReceivePcapFileThreadExitStats;
tmm_modules[TMM_RECEIVEPCAPFILE].ThreadDeinit = NULL;
tmm_modules[TMM_RECEIVEPCAPFILE].RegisterTests = NULL;
@ -110,19 +110,13 @@ void TmModuleDecodePcapFileRegister (void) {
tmm_modules[TMM_DECODEPCAPFILE].cap_flags = 0;
}
void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
void PcapFileCallbackLoop(char *user, struct pcap_pkthdr *h, u_char *pkt) {
SCEnter();
PcapFileThreadVars *ptv = (PcapFileThreadVars *)user;
Packet *p = PacketGetFromQueueOrAlloc();
Packet *p = NULL;
if (ptv->array_idx == 0) {
p = ptv->in_p;
} else {
p = PacketGetFromQueueOrAlloc();
}
if (p == NULL) {
if (unlikely(p == NULL)) {
SCReturn;
}
@ -134,77 +128,57 @@ void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
ptv->pkts++;
ptv->bytes += h->caplen;
if (PacketCopyData(p, pkt, h->caplen))
if (unlikely(PacketCopyData(p, pkt, h->caplen)))
SCReturn;
//printf("PcapFileCallback: p->pktlen: %" PRIu32 " (pkt %02x, p->pkt %02x)\n", GET_PKT_LEN(p), *pkt, *GET_PKT_DATA(p));
/* store the packet in our array */
ptv->array[ptv->array_idx] = p;
ptv->array_idx++;
TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p);
SCReturn;
}
/**
* \brief Main PCAP file reading function
* \brief Main PCAP file reading Loop function
*/
TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
SCEnter();
TmEcode ReceivePcapFileLoop(ThreadVars *tv, void *data, void *slot) {
uint16_t packet_q_len = 0;
PcapFileThreadVars *ptv = (PcapFileThreadVars *)data;
TmSlot *s = (TmSlot *)slot;
ptv->slot = s->slot_next;
int r;
if (ptv->done == 1 || suricata_ctl_flags & SURICATA_STOP ||
suricata_ctl_flags & SURICATA_KILL) {
SCReturnInt(TM_ECODE_FAILED);
}
if (postpq == NULL) {
pcap_max_read_packets = 1;
}
ptv->array_idx = 0;
ptv->in_p = p;
SCEnter();
/* make sure we have at least one packet in the packet pool, to prevent
* us from alloc'ing packets at line rate */
do {
packet_q_len = PacketPoolSize();
if (packet_q_len == 0) {
PacketPoolWait();
while (1) {
if (suricata_ctl_flags & SURICATA_STOP ||
suricata_ctl_flags & SURICATA_KILL)
{
SCReturnInt(TM_ECODE_FAILED);
}
} while (packet_q_len == 0);
/* Right now we just support reading packets one at a time. */
int r = pcap_dispatch(pcap_g.pcap_handle, (pcap_max_read_packets < packet_q_len) ? pcap_max_read_packets : packet_q_len,
(pcap_handler)PcapFileCallback, (u_char *)ptv);
if (r < 0) {
SCLogError(SC_ERR_PCAP_DISPATCH, "error code %" PRId32 " %s",
r, pcap_geterr(pcap_g.pcap_handle));
/* in the error state we just kill the engine */
EngineKill();
SCReturnInt(TM_ECODE_FAILED);
} else if (r == 0) {
SCLogInfo("pcap file end of file reached (pcap err code %" PRId32 ")", r);
EngineStop();
ptv->done = 1;
/* fall through */
}
uint16_t cnt = 0;
for (cnt = 0; cnt < ptv->array_idx; cnt++) {
Packet *pp = ptv->array[cnt];
pcap_g.cnt++;
/* make sure we have at least one packet in the packet pool, to prevent
* us from alloc'ing packets at line rate */
do {
packet_q_len = PacketPoolSize();
if (unlikely(packet_q_len == 0)) {
PacketPoolWait();
}
} while (packet_q_len == 0);
/* Right now we just support reading packets one at a time. */
r = pcap_dispatch(pcap_g.pcap_handle, (int)packet_q_len,
(pcap_handler)PcapFileCallbackLoop, (u_char *)ptv);
if (unlikely(r < 0)) {
SCLogError(SC_ERR_PCAP_DISPATCH, "error code %" PRId32 " %s",
r, pcap_geterr(pcap_g.pcap_handle));
/* in the error state we just kill the engine */
EngineKill();
SCReturnInt(TM_ECODE_FAILED);
} else if (unlikely(r == 0)) {
SCLogInfo("pcap file end of file reached (pcap err code %" PRId32 ")", r);
if (cnt > 0) {
pp->pcap_cnt = pcap_g.cnt;
PacketEnqueue(postpq, pp);
} else {
pp->pcap_cnt = pcap_g.cnt;
EngineStop();
break;
}
}
@ -219,11 +193,6 @@ TmEcode ReceivePcapFileThreadInit(ThreadVars *tv, void *initdata, void **data) {
SCReturnInt(TM_ECODE_FAILED);
}
/* use max_pending_packets as pcap read size unless it's bigger than
* our size limit */
pcap_max_read_packets = (PCAP_FILE_MAX_PKTS < max_pending_packets) ?
PCAP_FILE_MAX_PKTS : max_pending_packets;
SCLogInfo("reading pcap file %s", (char *)initdata);
PcapFileThreadVars *ptv = SCMalloc(sizeof(PcapFileThreadVars));

@ -43,6 +43,8 @@ typedef struct TmModule_ {
/** the packet processing function */
TmEcode (*Func)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
void (*RegisterTests)(void);
uint8_t cap_flags; /**< Flags to indicate the capability requierment of

@ -41,6 +41,7 @@
#include <unistd.h>
#include "util-privs.h"
#include "util-cpu.h"
#include "util-optimize.h"
#ifdef OS_FREEBSD
#include <sched.h>
@ -464,6 +465,129 @@ static inline TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p,
return TM_ECODE_OK;
}
/*
pcap/nfq
pkt read
callback
process_pkt
pfring
pkt read
process_pkt
slot:
setup
pkt_ack_loop(tv, slot_data)
deinit
process_pkt:
while(s)
run s;
queue;
*/
/**
* \brief Process the rest of the functions (if any) and queue.
*/
TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p) {
TmEcode r;
if (likely(p != NULL)) {
if (s != NULL ) {
/* run the thread module(s) */
r = TmThreadsSlotVarRun(tv, p, s);
if (unlikely(r == TM_ECODE_FAILED)) {
TmqhOutputPacketpool(tv, p);
TmThreadsSetFlag(tv, THV_FAILED);
return TM_ECODE_FAILED;
}
}
/* output the packet */
tv->tmqh_out(tv, p);
}
return TM_ECODE_OK;
}
void *TmThreadsSlotPktAcqLoop(void *td) {
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = tv->tm_slots;
char run = 1;
TmEcode r = TM_ECODE_OK;
TmSlot *slot = NULL;
/* Set the thread name */
SCSetThreadName(tv->name);
/* Drop the capabilities for this thread */
SCDropCaps(tv);
if (tv->thread_setup_flags != 0)
TmThreadSetupOptions(tv);
/* check if we are setup properly */
if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
EngineKill();
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
}
for (slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadInit != NULL) {
r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot->slot_data);
if (r != TM_ECODE_OK) {
EngineKill();
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
}
}
memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
}
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
TmThreadTestThreadUnPaused(tv);
r = s->PktAcqLoop(tv, s->slot_data, s);
if (r == TM_ECODE_FAILED || TmThreadsCheckFlag(tv, THV_KILL)) {
run = 0;
}
}
SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0);
for (slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadExitPrintStats != NULL) {
slot->SlotThreadExitPrintStats(tv, slot->slot_data);
}
if (slot->SlotThreadDeinit != NULL) {
r = slot->SlotThreadDeinit(tv, slot->slot_data);
if (r != TM_ECODE_OK) {
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
}
}
}
SCLogDebug("%s ending", tv->name);
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
}
/**
* \todo Only the first "slot" currently makes the "post_pq" available
* to the thread module.
@ -608,6 +732,8 @@ TmEcode TmThreadSetSlots(ThreadVars *tv, char *name, void *(*fn_p)(void *))
tv->tm_func = TmThreadsSlot1NoInOut;
} else if (strcmp(name, "varslot") == 0) {
tv->tm_func = TmThreadsSlotVar;
} else if (strcmp(name, "pktacqloop") == 0) {
tv->tm_func = TmThreadsSlotPktAcqLoop;
} else if (strcmp(name, "custom") == 0) {
if (fn_p == NULL)
goto error;
@ -641,6 +767,7 @@ void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data)
slot->SlotThreadInit = tm->ThreadInit;
slot->slot_initdata = data;
slot->SlotFunc = tm->Func;
slot->PktAcqLoop = tm->PktAcqLoop;
slot->SlotThreadExitPrintStats = tm->ThreadExitPrintStats;
slot->SlotThreadDeinit = tm->ThreadDeinit;

@ -39,6 +39,8 @@ typedef struct TmSlot_ {
TmEcode (*SlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *,
PacketQueue *);
TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
TmEcode (*SlotThreadInit)(ThreadVars *, void *, void **);
void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
TmEcode (*SlotThreadDeinit)(ThreadVars *, void *);
@ -83,6 +85,8 @@ void TmThreadKillThreads(void);
void TmThreadAppend(ThreadVars *, int);
void TmThreadRemove(ThreadVars *, int);
TmEcode TmThreadsSlotProcessPkt(ThreadVars *, TmSlot *, Packet *);
TmEcode TmThreadSetCPUAffinity(ThreadVars *, uint16_t);
TmEcode TmThreadSetThreadPriority(ThreadVars *, int);
TmEcode TmThreadSetCPU(ThreadVars *, uint8_t);

@ -2332,7 +2332,7 @@ int B2gCudaStartDispatcherThreadRC(const char *name)
"ERROR: TmModuleGetByName failed for Cuda_Mpm_B2g_RC");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_CMB2_RC, tm_module, data);
TmSlotSetFuncAppend(tv_CMB2_RC, tm_module, data);
if (TmThreadSpawn(tv_CMB2_RC) != TM_ECODE_OK) {
SCLogError(SC_ERR_TM_THREADS_ERROR, "ERROR: TmThreadSpawn failed");

Loading…
Cancel
Save