From 7e09cdc26571f18c2ecdafddfb2552b76fb56225 Mon Sep 17 00:00:00 2001 From: Eric Leblond Date: Fri, 10 Aug 2012 15:32:30 +0200 Subject: [PATCH] Delay Detect threads initialization This patch modifies the init of Detect threads. They are now started with a dummy function and their initialisation is done after the signatures are loaded. Just after this, the dummy function is switched to normal one. In IPS mode, this permit to route packets without waiting for the signature to start and should fix #488. Offline mode such as pcap file don't use this mode to be sure to analyse all packets in the file. The patch introduces a "delayed-detect" configuration variable under detect-engine. It can be used to activate the feature (set to "yes" to have signature loaded after capture is started). --- src/cuda-packet-batcher.c | 5 +- src/detect.h | 3 + src/flow-timeout.c | 6 +- src/suricata.c | 58 +++++++++++++--- src/tm-threads.c | 138 +++++++++++++++++++++++++++++++++++--- src/tm-threads.h | 10 ++- src/util-mpm-b2g-cuda.c | 3 +- src/util-runmodes.c | 18 +++-- suricata.yaml.in | 3 + 9 files changed, 211 insertions(+), 33 deletions(-) diff --git a/src/cuda-packet-batcher.c b/src/cuda-packet-batcher.c index 09a5e8d0c2..16b1b2448a 100644 --- a/src/cuda-packet-batcher.c +++ b/src/cuda-packet-batcher.c @@ -338,6 +338,7 @@ void *SCCudaPBTmThreadsSlot1(void *td) TmThreadsSetFlag(tv, THV_INIT_DONE); while(run) { + TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc); TmThreadTestThreadUnPaused(tv); /* input a packet */ @@ -354,9 +355,9 @@ void *SCCudaPBTmThreadsSlot1(void *td) * the Batcher TM(which is waiting on a cond from the previous * feeder TM). Please handle the NULL packet case in the * function that you now call */ - r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), NULL, NULL); + r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), NULL, NULL); } else { - r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), NULL, NULL); + r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), NULL, NULL); /* handle error */ if (r == TM_ECODE_FAILED) { TmqhOutputPacketpool(tv, p); diff --git a/src/detect.h b/src/detect.h index be9607e459..dbb692d473 100644 --- a/src/detect.h +++ b/src/detect.h @@ -671,6 +671,9 @@ typedef struct DetectEngineCtx_ { /** Store rule file and line so that parsers can use them in errors. */ char *rule_file; int rule_line; + + /** Is detect engine using a delayed init */ + int delayed_detect; } DetectEngineCtx; /* Engine groups profiles (low, medium, high, custom) */ diff --git a/src/flow-timeout.c b/src/flow-timeout.c index 020f5cc09a..6440f752fd 100644 --- a/src/flow-timeout.c +++ b/src/flow-timeout.c @@ -569,7 +569,8 @@ static inline void FlowForceReassemblyForHash(void) } else { TmSlot *s = stream_pseudo_pkt_detect_tm_slot; while (s != NULL) { - s->SlotFunc(NULL, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, + TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc); + SlotFunc(NULL, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq); s = s->slot_next; } @@ -598,7 +599,8 @@ static inline void FlowForceReassemblyForHash(void) } else { TmSlot *s = stream_pseudo_pkt_detect_tm_slot; while (s != NULL) { - s->SlotFunc(NULL, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, + TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc); + SlotFunc(NULL, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq); s = s->slot_next; } diff --git a/src/suricata.c b/src/suricata.c index 00df8ee0a1..7b8ee220a5 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -686,6 +686,7 @@ int main(int argc, char **argv) #endif /* OS_WIN32 */ int build_info = 0; int rule_reload = 0; + int delayed_detect = 0; char *log_dir; #ifdef OS_WIN32 @@ -1734,18 +1735,42 @@ int main(int argc, char **argv) if (MagicInit() != 0) exit(EXIT_FAILURE); - if (SigLoadSignatures(de_ctx, sig_file, sig_file_exclusive) < 0) { - if (sig_file == NULL) { - SCLogError(SC_ERR_OPENING_FILE, "Signature file has not been provided"); - } else { - SCLogError(SC_ERR_NO_RULES_LOADED, "Loading signatures failed."); + /* In offline mode delayed init of detect is a bad idea */ + if ((run_mode == RUNMODE_PCAP_FILE) || + (run_mode == RUNMODE_ERF_FILE) || + engine_analysis) { + delayed_detect = 0; + } else { + ConfNode *denode = NULL; + ConfNode *decnf = ConfGetNode("detect-engine"); + if (decnf != NULL) { + TAILQ_FOREACH(denode, &decnf->head, next) { + if (strcmp(denode->val, "delayed-detect") == 0) { + (void)ConfGetChildValueBool(denode, "delayed-detect", &delayed_detect); + } + } } - if (de_ctx->failure_fatal) - exit(EXIT_FAILURE); } + de_ctx->delayed_detect = delayed_detect; - if (engine_analysis) { - exit(EXIT_SUCCESS); + SCLogInfo("Delayed detect %s", delayed_detect ? "enabled" : "disabled"); + if (delayed_detect) { + SCLogInfo("Packets will start being processed before signatures are active."); + } + + if (!delayed_detect) { + if (SigLoadSignatures(de_ctx, sig_file, sig_file_exclusive) < 0) { + if (sig_file == NULL) { + SCLogError(SC_ERR_OPENING_FILE, "Signature file has not been provided"); + } else { + SCLogError(SC_ERR_NO_RULES_LOADED, "Loading signatures failed."); + } + if (de_ctx->failure_fatal) + exit(EXIT_FAILURE); + } + if (engine_analysis) { + exit(EXIT_SUCCESS); + } } /* registering singal handlers we use. We register usr2 here, so that one @@ -1855,6 +1880,21 @@ int main(int argc, char **argv) /* Un-pause all the paused threads */ TmThreadContinueThreads(); + if (delayed_detect) { + if (SigLoadSignatures(de_ctx, sig_file, sig_file_exclusive) < 0) { + if (sig_file == NULL) { + SCLogError(SC_ERR_OPENING_FILE, "Signature file has not been provided"); + } else { + SCLogError(SC_ERR_NO_RULES_LOADED, "Loading signatures failed."); + } + if (de_ctx->failure_fatal) + exit(EXIT_FAILURE); + } + TmThreadActivateDummySlot(); + SCLogInfo("Signature(s) loaded, Detect thread(s) activated."); + } + + #ifdef DBG_MEM_ALLOC SCLogInfo("Memory used at startup: %"PRIdMAX, (intmax_t)global_mem); #ifdef DBG_MEM_ALLOC_SKIP_STARTUP diff --git a/src/tm-threads.c b/src/tm-threads.c index f24abcaafc..81f584cdd6 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2007-2010 Open Information Security Foundation +/* Copyright (C) 2007-2012 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -20,7 +20,7 @@ * * \author Victor Julien * \author Anoop Saldanha - * \author Eric Leblond + * \author Eric Leblond * * Thread management functions. */ @@ -43,6 +43,7 @@ #include "util-optimize.h" #include "util-profiling.h" #include "util-signal.h" +#include "queue.h" #ifdef PROFILE_LOCKING __thread uint64_t mutex_lock_contention; @@ -119,6 +120,16 @@ void TmThreadsUnsetFlag(ThreadVars *tv, uint8_t flag) SC_ATOMIC_AND(tv->flags, ~flag); } +/** + * \brief Function to use as dummy stack function + * + * \retval TM_ECODE_OK + */ +TmEcode TmDummyFunc(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) +{ + return TM_ECODE_OK; +} + /* 1 slot functions */ void *TmThreadsSlot1NoIn(void *td) { @@ -159,8 +170,9 @@ void *TmThreadsSlot1NoIn(void *td) while (run) { TmThreadTestThreadUnPaused(tv); + TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc); - r = s->SlotFunc(tv, NULL, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq); + r = SlotFunc(tv, NULL, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq); /* handle error */ if (r == TM_ECODE_FAILED) { @@ -257,11 +269,12 @@ void *TmThreadsSlot1NoOut(void *td) while (run) { TmThreadTestThreadUnPaused(tv); + TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc); p = tv->tmqh_in(tv); PACKET_PROFILING_TMM_START(p, s->tm_id); - r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), /* no outqh no pq */ NULL, + r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), /* no outqh no pq */ NULL, /* no outqh no pq */ NULL); PACKET_PROFILING_TMM_END(p, s->tm_id); @@ -337,9 +350,10 @@ void *TmThreadsSlot1NoInOut(void *td) TmThreadsSetFlag(tv, THV_INIT_DONE); while (run) { + TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc); TmThreadTestThreadUnPaused(tv); - r = s->SlotFunc(tv, NULL, SC_ATOMIC_GET(s->slot_data), /* no outqh, no pq */NULL, NULL); + r = SlotFunc(tv, NULL, SC_ATOMIC_GET(s->slot_data), /* no outqh, no pq */NULL, NULL); /* handle error */ if (r == TM_ECODE_FAILED) { @@ -420,8 +434,9 @@ void *TmThreadsSlot1(void *td) p = tv->tmqh_in(tv); if (p != NULL) { + TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc); PACKET_PROFILING_TMM_START(p, s->tm_id); - r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, + r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq); PACKET_PROFILING_TMM_END(p, s->tm_id); @@ -500,12 +515,13 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, Packet *extra_p; for (s = slot; s != NULL; s = s->slot_next) { + TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc); PACKET_PROFILING_TMM_START(p, s->tm_id); if (unlikely(s->id == 0)) { - r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq); + r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq); } else { - r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, NULL); + r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, NULL); } PACKET_PROFILING_TMM_END(p, s->tm_id); @@ -885,20 +901,23 @@ ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *tm_slot) * \param tv TV the slot is attached to. * \param tm TM to append. * \param data Data to be passed on to the slot init function. + * + * \retval The allocated TmSlot or NULL if there is an error */ -void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) +static inline TmSlot * _TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) { TmSlot *s = (TmSlot *)tv->tm_slots; TmSlot *slot = SCMalloc(sizeof(TmSlot)); if (slot == NULL) - return; + return NULL; memset(slot, 0, sizeof(TmSlot)); SC_ATOMIC_INIT(slot->slot_data); slot->tv = tv; slot->SlotThreadInit = tm->ThreadInit; slot->slot_initdata = data; - slot->SlotFunc = tm->Func; + SC_ATOMIC_INIT(slot->SlotFunc); + SC_ATOMIC_SET(slot->SlotFunc, tm->Func); slot->PktAcqLoop = tm->PktAcqLoop; slot->SlotThreadExitPrintStats = tm->ThreadExitPrintStats; slot->SlotThreadDeinit = tm->ThreadDeinit; @@ -925,9 +944,106 @@ void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) } } + return slot; +} + +/** + * \brief Appends a new entry to the slots. + * + * \param tv TV the slot is attached to. + * \param tm TM to append. + * \param data Data to be passed on to the slot init function. + */ +void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) +{ + _TmSlotSetFuncAppend(tv, tm, data); +} + +typedef struct TmDummySlot_ { + TmSlot *slot; + TmEcode (*SlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *, + PacketQueue *); + TmEcode (*SlotThreadInit)(ThreadVars *, void *, void **); + TAILQ_ENTRY(TmDummySlot_) next; +} TmDummySlot; + +static TAILQ_HEAD(, TmDummySlot_) dummy_slots = + TAILQ_HEAD_INITIALIZER(dummy_slots); + +/** + * \brief Appends a new entry to the slots with a delayed option. + * + * \param tv TV the slot is attached to. + * \param tm TM to append. + * \param data Data to be passed on to the slot init function. + * \param delayed Delayed start of slot if equal to 1 + */ +void TmSlotSetFuncAppendDelayed(ThreadVars *tv, TmModule *tm, void *data, + int delayed) +{ + TmSlot *slot = _TmSlotSetFuncAppend(tv, tm, data); + TmDummySlot *dslot = NULL; + + if ((slot == NULL) || (delayed == 0)) { + return; + } + + dslot = SCMalloc(sizeof(TmDummySlot)); + if (dslot == NULL) { + return; + } + + memset(dslot, 0, sizeof(*dslot)); + + dslot->SlotFunc = SC_ATOMIC_GET(slot->SlotFunc); + SC_ATOMIC_SET(slot->SlotFunc, TmDummyFunc); + dslot->SlotThreadInit = slot->SlotThreadInit; + slot->SlotThreadInit = NULL; + dslot->slot = slot; + + TAILQ_INSERT_TAIL(&dummy_slots, dslot, next); + return; } +/** + * \brief Activate slots that have been started in delayed mode + */ +void TmThreadActivateDummySlot() +{ + TmDummySlot *dslot; + TmSlot *s; + TmEcode r = TM_ECODE_OK; + + TAILQ_FOREACH(dslot, &dummy_slots, next) { + void *slot_data = NULL; + s = dslot->slot; + if (dslot->SlotThreadInit != NULL) { + s->SlotThreadInit = dslot->SlotThreadInit; + r = s->SlotThreadInit(s->tv, s->slot_initdata, &slot_data); + if (r != TM_ECODE_OK) { + EngineKill(); + TmThreadsSetFlag(s->tv, THV_CLOSED | THV_RUNNING_DONE); + } + SC_ATOMIC_SET(s->slot_data, slot_data); + } + SC_ATOMIC_CAS(&s->SlotFunc, TmDummyFunc, dslot->SlotFunc); + } +} + +/** + * \brief Deactivate slots that have been started in delayed mode. + */ +void TmThreadDeActivateDummySlot() +{ + TmDummySlot *dslot; + + TAILQ_FOREACH(dslot, &dummy_slots, next) { + SC_ATOMIC_CAS(&dslot->slot->SlotFunc, dslot->SlotFunc, TmDummyFunc); + dslot->slot->SlotThreadInit = NULL; + } +} + /** * \brief Returns the slot holding a TM with the particular tm_id. * diff --git a/src/tm-threads.h b/src/tm-threads.h index daee37c6b9..81e98d1ac1 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -29,13 +29,15 @@ #include "tm-threads-common.h" #include "tm-modules.h" +typedef TmEcode (*TmSlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *, + PacketQueue *); + typedef struct TmSlot_ { /* the TV holding this slot */ ThreadVars *tv; /* function pointers */ - TmEcode (*SlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *, - PacketQueue *); + SC_ATOMIC_DECLARE(TmSlotFunc, SlotFunc); TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *); @@ -72,6 +74,7 @@ extern ThreadVars *tv_root[TVT_MAX]; extern SCMutex tv_root_lock; void TmSlotSetFuncAppend(ThreadVars *, TmModule *, void *); +void TmSlotSetFuncAppendDelayed(ThreadVars *, TmModule *, void *, int delayed); TmSlot *TmSlotGetSlotForTM(int); ThreadVars *TmThreadCreate(char *, char *, char *, char *, char *, char *, @@ -104,6 +107,9 @@ void TmThreadCheckThreadState(void); TmEcode TmThreadWaitOnThreadInit(void); ThreadVars *TmThreadsGetCallingThread(void); +void TmThreadActivateDummySlot(void); +void TmThreadDeActivateDummySlot(void); + int TmThreadsCheckFlag(ThreadVars *, uint8_t); void TmThreadsSetFlag(ThreadVars *, uint8_t); void TmThreadsUnsetFlag(ThreadVars *, uint8_t); diff --git a/src/util-mpm-b2g-cuda.c b/src/util-mpm-b2g-cuda.c index 95881ff77d..7c11e51d28 100644 --- a/src/util-mpm-b2g-cuda.c +++ b/src/util-mpm-b2g-cuda.c @@ -2267,7 +2267,8 @@ void *CudaMpmB2gThreadsSlot1(void *td) * If the MPM is configured to use multiple CUstreams, buffer (1) and buffer (2) are * processed in parallel using multiple streams; In this case * data_queues[tctx->tmq_streamq->id] will contain the results of packet buffer (2). */ - r = s->SlotFunc(tv, + TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc); + r = SlotFunc(tv, (Packet *)data, (void *)tctx, (PacketQueue *)&data_queues[tv->inq->id], diff --git a/src/util-runmodes.c b/src/util-runmodes.c index 7aa693a6cf..ea3d70aca9 100644 --- a/src/util-runmodes.c +++ b/src/util-runmodes.c @@ -330,7 +330,8 @@ int RunModeSetLiveCaptureAuto(DetectEngineCtx *de_ctx, SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, (void *)de_ctx); + TmSlotSetFuncAppendDelayed(tv_detect_ncpu, tm_module, + (void *)de_ctx, de_ctx->delayed_detect); TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET); @@ -574,7 +575,8 @@ int RunModeSetLiveCaptureAutoFp(DetectEngineCtx *de_ctx, SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, (void *)de_ctx); + TmSlotSetFuncAppendDelayed(tv_detect_ncpu, tm_module, + (void *)de_ctx, de_ctx->delayed_detect); TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET); @@ -674,7 +676,8 @@ static int RunModeSetLiveCaptureWorkersForDevice(DetectEngineCtx *de_ctx, SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv, tm_module, (void *)de_ctx); + TmSlotSetFuncAppendDelayed(tv, tm_module, + (void *)de_ctx, de_ctx->delayed_detect); tm_module = TmModuleGetByName("RespondReject"); if (tm_module == NULL) { @@ -879,7 +882,8 @@ int RunModeSetIPSAuto(DetectEngineCtx *de_ctx, printf("ERROR: TmModuleGetByName Detect failed\n"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, (void *)de_ctx); + TmSlotSetFuncAppendDelayed(tv_detect_ncpu, tm_module, + (void *)de_ctx, de_ctx->delayed_detect); TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET); @@ -1064,7 +1068,8 @@ int RunModeSetIPSAutoFp(DetectEngineCtx *de_ctx, SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, (void *)de_ctx); + TmSlotSetFuncAppendDelayed(tv_detect_ncpu, tm_module, + (void *)de_ctx, de_ctx->delayed_detect); TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET); @@ -1181,7 +1186,8 @@ int RunModeSetIPSWorker(DetectEngineCtx *de_ctx, SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv, tm_module, (void *)de_ctx); + TmSlotSetFuncAppendDelayed(tv, tm_module, + (void *)de_ctx, de_ctx->delayed_detect); tm_module = TmModuleGetByName(verdict_mod_name); if (tm_module == NULL) { diff --git a/suricata.yaml.in b/suricata.yaml.in index c0526405ca..a3bbabf663 100644 --- a/suricata.yaml.in +++ b/suricata.yaml.in @@ -294,6 +294,9 @@ detect-engine: # When rule-reload is enabled, sending a USR2 signal to the Suricata process # will trigger a live rule reload. Experimental feature, use with care. #- rule-reload: true + # If set to yes, the loading of signatures will be made after the capture + # is started. This will limit the downtime in IPS mode. + #- delayed-detect: yes # Suricata is multi-threaded. Here the threading can be influenced. threading: