threading improvements. Replaced the use of slot(2/3) with varslot. Improve error handling in slot functions. Additional helper functions for thread creation

remotes/origin/master-1.0.x
Anoop Saldanha 16 years ago committed by Victor Julien
parent 03d084858c
commit f35d9f0437

@ -406,13 +406,13 @@ void AppLayerDetectProtoThreadSpawn()
{
ThreadVars *tv_applayerdetect = NULL;
tv_applayerdetect = TmThreadCreate("AppLayerDetectProtoThread", NULL, NULL, NULL, NULL,
"custom", AppLayerDetectProtoThread, 0);
tv_applayerdetect = TmThreadCreateMgmtThread("AppLayerDetectProtoThread",
AppLayerDetectProtoThread, 0);
if (tv_applayerdetect == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_applayerdetect, TVT_PPT, THV_USE) != 0) {
if (TmThreadSpawn(tv_applayerdetect) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}

@ -101,25 +101,23 @@ void PerfSpawnThreads(void)
ThreadVars *tv_mgmt = NULL;
/* Spawn the stats wakeup thread */
tv_wakeup = TmThreadCreate("PerfWakeupThread", NULL, NULL, NULL, NULL,
"custom", PerfWakeupThread, 1);
tv_wakeup = TmThreadCreateMgmtThread("PerfWakeupThread", PerfWakeupThread, 1);
if (tv_wakeup == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_wakeup, TVT_MGMT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_wakeup) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
/* Spawn the stats mgmt thread */
tv_mgmt = TmThreadCreate("PerfMgmtThread", NULL, NULL, NULL, NULL,
"custom", PerfMgmtThread, 1);
tv_mgmt = TmThreadCreateMgmtThread("PerfMgmtThread", PerfMgmtThread, 1);
if (tv_mgmt == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_mgmt, TVT_MGMT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_mgmt) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}

@ -226,7 +226,7 @@ int RunModeIdsPcap(DetectEngineCtx *de_ctx, char *iface) {
TimeModeSetLive();
/* create the threads */
ThreadVars *tv_receivepcap = TmThreadCreate("ReceivePcap","packetpool","packetpool","pickup-queue","simple","1slot_noinout", NULL, 0);
ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcap","packetpool","packetpool","pickup-queue","simple","1slot_noinout");
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -238,12 +238,12 @@ int RunModeIdsPcap(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_receivepcap,tm_module,(void *)iface);
if (TmThreadSpawn(tv_receivepcap, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_receivepcap) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_decode1 = TmThreadCreate("Decode1","pickup-queue","simple","decode-queue1","simple","1slot", NULL, 0);
ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode1","pickup-queue","simple","decode-queue1","simple","1slot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
@ -255,12 +255,12 @@ int RunModeIdsPcap(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_decode1,tm_module,NULL);
if (TmThreadSpawn(tv_decode1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_decode1) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_stream1 = TmThreadCreate("Stream1","decode-queue1","simple","stream-queue1","simple","1slot", NULL, 0);
ThreadVars *tv_stream1 = TmThreadCreatePacketHandler("Stream1","decode-queue1","simple","stream-queue1","simple","1slot");
if (tv_stream1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream1\n");
exit(EXIT_FAILURE);
@ -272,12 +272,12 @@ int RunModeIdsPcap(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_stream1,tm_module,NULL);
if (TmThreadSpawn(tv_stream1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_stream1) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_detect1 = TmThreadCreate("Detect1","stream-queue1","simple","verdict-queue","simple","1slot", NULL, 0);
ThreadVars *tv_detect1 = TmThreadCreatePacketHandler("Detect1","stream-queue1","simple","verdict-queue","simple","1slot");
if (tv_detect1 == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -289,12 +289,12 @@ int RunModeIdsPcap(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_detect1,tm_module,(void *)de_ctx);
if (TmThreadSpawn(tv_detect1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_detect1) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_detect2 = TmThreadCreate("Detect2","stream-queue1","simple","verdict-queue","simple","1slot", NULL, 0);
ThreadVars *tv_detect2 = TmThreadCreatePacketHandler("Detect2","stream-queue1","simple","verdict-queue","simple","1slot");
if (tv_detect2 == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -306,12 +306,12 @@ int RunModeIdsPcap(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_detect2,tm_module,(void *)de_ctx);
if (TmThreadSpawn(tv_detect2, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_detect2) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_rreject = TmThreadCreate("RespondReject","verdict-queue","simple","alert-queue1","simple","1slot", NULL, 0);
ThreadVars *tv_rreject = TmThreadCreatePacketHandler("RespondReject","verdict-queue","simple","alert-queue1","simple","1slot");
if (tv_rreject == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -323,12 +323,12 @@ int RunModeIdsPcap(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_rreject,tm_module,NULL);
if (TmThreadSpawn(tv_rreject, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_rreject) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_alert = TmThreadCreate("AlertFastlog&Httplog","alert-queue1","simple","alert-queue2","simple","2slot", NULL, 0);
ThreadVars *tv_alert = TmThreadCreatePacketHandler("AlertFastlog&Httplog","alert-queue1","simple","alert-queue2","simple","varslot");
if (tv_alert == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -338,21 +338,21 @@ int RunModeIdsPcap(DetectEngineCtx *de_ctx, char *iface) {
printf("ERROR: TmModuleGetByName for AlertFastlog failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc1(tv_alert,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_alert, tm_module, NULL);
tm_module = TmModuleGetByName("LogHttplog");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc2(tv_alert,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_alert, tm_module, NULL);
if (TmThreadSpawn(tv_alert, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_alert) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_unified = TmThreadCreate("AlertUnifiedLog","alert-queue2","simple","alert-queue3","simple","2slot", NULL, 0);
ThreadVars *tv_unified = TmThreadCreatePacketHandler("AlertUnifiedLog","alert-queue2","simple","alert-queue3","simple","varslot");
if (tv_unified == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -363,21 +363,21 @@ int RunModeIdsPcap(DetectEngineCtx *de_ctx, char *iface) {
printf("ERROR: TmModuleGetByName for AlertUnifiedLog failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc1(tv_unified,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_unified, tm_module, NULL);
tm_module = TmModuleGetByName("AlertUnifiedAlert");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for AlertUnifiedAlert failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc2(tv_unified,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_unified, tm_module, NULL);
if (TmThreadSpawn(tv_unified, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_unified) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_debugalert = TmThreadCreate("AlertDebuglog","alert-queue3","simple","packetpool","packetpool","1slot", NULL, 0);
ThreadVars *tv_debugalert = TmThreadCreatePacketHandler("AlertDebuglog","alert-queue3","simple","packetpool","packetpool","1slot");
if (tv_debugalert == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -389,7 +389,7 @@ int RunModeIdsPcap(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_debugalert,tm_module,NULL);
if (TmThreadSpawn(tv_debugalert, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_debugalert) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
@ -402,7 +402,7 @@ int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
TimeModeSetLive();
/* create the threads */
ThreadVars *tv_receivepcap = TmThreadCreate("ReceivePcap","packetpool","packetpool","pickup-queue","simple","1slot_noinout", NULL, 0);
ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcap","packetpool","packetpool","pickup-queue","simple","1slot_noinout");
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -414,12 +414,12 @@ int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_receivepcap,tm_module,(void *)iface);
if (TmThreadSpawn(tv_receivepcap, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_receivepcap) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_decode1 = TmThreadCreate("Decode1","pickup-queue","simple","decode-queue1,decode-queue2,decode-queue3,decode-queue4","flow","1slot", NULL, 0);
ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode1","pickup-queue","simple","decode-queue1,decode-queue2,decode-queue3,decode-queue4","flow","1slot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
@ -431,12 +431,12 @@ int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_decode1,tm_module,NULL);
if (TmThreadSpawn(tv_decode1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_decode1) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_stream1 = TmThreadCreate("Stream1","decode-queue1","simple","stream-queue1","simple","1slot", NULL, 0);
ThreadVars *tv_stream1 = TmThreadCreatePacketHandler("Stream1","decode-queue1","simple","stream-queue1","simple","1slot");
if (tv_stream1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream1\n");
exit(EXIT_FAILURE);
@ -448,12 +448,12 @@ int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_stream1,tm_module,NULL);
if (TmThreadSpawn(tv_stream1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_stream1) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_stream2 = TmThreadCreate("Stream2","decode-queue2","simple","stream-queue1","simple","1slot", NULL, 0);
ThreadVars *tv_stream2 = TmThreadCreatePacketHandler("Stream2","decode-queue2","simple","stream-queue1","simple","1slot");
if (tv_stream2 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream2\n");
exit(EXIT_FAILURE);
@ -465,12 +465,12 @@ int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_stream2,tm_module,NULL);
if (TmThreadSpawn(tv_stream2, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_stream2) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_stream3 = TmThreadCreate("Stream3","decode-queue3","simple","stream-queue2","simple","1slot", NULL, 0);
ThreadVars *tv_stream3 = TmThreadCreatePacketHandler("Stream3","decode-queue3","simple","stream-queue2","simple","1slot");
if (tv_stream3 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream1\n");
exit(EXIT_FAILURE);
@ -482,12 +482,12 @@ int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_stream3,tm_module,NULL);
if (TmThreadSpawn(tv_stream3, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_stream3) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_stream4 = TmThreadCreate("Stream4","decode-queue4","simple","stream-queue2","simple","1slot", NULL, 0);
ThreadVars *tv_stream4 = TmThreadCreatePacketHandler("Stream4","decode-queue4","simple","stream-queue2","simple","1slot");
if (tv_stream4 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream1\n");
exit(EXIT_FAILURE);
@ -499,12 +499,12 @@ int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_stream4,tm_module,NULL);
if (TmThreadSpawn(tv_stream4, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_stream4) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_detect1 = TmThreadCreate("Detect1","stream-queue1","simple","verdict-queue","simple","1slot", NULL, 0);
ThreadVars *tv_detect1 = TmThreadCreatePacketHandler("Detect1","stream-queue1","simple","verdict-queue","simple","1slot");
if (tv_detect1 == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -516,12 +516,12 @@ int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_detect1,tm_module,(void *)de_ctx);
if (TmThreadSpawn(tv_detect1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_detect1) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_detect2 = TmThreadCreate("Detect2","stream-queue2","simple","verdict-queue","simple","1slot", NULL, 0);
ThreadVars *tv_detect2 = TmThreadCreatePacketHandler("Detect2","stream-queue2","simple","verdict-queue","simple","1slot");
if (tv_detect2 == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -533,12 +533,12 @@ int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_detect2,tm_module,(void *)de_ctx);
if (TmThreadSpawn(tv_detect2, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_detect2) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_rreject = TmThreadCreate("RespondReject","verdict-queue","simple","alert-queue1","simple","1slot", NULL, 0);
ThreadVars *tv_rreject = TmThreadCreatePacketHandler("RespondReject","verdict-queue","simple","alert-queue1","simple","1slot");
if (tv_rreject == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -550,12 +550,12 @@ int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_rreject,tm_module,NULL);
if (TmThreadSpawn(tv_rreject, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_rreject) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_alert = TmThreadCreate("AlertFastlog&Httplog","alert-queue1","simple","alert-queue2","simple","2slot", NULL, 0);
ThreadVars *tv_alert = TmThreadCreatePacketHandler("AlertFastlog&Httplog","alert-queue1","simple","alert-queue2","simple","varslot");
if (tv_alert == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -565,21 +565,21 @@ int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
printf("ERROR: TmModuleGetByName for AlertFastlog failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc1(tv_alert,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_alert, tm_module, NULL);
tm_module = TmModuleGetByName("LogHttplog");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc2(tv_alert,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_alert, tm_module, NULL);
if (TmThreadSpawn(tv_alert, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_alert) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_unified = TmThreadCreate("AlertUnifiedLog","alert-queue2","simple","alert-queue3","simple","2slot", NULL, 0);
ThreadVars *tv_unified = TmThreadCreatePacketHandler("AlertUnifiedLog","alert-queue2","simple","alert-queue3","simple","varslot");
if (tv_unified == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -590,21 +590,21 @@ int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
printf("ERROR: TmModuleGetByName for AlertUnifiedLog failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc1(tv_unified,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_unified,tm_module,NULL);
tm_module = TmModuleGetByName("AlertUnifiedAlert");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for AlertUnifiedAlert failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc2(tv_unified,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_unified,tm_module,NULL);
if (TmThreadSpawn(tv_unified, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_unified) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_debugalert = TmThreadCreate("AlertDebuglog","alert-queue3","simple","packetpool","packetpool","1slot", NULL, 0);
ThreadVars *tv_debugalert = TmThreadCreatePacketHandler("AlertDebuglog","alert-queue3","simple","packetpool","packetpool","1slot");
if (tv_debugalert == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -616,7 +616,7 @@ int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
}
Tm1SlotSetFunc(tv_debugalert,tm_module,NULL);
if (TmThreadSpawn(tv_debugalert, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_debugalert) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
@ -628,7 +628,7 @@ int RunModeIpsNFQ(DetectEngineCtx *de_ctx) {
TimeModeSetLive();
/* create the threads */
ThreadVars *tv_receivenfq = TmThreadCreate("ReceiveNFQ","packetpool","packetpool","pickup-queue","simple","1slot_noinout", NULL, 0);
ThreadVars *tv_receivenfq = TmThreadCreatePacketHandler("ReceiveNFQ","packetpool","packetpool","pickup-queue","simple","1slot_noinout");
if (tv_receivenfq == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -640,12 +640,12 @@ int RunModeIpsNFQ(DetectEngineCtx *de_ctx) {
}
Tm1SlotSetFunc(tv_receivenfq,tm_module,NULL);
if (TmThreadSpawn(tv_receivenfq, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_receivenfq) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_decode1 = TmThreadCreate("Decode1","pickup-queue","simple","decode-queue1","simple","1slot", NULL, 0);
ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode1","pickup-queue","simple","decode-queue1","simple","1slot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
@ -657,12 +657,12 @@ int RunModeIpsNFQ(DetectEngineCtx *de_ctx) {
}
Tm1SlotSetFunc(tv_decode1,tm_module,NULL);
if (TmThreadSpawn(tv_decode1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_decode1) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_stream1 = TmThreadCreate("Stream1","decode-queue1","simple","stream-queue1","simple","1slot", NULL, 0);
ThreadVars *tv_stream1 = TmThreadCreatePacketHandler("Stream1","decode-queue1","simple","stream-queue1","simple","1slot");
if (tv_stream1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream1\n");
exit(EXIT_FAILURE);
@ -674,12 +674,12 @@ int RunModeIpsNFQ(DetectEngineCtx *de_ctx) {
}
Tm1SlotSetFunc(tv_stream1,tm_module,NULL);
if (TmThreadSpawn(tv_stream1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_stream1) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_detect1 = TmThreadCreate("Detect1","stream-queue1","simple","verdict-queue","simple","1slot", NULL, 0);
ThreadVars *tv_detect1 = TmThreadCreatePacketHandler("Detect1","stream-queue1","simple","verdict-queue","simple","1slot");
if (tv_detect1 == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -691,12 +691,12 @@ int RunModeIpsNFQ(DetectEngineCtx *de_ctx) {
}
Tm1SlotSetFunc(tv_detect1,tm_module,(void *)de_ctx);
if (TmThreadSpawn(tv_detect1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_detect1) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_detect2 = TmThreadCreate("Detect2","stream-queue1","simple","verdict-queue","simple","1slot", NULL, 0);
ThreadVars *tv_detect2 = TmThreadCreatePacketHandler("Detect2","stream-queue1","simple","verdict-queue","simple","1slot");
if (tv_detect2 == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -708,12 +708,12 @@ int RunModeIpsNFQ(DetectEngineCtx *de_ctx) {
}
Tm1SlotSetFunc(tv_detect2,tm_module,(void *)de_ctx);
if (TmThreadSpawn(tv_detect2, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_detect2) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_verdict = TmThreadCreate("Verdict","verdict-queue","simple","respond-queue","simple","1slot", NULL, 0);
ThreadVars *tv_verdict = TmThreadCreatePacketHandler("Verdict","verdict-queue","simple","respond-queue","simple","1slot");
if (tv_verdict == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -725,12 +725,12 @@ int RunModeIpsNFQ(DetectEngineCtx *de_ctx) {
}
Tm1SlotSetFunc(tv_verdict,tm_module,NULL);
if (TmThreadSpawn(tv_verdict, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_verdict) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_rreject = TmThreadCreate("RespondReject","respond-queue","simple","alert-queue1","simple","1slot", NULL, 0);
ThreadVars *tv_rreject = TmThreadCreatePacketHandler("RespondReject","respond-queue","simple","alert-queue1","simple","1slot");
if (tv_rreject == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -742,12 +742,12 @@ int RunModeIpsNFQ(DetectEngineCtx *de_ctx) {
}
Tm1SlotSetFunc(tv_rreject,tm_module,NULL);
if (TmThreadSpawn(tv_rreject, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_rreject) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_alert = TmThreadCreate("AlertFastlog&Httplog","alert-queue1","simple","alert-queue2","simple","2slot", NULL, 0);
ThreadVars *tv_alert = TmThreadCreatePacketHandler("AlertFastlog&Httplog","alert-queue1","simple","alert-queue2","simple","varslot");
if (tv_alert == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -757,21 +757,21 @@ int RunModeIpsNFQ(DetectEngineCtx *de_ctx) {
printf("ERROR: TmModuleGetByName for AlertFastlog failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc1(tv_alert,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_alert, tm_module, NULL);
tm_module = TmModuleGetByName("LogHttplog");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc2(tv_alert,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_alert, tm_module, NULL);
if (TmThreadSpawn(tv_alert, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_alert) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_unified = TmThreadCreate("AlertUnifiedLog","alert-queue2","simple","alert-queue3","simple","2slot", NULL, 0);
ThreadVars *tv_unified = TmThreadCreatePacketHandler("AlertUnifiedLog","alert-queue2","simple","alert-queue3","simple","varslot");
if (tv_unified == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -782,21 +782,21 @@ int RunModeIpsNFQ(DetectEngineCtx *de_ctx) {
printf("ERROR: TmModuleGetByName for AlertUnifiedLog failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc1(tv_unified,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_unified, tm_module, NULL);
tm_module = TmModuleGetByName("AlertUnifiedAlert");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for AlertUnifiedAlert failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc2(tv_unified,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_unified, tm_module, NULL);
if (TmThreadSpawn(tv_unified, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_unified) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_debugalert = TmThreadCreate("AlertDebuglog","alert-queue3","simple","packetpool","packetpool","1slot", NULL, 0);
ThreadVars *tv_debugalert = TmThreadCreatePacketHandler("AlertDebuglog","alert-queue3","simple","packetpool","packetpool","1slot");
if (tv_debugalert == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -808,7 +808,7 @@ int RunModeIpsNFQ(DetectEngineCtx *de_ctx) {
}
Tm1SlotSetFunc(tv_debugalert,tm_module,NULL);
if (TmThreadSpawn(tv_debugalert, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_debugalert) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
@ -821,7 +821,7 @@ int RunModeFilePcap(DetectEngineCtx *de_ctx, char *file) {
TimeModeSetOffline();
/* create the threads */
ThreadVars *tv_receivepcap = TmThreadCreate("ReceivePcapFile","packetpool","packetpool","pickup-queue","simple","1slot", NULL, 0);
ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcapFile","packetpool","packetpool","pickup-queue","simple","1slot");
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -833,12 +833,12 @@ int RunModeFilePcap(DetectEngineCtx *de_ctx, char *file) {
}
Tm1SlotSetFunc(tv_receivepcap,tm_module,file);
if (TmThreadSpawn(tv_receivepcap, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_receivepcap) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_decode1 = TmThreadCreate("Decode1","pickup-queue","simple","decode-queue1","simple","1slot", NULL, 0);
ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode1","pickup-queue","simple","decode-queue1","simple","1slot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
@ -850,12 +850,12 @@ int RunModeFilePcap(DetectEngineCtx *de_ctx, char *file) {
}
Tm1SlotSetFunc(tv_decode1,tm_module,NULL);
if (TmThreadSpawn(tv_decode1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_decode1) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
//#if 0
ThreadVars *tv_stream1 = TmThreadCreate("Stream1","decode-queue1","simple","stream-queue1","simple","1slot", NULL, 0);
ThreadVars *tv_stream1 = TmThreadCreatePacketHandler("Stream1","decode-queue1","simple","stream-queue1","simple","1slot");
if (tv_stream1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream1\n");
exit(EXIT_FAILURE);
@ -867,12 +867,12 @@ int RunModeFilePcap(DetectEngineCtx *de_ctx, char *file) {
}
Tm1SlotSetFunc(tv_stream1,tm_module,NULL);
if (TmThreadSpawn(tv_stream1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_stream1) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_detect1 = TmThreadCreate("Detect1","stream-queue1","simple","alert-queue1","simple","1slot", NULL, 0);
ThreadVars *tv_detect1 = TmThreadCreatePacketHandler("Detect1","stream-queue1","simple","alert-queue1","simple","1slot");
//#endif
//ThreadVars *tv_detect1 = TmThreadCreate("Detect1","decode-queue1","simple","alert-queue1","simple","1slot");
if (tv_detect1 == NULL) {
@ -886,12 +886,12 @@ int RunModeFilePcap(DetectEngineCtx *de_ctx, char *file) {
}
Tm1SlotSetFunc(tv_detect1,tm_module,(void *)de_ctx);
if (TmThreadSpawn(tv_detect1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_detect1) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_detect2 = TmThreadCreate("Detect2","stream-queue1","simple","alert-queue1","simple","1slot", NULL, 0);
ThreadVars *tv_detect2 = TmThreadCreatePacketHandler("Detect2","stream-queue1","simple","alert-queue1","simple","1slot");
if (tv_detect2 == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -903,12 +903,12 @@ int RunModeFilePcap(DetectEngineCtx *de_ctx, char *file) {
}
Tm1SlotSetFunc(tv_detect2,tm_module,(void *)de_ctx);
if (TmThreadSpawn(tv_detect2, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_detect2) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_alert = TmThreadCreate("AlertFastlog&Httplog","alert-queue1","simple","alert-queue2","simple","2slot", NULL, 0);
ThreadVars *tv_alert = TmThreadCreatePacketHandler("AlertFastlog&Httplog","alert-queue1","simple","alert-queue2","simple","varslot");
if (tv_alert == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -918,21 +918,21 @@ int RunModeFilePcap(DetectEngineCtx *de_ctx, char *file) {
printf("ERROR: TmModuleGetByName for AlertFastlog failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc1(tv_alert,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_alert,tm_module,NULL);
tm_module = TmModuleGetByName("LogHttplog");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc2(tv_alert,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_alert,tm_module,NULL);
if (TmThreadSpawn(tv_alert, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_alert) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_unified = TmThreadCreate("AlertUnifiedLog","alert-queue2","simple","alert-queue3","simple","2slot", NULL, 0);
ThreadVars *tv_unified = TmThreadCreatePacketHandler("AlertUnifiedLog","alert-queue2","simple","alert-queue3","simple","varslot");
if (tv_unified == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -943,21 +943,21 @@ int RunModeFilePcap(DetectEngineCtx *de_ctx, char *file) {
printf("ERROR: TmModuleGetByName for AlertUnifiedLog failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc1(tv_unified,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_unified,tm_module,NULL);
tm_module = TmModuleGetByName("AlertUnifiedAlert");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for AlertUnifiedAlert failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc2(tv_unified,tm_module,NULL);
TmVarSlotSetFuncAppend(tv_unified,tm_module,NULL);
if (TmThreadSpawn(tv_unified, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_unified) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_debugalert = TmThreadCreate("AlertDebuglog","alert-queue3","simple","packetpool","packetpool","1slot", NULL, 0);
ThreadVars *tv_debugalert = TmThreadCreatePacketHandler("AlertDebuglog","alert-queue3","simple","packetpool","packetpool","1slot");
if (tv_debugalert == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -969,7 +969,7 @@ int RunModeFilePcap(DetectEngineCtx *de_ctx, char *file) {
}
Tm1SlotSetFunc(tv_debugalert,tm_module,NULL);
if (TmThreadSpawn(tv_debugalert, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv_debugalert) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
@ -984,7 +984,7 @@ int RunModeFilePcap2(DetectEngineCtx *de_ctx, char *file) {
TimeModeSetOffline();
/* create the threads */
ThreadVars *tv = TmThreadCreate("PcapFile","packetpool","packetpool","packetpool","packetpool","varslot", NULL, 0);
ThreadVars *tv = TmThreadCreatePacketHandler("PcapFile","packetpool","packetpool","packetpool","packetpool","varslot");
if (tv == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -1053,7 +1053,7 @@ int RunModeFilePcap2(DetectEngineCtx *de_ctx, char *file) {
}
TmVarSlotSetFuncAppend(tv,tm_module,NULL);
if (TmThreadSpawn(tv, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
if (TmThreadSpawn(tv) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
@ -1278,6 +1278,9 @@ int main(int argc, char **argv)
/* Spawn the perf counter threads */
PerfSpawnThreads();
/* Check if the alloted queues have at least 1 reader and writer */
TmValidateQueueState();
/* Un-pause all the paused threads */
TmThreadContinueThreads();
@ -1323,6 +1326,8 @@ int main(int argc, char **argv)
break;
}
TmThreadCheckThreadState();
usleep(100);
}

@ -467,13 +467,13 @@ void FlowManagerThreadSpawn()
{
ThreadVars *tv_flowmgr = NULL;
tv_flowmgr = TmThreadCreate("FlowManagerThread", NULL, NULL, NULL, NULL,
"custom", FlowManagerThread, 0);
tv_flowmgr = TmThreadCreateMgmtThread("FlowManagerThread", FlowManagerThread, 0);
if (tv_flowmgr == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_flowmgr, TVT_PPT, THV_USE) != 0) {
if (TmThreadSpawn(tv_flowmgr) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}

@ -9,9 +9,18 @@
/** Thread flags set and read by threads to control the threads */
#define THV_USE 0x01 /** thread is in use */
#define THV_PAUSE 0x02
#define THV_KILL 0x04
#define THV_CLOSED 0x08 /* thread done, should be joinable */
#define THV_PAUSE 0x02 /** thread has been paused */
#define THV_KILL 0x04 /** thread has been asked to cleanup and exit */
#define THV_FAILED 0x08 /** thread has encountered an error and failed */
#define THV_CLOSED 0x10 /** thread done, should be joinable */
/** Thread flags set and read by threads, to control the threads, when they
encounter certain conditions like failure */
#define THV_RESTART_THREAD 0x01 /** restart the thread */
#define THV_ENGINE_EXIT 0x02 /** shut the engine down gracefully */
/** Maximum no of times a thread can be restarted */
#define THV_MAX_RESTARTS 50
/** \brief Per thread variable structure */
typedef struct ThreadVars_ {
@ -19,6 +28,16 @@ typedef struct ThreadVars_ {
char *name;
uint8_t flags;
/** aof(action on failure) determines what should be done with the thread
when it encounters certain conditions like failures */
uint8_t aof;
/** the type of thread as defined in tm-threads.h (TVT_PPT, TVT_MGMT) */
uint8_t type;
/** no of times the thread has been restarted on failure */
uint8_t restarted;
/** queue's */
Tmq *inq;
Tmq *outq;

@ -59,3 +59,26 @@ void TmqResetQueues(void) {
tmq_id = 0;
}
/**
* \brief Checks if all the queues allocated so far have at least one reader
* and writer.
*/
void TmValidateQueueState(void)
{
int i = 0;
for (i = 0; i < tmq_id; i++) {
if (tmqs[i].reader_cnt == 0) {
printf("Error: Queue \"%s\" doesn't have a reader\n", tmqs[i].name);
goto error;
} else if (tmqs[i].writer_cnt == 0) {
printf("Error: Queue \"%s\" doesn't have a writer\n", tmqs[i].name);
goto error;
}
}
return;
error:
exit(EXIT_FAILURE);
}

@ -4,7 +4,8 @@
typedef struct Tmq_ {
char *name;
uint16_t id;
uint16_t usecnt;
uint16_t reader_cnt;
uint16_t writer_cnt;
} Tmq;
Tmq* TmqCreateQueue(char *name);
@ -12,6 +13,7 @@ Tmq* TmqGetQueueByName(char *name);
void TmqDebugList(void);
void TmqResetQueues(void);
void TmValidateQueueState(void);
#endif /* __TM_QUEUES_H__ */

@ -1,5 +1,6 @@
/** Copyright (c) 2009 Open Information Security Foundation.
* \author Victor Julien <victor@inliniac.net>
* \author Anoop Saldanha <poonaatsoc@gmail.com>
*/
#include <sys/types.h> /* for gettid(2) */
@ -15,6 +16,7 @@
#include "tm-queuehandlers.h"
#include "tm-modules.h"
#include "tm-threads.h"
#include "tmqh-packetpool.h"
/* prototypes */
static int SetCPUAffinity(int cpu);
@ -25,6 +27,10 @@ ThreadVars *tv_root[TVT_MAX] = { NULL };
/* lock to protect tv_root */
pthread_mutex_t tv_root_lock = PTHREAD_MUTEX_INITIALIZER;
/* Action On Failure(AOF). Determines how the engine should behave when a
thread encounters a failure. Defaults to restart the failed thread */
uint8_t tv_aof = THV_RESTART_THREAD;
typedef struct TmSlot_ {
/* function pointers */
int (*SlotInit)(ThreadVars *, void *, void **);
@ -46,16 +52,6 @@ typedef struct Tm1Slot_ {
TmSlot s;
} Tm1Slot;
/* 2 function slot */
typedef struct Tm2Slot_ {
TmSlot s1, s2;
} Tm2Slot;
/* 3 function slot */
typedef struct Tm3Slot_ {
TmSlot s1, s2, s3;
} Tm3Slot;
/* Variable number of function slots */
typedef struct TmVarSlot_ {
TmSlot *s;
@ -88,16 +84,20 @@ void *TmThreadsSlot1NoIn(void *td) {
TmThreadTestThreadUnPaused(tv);
r = s->s.SlotFunc(tv, p, s->s.slot_data, &s->s.slot_pq);
/* handle error */
if (r == 1) {
TmqhReleasePacketsToPacketPool(&s->s.slot_pq);
TmqhOutputPacketpool(tv, p);
tv->flags |= THV_FAILED;
break;
}
while (s->s.slot_pq.len > 0) {
Packet *extra = PacketDequeue(&s->s.slot_pq);
tv->tmqh_out(tv, extra);
}
/* XXX handle error */
if (r == 1) {
run = 0;
}
tv->tmqh_out(tv, p);
if (tv->flags & THV_KILL)
@ -147,9 +147,12 @@ void *TmThreadsSlot1NoOut(void *td) {
p = tv->tmqh_in(tv);
r = s->s.SlotFunc(tv, p, s->s.slot_data, /* no outqh no pq */NULL);
/* XXX handle error */
/* handle error */
if (r == 1) {
run = 0;
TmqhOutputPacketpool(tv, p);
tv->flags |= THV_FAILED;
break;
}
if (tv->flags & THV_KILL)
@ -199,9 +202,11 @@ void *TmThreadsSlot1NoInOut(void *td) {
r = s->s.SlotFunc(tv, NULL, s->s.slot_data, /* no outqh, no pq */NULL);
//printf("%s: TmThreadsSlot1NoInNoOut: r %" PRId32 "\n", tv->name, r);
/* XXX handle error */
/* handle error */
if (r == 1) {
run = 0;
tv->flags |= THV_FAILED;
break;
}
if (tv->flags & THV_KILL) {
@ -260,6 +265,15 @@ void *TmThreadsSlot1(void *td) {
//printf("%s: TmThreadsSlot1: p == NULL\n", tv->name);
} else {
r = s->s.SlotFunc(tv, p, s->s.slot_data, &s->s.slot_pq);
/* handle error */
if (r == 1) {
TmqhReleasePacketsToPacketPool(&s->s.slot_pq);
TmqhOutputPacketpool(tv, p);
tv->flags |= THV_FAILED;
break;
}
while (s->s.slot_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s->s.slot_pq);
@ -267,10 +281,6 @@ void *TmThreadsSlot1(void *td) {
}
//printf("%s: TmThreadsSlot1: p %p, r %" PRId32 "\n", tv->name, p, r);
/* XXX handle error */
if (r == 1) {
run = 0;
}
/* output the packet */
tv->tmqh_out(tv, p);
@ -299,294 +309,20 @@ void *TmThreadsSlot1(void *td) {
pthread_exit((void *) 0);
}
void *TmThreadsSlot2(void *td) {
ThreadVars *tv = (ThreadVars *)td;
Tm2Slot *s = (Tm2Slot *)tv->tm_slots;
Packet *p = NULL;
char run = 1;
int r = 0;
if (tv->set_cpu_affinity == 1)
SetCPUAffinity(tv->cpu_affinity);
//printf("TmThreadsSlot2: %s starting\n", tv->name);
if (s->s1.SlotInit != NULL) {
r = s->s1.SlotInit(tv, s->s1.slot_initdata, &s->s1.slot_data);
if (r != 0) {
EngineKill();
tv->flags |= THV_CLOSED;
pthread_exit((void *) -1);
}
}
if (s->s2.SlotInit != NULL) {
r = s->s2.SlotInit(tv, s->s2.slot_initdata, &s->s2.slot_data);
if (r != 0) {
EngineKill();
tv->flags |= THV_CLOSED;
pthread_exit((void *) -1);
}
}
while(run) {
TmThreadTestThreadUnPaused(tv);
/* input a packet */
p = tv->tmqh_in(tv);
if (p == NULL) {
//printf("%s: TmThreadsSlot1: p == NULL\n", tv->name);
} else {
r = s->s1.SlotFunc(tv, p, s->s1.slot_data, &s->s1.slot_pq);
while (s->s1.slot_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s->s1.slot_pq);
r = s->s2.SlotFunc(tv, extra_p, s->s2.slot_data, &s->s2.slot_pq);
while (s->s2.slot_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p2 = PacketDequeue(&s->s2.slot_pq);
tv->tmqh_out(tv, extra_p2);
}
if (r == 1) {
run = 0;
}
tv->tmqh_out(tv, extra_p);
}
if (r == 1) {
run = 0;
}
r = s->s2.SlotFunc(tv, p, s->s2.slot_data, &s->s2.slot_pq);
while (s->s2.slot_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s->s2.slot_pq);
tv->tmqh_out(tv, extra_p);
}
//printf("%s: TmThreadsSlot1: p %p, r %" PRId32 "\n", tv->name, p, r);
/* XXX handle error */
if (r == 1) {
run = 0;
}
/* output the packet */
tv->tmqh_out(tv, p);
}
if (tv->flags & THV_KILL) {
//printf("%s: TmThreadsSlot1: KILL is set\n", tv->name);
run = 0;
}
}
if (s->s1.SlotExitPrintStats != NULL) {
s->s1.SlotExitPrintStats(tv, s->s1.slot_data);
}
if (s->s1.SlotDeinit != NULL) {
r = s->s1.SlotDeinit(tv, s->s1.slot_data);
if (r != 0) {
pthread_exit((void *) -1);
tv->flags |= THV_CLOSED;
}
}
if (s->s2.SlotExitPrintStats != NULL) {
s->s2.SlotExitPrintStats(tv, s->s2.slot_data);
}
if (s->s2.SlotDeinit != NULL) {
r = s->s2.SlotDeinit(tv, s->s2.slot_data);
if (r != 0) {
pthread_exit((void *) -1);
tv->flags |= THV_CLOSED;
}
}
//printf("TmThreadsSlot2: %s ending\n", tv->name);
tv->flags |= THV_CLOSED;
pthread_exit((void *) 0);
}
void *TmThreadsSlot3(void *td) {
ThreadVars *tv = (ThreadVars *)td;
Tm3Slot *s = (Tm3Slot *)tv->tm_slots;
Packet *p = NULL;
char run = 1;
int r = 0;
if (tv->set_cpu_affinity == 1)
SetCPUAffinity(tv->cpu_affinity);
//printf("TmThreadsSlot3: %s starting\n", tv->name);
if (s->s1.SlotInit != NULL) {
r = s->s1.SlotInit(tv, s->s1.slot_initdata, &s->s1.slot_data);
if (r != 0) {
EngineKill();
tv->flags |= THV_CLOSED;
pthread_exit((void *) -1);
}
}
if (s->s2.SlotInit != NULL) {
r = s->s2.SlotInit(tv, s->s2.slot_initdata, &s->s2.slot_data);
if (r != 0) {
EngineKill();
tv->flags |= THV_CLOSED;
pthread_exit((void *) -1);
}
}
if (s->s3.SlotInit != NULL) {
r = s->s3.SlotInit(tv, s->s3.slot_initdata, &s->s3.slot_data);
if (r != 0) {
EngineKill();
tv->flags |= THV_CLOSED;
pthread_exit((void *) -1);
}
}
while(run) {
TmThreadTestThreadUnPaused(tv);
/* input a packet */
p = tv->tmqh_in(tv);
if (p == NULL) {
//printf("%s: TmThreadsSlot1: p == NULL\n", tv->name);
} else {
/* slot 1 */
r = s->s1.SlotFunc(tv, p, s->s1.slot_data, &s->s1.slot_pq);
while (s->s1.slot_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s->s1.slot_pq);
r = s->s2.SlotFunc(tv, extra_p, s->s2.slot_data, &s->s2.slot_pq);
while (s->s2.slot_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p2 = PacketDequeue(&s->s2.slot_pq);
r = s->s3.SlotFunc(tv, extra_p2, s->s3.slot_data, &s->s3.slot_pq);
while (s->s3.slot_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p3 = PacketDequeue(&s->s3.slot_pq);
tv->tmqh_out(tv, extra_p3);
}
if (r == 1) {
run = 0;
}
tv->tmqh_out(tv, extra_p2);
}
if (r == 1) {
run = 0;
}
tv->tmqh_out(tv, extra_p);
}
if (r == 1) {
run = 0;
}
/* slot 2 */
r = s->s2.SlotFunc(tv, p, s->s2.slot_data, &s->s2.slot_pq);
while (s->s2.slot_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s->s2.slot_pq);
r = s->s3.SlotFunc(tv, extra_p, s->s3.slot_data, &s->s3.slot_pq);
while (s->s3.slot_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p2 = PacketDequeue(&s->s3.slot_pq);
tv->tmqh_out(tv, extra_p2);
}
if (r == 1) {
run = 0;
}
tv->tmqh_out(tv, extra_p);
}
/* slot 3 */
r = s->s3.SlotFunc(tv, p, s->s3.slot_data, &s->s3.slot_pq);
while (s->s3.slot_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s->s3.slot_pq);
tv->tmqh_out(tv, extra_p);
}
//printf("%s: TmThreadsSlot1: p %p, r %" PRId32 "\n", tv->name, p, r);
/* XXX handle error */
if (r == 1) {
run = 0;
}
/* output the packet */
tv->tmqh_out(tv, p);
}
if (tv->flags & THV_KILL) {
//printf("%s: TmThreadsSlot1: KILL is set\n", tv->name);
run = 0;
}
}
if (s->s1.SlotExitPrintStats != NULL) {
s->s1.SlotExitPrintStats(tv, s->s1.slot_data);
}
if (s->s1.SlotDeinit != NULL) {
r = s->s1.SlotDeinit(tv, s->s1.slot_data);
if (r != 0) {
tv->flags |= THV_CLOSED;
pthread_exit((void *) -1);
}
}
if (s->s2.SlotExitPrintStats != NULL) {
s->s2.SlotExitPrintStats(tv, s->s2.slot_data);
}
if (s->s2.SlotDeinit != NULL) {
r = s->s2.SlotDeinit(tv, s->s2.slot_data);
if (r != 0) {
tv->flags |= THV_CLOSED;
pthread_exit((void *) -1);
}
}
if (s->s3.SlotExitPrintStats != NULL) {
s->s3.SlotExitPrintStats(tv, s->s3.slot_data);
}
if (s->s3.SlotDeinit != NULL) {
r = s->s3.SlotDeinit(tv, s->s3.slot_data);
if (r != 0) {
tv->flags |= THV_CLOSED;
pthread_exit((void *) -1);
}
}
//printf("TmThreadsSlot3: %s ending\n", tv->name);
tv->flags |= THV_CLOSED;
pthread_exit((void *) 0);
}
/* separate run function so we can call it recursively */
static inline int TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot) {
int r = 0;
TmSlot *s = NULL;
int retval = 0;
for (s = slot; s != NULL; s = s->slot_next) {
r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pq);
/* XXX handle error */
/* handle error */
if (r == 1) {
//printf("TmThreadsSlotVarRun: s->SlotFunc %p returned 1\n", s->SlotFunc);
retval = 1;
/* Encountered error. Return packets to packetpool and return */
TmqhReleasePacketsToPacketPool(&s->slot_pq);
tv->flags |= THV_FAILED;
return 1;
}
/* handle new packets */
@ -599,14 +335,17 @@ static inline int TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot)
/* XXX handle error */
if (r == 1) {
//printf("TmThreadsSlotVarRun: recursive TmThreadsSlotVarRun returned 1\n");
retval = 1;
TmqhReleasePacketsToPacketPool(&s->slot_pq);
TmqhOutputPacketpool(tv, extra_p);
tv->flags |= THV_FAILED;
return 1;
}
}
tv->tmqh_out(tv, extra_p);
}
}
return retval;
return 0;
}
void *TmThreadsSlotVar(void *td) {
@ -649,7 +388,9 @@ void *TmThreadsSlotVar(void *td) {
/* XXX handle error */
if (r == 1) {
//printf("TmThreadsSlotVar: TmThreadsSlotVarRun returned 1, breaking out of the loop.\n");
run = 0;
TmqhOutputPacketpool(tv, p);
tv->flags |= THV_FAILED;
break;
}
/* output the packet */
@ -706,12 +447,6 @@ int TmThreadSetSlots(ThreadVars *tv, char *name, void *(*fn_p)(void *)) {
} else if (strcmp(name, "1slot_noinout") == 0) {
size = sizeof(Tm1Slot);
tv->tm_func = TmThreadsSlot1NoInOut;
} else if (strcmp(name, "2slot") == 0) {
size = sizeof(Tm2Slot);
tv->tm_func = TmThreadsSlot2;
} else if (strcmp(name, "3slot") == 0) {
size = sizeof(Tm3Slot);
tv->tm_func = TmThreadsSlot3;
} else if (strcmp(name, "varslot") == 0) {
size = sizeof(TmVarSlot);
tv->tm_func = TmThreadsSlotVar;
@ -721,6 +456,9 @@ int TmThreadSetSlots(ThreadVars *tv, char *name, void *(*fn_p)(void *)) {
tv->tm_func = fn_p;
return 0;
} else {
printf("Error: Slot \"%s\" not supported\n", name);
goto error;
}
tv->tm_slots = malloc(size);
@ -746,76 +484,6 @@ void Tm1SlotSetFunc(ThreadVars *tv, TmModule *tm, void *data) {
s1->s.SlotDeinit = tm->Deinit;
}
void Tm2SlotSetFunc1(ThreadVars *tv, TmModule *tm, void *data) {
Tm2Slot *s = (Tm2Slot *)tv->tm_slots;
if (s->s1.SlotFunc != NULL)
printf("Warning: slot 1 is already set tp %p, "
"overwriting with %p\n", s->s1.SlotFunc, tm->Func);
s->s1.SlotInit = tm->Init;
s->s1.slot_initdata = data;
s->s1.SlotFunc = tm->Func;
s->s1.SlotExitPrintStats = tm->ExitPrintStats;
s->s1.SlotDeinit = tm->Deinit;
}
void Tm2SlotSetFunc2(ThreadVars *tv, TmModule *tm, void *data) {
Tm2Slot *s = (Tm2Slot *)tv->tm_slots;
if (s->s2.SlotFunc != NULL)
printf("Warning: slot 2 is already set tp %p, "
"overwriting with %p\n", s->s2.SlotFunc, tm->Func);
s->s2.SlotInit = tm->Init;
s->s2.slot_initdata = data;
s->s2.SlotFunc = tm->Func;
s->s2.SlotExitPrintStats = tm->ExitPrintStats;
s->s2.SlotDeinit = tm->Deinit;
}
void Tm3SlotSetFunc1(ThreadVars *tv, TmModule *tm, void *data) {
Tm3Slot *s = (Tm3Slot *)tv->tm_slots;
if (s->s1.SlotFunc != NULL)
printf("Warning: slot 1 is already set tp %p, "
"overwriting with %p\n", s->s1.SlotFunc, tm->Func);
s->s1.SlotInit = tm->Init;
s->s1.slot_initdata = data;
s->s1.SlotFunc = tm->Func;
s->s1.SlotExitPrintStats = tm->ExitPrintStats;
s->s1.SlotDeinit = tm->Deinit;
}
void Tm3SlotSetFunc2(ThreadVars *tv, TmModule *tm, void *data) {
Tm3Slot *s = (Tm3Slot *)tv->tm_slots;
if (s->s2.SlotFunc != NULL)
printf("Warning: slot 2 is already set tp %p, "
"overwriting with %p\n", s->s2.SlotFunc, tm->Func);
s->s2.SlotInit = tm->Init;
s->s2.slot_initdata = data;
s->s2.SlotFunc = tm->Func;
s->s2.SlotExitPrintStats = tm->ExitPrintStats;
s->s2.SlotDeinit = tm->Deinit;
}
void Tm3SlotSetFunc3(ThreadVars *tv, TmModule *tm, void *data) {
Tm3Slot *s = (Tm3Slot *)tv->tm_slots;
if (s->s3.SlotFunc != NULL)
printf("Warning: slot 3 is already set tp %p, "
"overwriting with %p\n", s->s3.SlotFunc, tm->Func);
s->s3.SlotInit = tm->Init;
s->s3.slot_initdata = data;
s->s3.SlotFunc = tm->Func;
s->s3.SlotExitPrintStats = tm->ExitPrintStats;
s->s3.SlotDeinit = tm->Deinit;
}
void TmVarSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) {
TmVarSlot *s = (TmVarSlot *)tv->tm_slots;
TmSlot *slot = malloc(sizeof(TmSlot));
@ -902,6 +570,10 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
memset(tv, 0, sizeof(ThreadVars));
tv->name = name;
/* default state for every newly created thread */
tv->flags = THV_USE | THV_PAUSE;
/* default aof for every newly created thread */
tv->aof = THV_RESTART_THREAD;
/* set the incoming queue */
if (inq_name != NULL) {
@ -912,7 +584,7 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
}
tv->inq = tmq;
tv->inq->usecnt++;
tv->inq->reader_cnt++;
//printf("TmThreadCreate: tv->inq->id %" PRIu32 "\n", tv->inq->id);
}
if (inqh_name != NULL) {
@ -942,7 +614,7 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
}
tv->outq = tmq;
tv->outq->usecnt++;
tv->outq->writer_cnt++;
}
//printf("TmThreadCreate: tv->outq->id %" PRIu32 "\n", tv->outq->id);
}
@ -961,6 +633,62 @@ error:
return NULL;
}
/**
* \brief Creates and returns a TV instance for a Packet Processing Thread.
* This function doesn't support custom slots, and hence shouldn't be
* supplied \"custom\" as its slot type. All PPT threads are created
* with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
* conditional variables are not used to kill the thread.
*
* \param name Name of this TV instance
* \param inq_name Incoming queue name
* \param inqh_name Incoming queue handler name as set by TmqhSetup()
* \param outq_name Outgoing queue name
* \param outqh_name Outgoing queue handler as set by TmqhSetup()
* \param slots String representation for the slot function to be used
*
* \retval the newly created TV instance, or NULL on error
*/
ThreadVars *TmThreadCreatePacketHandler(char *name, char *inq_name,
char *inqh_name, char *outq_name,
char *outqh_name, char *slots)
{
ThreadVars *tv = NULL;
tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
slots, NULL, 0);
if (tv != NULL)
tv->type = TVT_PPT;
return tv;
}
/**
* \brief Creates and returns the TV instance for a Management thread(MGMT).
* This function supports only custom slot functions and hence a
* function pointer should be sent as an argument.
*
* \param name Name of this TV instance
* \param fn_p Pointer to function when \"slots\" is of type \"custom\"
* \param mucond Flag to indicate whether to initialize the condition
* and the mutex variables for this newly created TV.
*
* \retval the newly created TV instance, or NULL on error
*/
ThreadVars *TmThreadCreateMgmtThread(char *name, void *(fn_p)(void *),
int mucond)
{
ThreadVars *tv = NULL;
tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
if (tv != NULL)
tv->type = TVT_MGMT;
return tv;
}
/**
* \brief Appends this TV to tv_root based on its type
*
@ -1013,14 +741,14 @@ void TmThreadKillThreads(void) {
if (t->inq != NULL) {
int i;
//printf("TmThreadKillThreads: t->inq->usecnt %" PRIu32 "\n", t->inq->usecnt);
//printf("TmThreadKillThreads: (t->inq->reader_cnt + t->inq->writer_cnt) %" PRIu32 "\n", (t->inq->reader_cnt + t->inq->writer_cnt));
/* make sure our packet pending counter doesn't block */
pthread_cond_signal(&cond_pending);
/* signal the queue for the number of users */
for (i = 0; i < t->inq->usecnt; i++)
for (i = 0; i < (t->inq->reader_cnt + t->inq->writer_cnt); i++)
pthread_cond_signal(&trans_q[t->inq->id].cond_q);
/* to be sure, signal more */
@ -1035,7 +763,7 @@ void TmThreadKillThreads(void) {
cnt++;
for (i = 0; i < t->inq->usecnt; i++)
for (i = 0; i < (t->inq->reader_cnt + t->inq->writer_cnt); i++)
pthread_cond_signal(&trans_q[t->inq->id].cond_q);
usleep(100);
@ -1079,27 +807,17 @@ void TmThreadKillThreads(void) {
/**
* \brief Spawns a thread associated with the ThreadVars instance tv
*
* \param type Type this TV belongs to.
* \param flags Flags that should be set for this thread
*
* \retval 0 on success and -1 on failure
*/
int TmThreadSpawn(ThreadVars *tv, int type, int flags)
int TmThreadSpawn(ThreadVars *tv)
{
pthread_attr_t attr;
if (type < 0 || type >= TVT_MAX) {
printf("ThreadVars of category %" PRId32 " does not exist\n", type);
return -1;
}
if (tv->tm_func == NULL) {
printf("ERROR: no thread function set\n");
return -1;
}
tv->flags = flags;
/* Initialize and set thread detached attribute */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
@ -1110,11 +828,39 @@ int TmThreadSpawn(ThreadVars *tv, int type, int flags)
return -1;
}
TmThreadAppend(tv, type);
TmThreadAppend(tv, tv->type);
return 0;
}
/**
* \brief Sets the thread flags for a thread instance(tv)
*
* \param tv Pointer to the thread instance for which the flag has to be set
* \param flags Holds the thread state this thread instance has to be set to
*/
void TmThreadSetFlags(ThreadVars *tv, uint8_t flags)
{
if (tv != NULL)
tv->flags = flags;
return;
}
/**
* \brief Sets the aof(Action on failure) for a thread instance(tv)
*
* \param tv Pointer to the thread instance for which the aof has to be set
* \param aof Holds the aof this thread instance has to be set to
*/
void TmThreadSetAOF(ThreadVars *tv, uint8_t aof)
{
if (tv != NULL)
tv->aof = aof;
return;
}
/**
* \brief Initializes the mutex and condition variables for this TV
*
@ -1223,3 +969,64 @@ void TmThreadPauseThreads()
return;
}
/**
* \brief Restarts the thread sent as the argument
*
* \param tv Pointer to the thread instance(tv) to be restarted
*/
static void TmThreadRestartThread(ThreadVars *tv)
{
if (tv->restarted >= THV_MAX_RESTARTS) {
printf("Warning: thread restarts exceeded threshhold limit for thread"
"\"%s\"", tv->name);
/* makes sense to reset the tv_aof to engine_exit?! */
// tv->aof = THV_ENGINE_EXIT;
return;
}
tv->flags &= ~((uint8_t)(THV_CLOSED | THV_FAILED));
if (TmThreadSpawn(tv) != 0) {
printf("Error: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
tv->restarted++;
printf("Thread \"%s\" restarted\n", tv->name);
return;
}
/**
* \brief Used to check the thread for certain conditions of failure. If the
* thread has been specified to restart on failure, the thread is
* restarted. If the thread has been specified to gracefully shutdown
* the engine on failure, it does so. The global aof flag, tv_aof
* overrides the thread aof flag, if it holds a THV_ENGINE_EXIT;
*/
void TmThreadCheckThreadState(void)
{
ThreadVars *tv = NULL;
int i = 0;
for (i = 0; i < TVT_MAX; i++) {
tv = tv_root[i];
while (tv) {
if (tv->flags & THV_FAILED) {
pthread_join(tv->t, NULL);
if ( !(tv_aof & THV_ENGINE_EXIT) &&
(tv->aof & THV_RESTART_THREAD) ) {
TmThreadRestartThread(tv);
} else {
tv->flags |= THV_CLOSED;
EngineKill();
}
}
tv = tv->next;
}
}
return;
}

@ -14,24 +14,21 @@ extern pthread_mutex_t tv_root_lock;
void Tm1SlotSetFunc(ThreadVars *, TmModule *, void *);
void Tm2SlotSetFunc1(ThreadVars *, TmModule *, void *);
void TmVarSlotSetFuncAppend(ThreadVars *, TmModule *, void *);
void Tm2SlotSetFunc2(ThreadVars *, TmModule *, void *);
ThreadVars *TmThreadCreate(char *, char *, char *, char *, char *, char *,
void *(fn_p)(void *), int);
void Tm3SlotSetFunc1(ThreadVars *, TmModule *, void *);
ThreadVars *TmThreadCreatePacketHandler(char *, char *, char *, char *, char *,
char *);
void Tm3SlotSetFunc2(ThreadVars *, TmModule *, void *);
ThreadVars *TmThreadCreateMgmtThread(char *name, void *(fn_p)(void *), int);
void Tm3SlotSetFunc3(ThreadVars *, TmModule *, void *);
int TmThreadSpawn(ThreadVars *);
void TmVarSlotSetFuncAppend(ThreadVars *, TmModule *, void *);
void TmThreadSetFlags(ThreadVars *, uint8_t);
ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
char *outq_name, char *outqh_name, char *slots,
void *(fn_p)(void *), int);
int TmThreadSpawn(ThreadVars *, int, int);
void TmThreadSetAOF(ThreadVars *, uint8_t);
void TmThreadKillThreads(void);
@ -51,5 +48,7 @@ void TmThreadPause(ThreadVars *);
void TmThreadPauseThreads(void);
void TmThreadCheckThreadState(void);
#endif /* __TM_THREADS_H__ */

@ -75,7 +75,7 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) {
if (tmq == NULL)
return -1;
}
tmq->usecnt++;
tmq->writer_cnt++;
uint16_t id = tmq->id;
//printf("StoreQueueId: id %u\n", id);

@ -13,8 +13,7 @@
#include "pkt-var.h"
Packet *TmqhInputPacketpool(ThreadVars *t);
void TmqhOutputPacketpool(ThreadVars *t, Packet *p);
#include "tmqh-packetpool.h"
void TmqhPacketpoolRegister (void) {
tmqh_table[TMQH_PACKETPOOL].name = "packetpool";
@ -56,6 +55,9 @@ void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
PacketQueue *q = &packet_q;
char proot = 0;
if (p == NULL)
return;
if (IS_TUNNEL_PKT(p)) {
//printf("TmqhOutputPacketpool: tunnel packet: %p %s\n", p,p->root ? "upper layer":"root");
@ -140,3 +142,27 @@ void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
mutex_unlock(&mutex_pending);
}
/**
* \brief Release all the packets in the queue back to the packetpool. Mainly
* used by threads that have failed, and wants to return the packets back
* to the packetpool.
*
* \param pq Pointer to the packetqueue from which the packets have to be
* returned back to the packetpool
*/
void TmqhReleasePacketsToPacketPool(PacketQueue *pq)
{
Packet *p = NULL;
if (pq == NULL)
return;
mutex_lock(&pq->mutex_q);
while ( (p = PacketDequeue(pq)) != NULL)
TmqhOutputPacketpool(NULL, p);
mutex_unlock(&pq->mutex_q);
return;
}

@ -3,6 +3,9 @@
#ifndef __TMQH_PACKETPOOL_H__
#define __TMQH_PACKETPOOL_H__
Packet *TmqhInputPacketpool(ThreadVars *);
void TmqhOutputPacketpool(ThreadVars *, Packet *);
void TmqhReleasePacketsToPacketPool(PacketQueue *);
void TmqhPacketpoolRegister (void);
#endif /* __TMQH_PACKETPOOL_H__ */

Loading…
Cancel
Save