source: add THV_RUNNING flag to notify of running state

Each module (thread) updates its status to indicate running.
Main thread awaits for all threads to be in a running state
before continuing the initialisation process

Implements feature 5384
(https://redmine.openinfosecfoundation.org/issues/5384)
pull/8097/head
Richard McConnell 2 years ago committed by Victor Julien
parent 9fb0137d9d
commit 13beba141c

@ -411,7 +411,7 @@ static void *StatsMgmtThread(void *arg)
}
SCLogDebug("stats_thread_data %p", &stats_thread_data);
TmThreadsSetFlag(tv_local, THV_INIT_DONE);
TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING);
while (1) {
if (TmThreadsCheckFlag(tv_local, THV_PAUSE)) {
TmThreadsSetFlag(tv_local, THV_PAUSED);
@ -480,7 +480,8 @@ static void *StatsWakeupThread(void *arg)
return NULL;
}
TmThreadsSetFlag(tv_local, THV_INIT_DONE);
TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING);
while (1) {
if (TmThreadsCheckFlag(tv_local, THV_PAUSE)) {
TmThreadsSetFlag(tv_local, THV_PAUSED);

@ -799,6 +799,8 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
TmThreadsSetFlag(th_v, THV_RUNNING);
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
@ -1063,6 +1065,8 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
struct timeval ts;
memset(&ts, 0, sizeof(ts));
TmThreadsSetFlag(th_v, THV_RUNNING);
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {

@ -560,7 +560,6 @@ static void AFPPeersListReachedInc(void)
return;
if ((SC_ATOMIC_ADD(peerslist.reached, 1) + 1) == peerslist.turn) {
SCLogInfo("All AFP capture threads are running.");
(void)SC_ATOMIC_SET(peerslist.reached, 0);
/* Set turn to 0 to skip syncrhonization when ReceiveAFPLoop is
* restarted.
@ -1339,6 +1338,10 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot)
fds.fd = ptv->socket;
fds.events = POLLIN;
// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);
while (1) {
/* Start by checking the state of our interface */
if (unlikely(ptv->afp_state == AFP_STATE_DOWN)) {

@ -338,6 +338,10 @@ ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
dtv->slot = s->slot_next;
// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);
while (1) {
if (suricata_ctl_flags & SURICATA_STOP) {
SCReturnInt(TM_ECODE_OK);

@ -116,6 +116,10 @@ TmEcode ReceiveErfFileLoop(ThreadVars *tv, void *data, void *slot)
etv->slot = ((TmSlot *)slot)->slot_next;
// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);
while (1) {
if (suricata_ctl_flags & SURICATA_STOP) {
SCReturnInt(TM_ECODE_OK);

@ -240,6 +240,11 @@ TmEcode ReceiveIPFWLoop(ThreadVars *tv, void *data, void *slot)
SCLogInfo("Thread '%s' will run on port %d (item %d)",
tv->name, nq->port_num, ptv->ipfw_index);
// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);
while (1) {
if (unlikely(suricata_ctl_flags != 0)) {
SCReturnInt(TM_ECODE_OK);

@ -911,6 +911,10 @@ TmEcode NapatechPacketLoop(ThreadVars *tv, void *data, void *slot)
TmSlot *s = (TmSlot *) slot;
ntv->slot = s->slot_next;
// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);
while (!(suricata_ctl_flags & SURICATA_STOP)) {
/* make sure we have at least one packet in the packet pool, to prevent
* us from alloc'ing packets at line rate */

@ -787,6 +787,11 @@ static TmEcode ReceiveNetmapLoop(ThreadVars *tv, void *data, void *slot)
fds.events = POLLIN;
SCLogDebug("thread %s polling on %d", tv->name, fds.fd);
// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);
for(;;) {
if (unlikely(suricata_ctl_flags != 0)) {
break;

@ -430,6 +430,10 @@ TmEcode ReceiveNFLOGLoop(ThreadVars *tv, void *data, void *slot)
SCReturnInt(TM_ECODE_FAILED);
}
// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);
while (1) {
if (suricata_ctl_flags != 0)
break;

@ -1009,6 +1009,10 @@ TmEcode ReceiveNFQLoop(ThreadVars *tv, void *data, void *slot)
ntv->slot = ((TmSlot *) slot)->slot_next;
// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);
while(1) {
if (unlikely(suricata_ctl_flags != 0)) {
NFQDestroyQueue(nq);

@ -171,6 +171,10 @@ TmEcode ReceivePcapFileLoop(ThreadVars *tv, void *data, void *slot)
ptv->shared.slot = s->slot_next;
ptv->shared.cb_result = TM_ECODE_OK;
// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);
if(ptv->is_directory == 0) {
SCLogInfo("Starting file run for %s", ptv->behavior.file->filename);
status = PcapFileDispatch(ptv->behavior.file);

@ -312,6 +312,10 @@ static TmEcode ReceivePcapLoop(ThreadVars *tv, void *data, void *slot)
ptv->slot = s->slot_next;
ptv->cb_result = TM_ECODE_OK;
// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);
while (1) {
if (suricata_ctl_flags & SURICATA_STOP) {
SCReturnInt(TM_ECODE_OK);

@ -361,6 +361,10 @@ TmEcode ReceivePfringLoop(ThreadVars *tv, void *data, void *slot)
SCReturnInt(TM_ECODE_FAILED);
}
// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);
while(1) {
if (suricata_ctl_flags & SURICATA_STOP) {
SCReturnInt(TM_ECODE_OK);

@ -410,6 +410,10 @@ TmEcode ReceiveWinDivertLoop(ThreadVars *tv, void *data, void *slot)
WinDivertThreadVars *wd_tv = (WinDivertThreadVars *)data;
wd_tv->slot = ((TmSlot *)slot)->slot_next;
// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);
while (true) {
if (suricata_ctl_flags & SURICATA_STOP) {
SCReturnInt(TM_ECODE_OK);

@ -31,6 +31,10 @@
#include <signal.h>
#endif
#if HAVE_LIBSYSTEMD
#include <systemd/sd-daemon.h>
#endif
#include "suricata.h"
#include "conf.h"
@ -393,6 +397,23 @@ static void GlobalsDestroy(SCInstance *suri)
suri->pid_filename = NULL;
}
/**
* \brief Used to send OS specific notification of running threads
*
* \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
*/
static void OnNotifyRunning(void)
{
#if HAVE_LIBSYSTEMD
if (sd_notify(0, "READY=1") < 0) {
SCLogWarning(SC_ERR_SYSCALL, "failed to notify systemd");
/* Please refer to:
* https://www.freedesktop.org/software/systemd/man/sd_notify.html#Return%20Value
* for discussion on why failure should not be considered an error */
}
#endif
}
/** \brief make sure threads can stop the engine by calling this
* function. Purpose: pcap file mode needs to be able to tell the
* engine the file eof is reached. */
@ -2888,6 +2909,14 @@ int SuricataMain(int argc, char **argv)
/* Un-pause all the paused threads */
TmThreadContinueThreads();
/* Must ensure all threads are fully operational before continuing with init process */
if (TmThreadWaitOnThreadRunning() != TM_ECODE_OK) {
exit(EXIT_FAILURE);
}
/* Print notice and send OS specific notification of threads in running state */
OnNotifyRunning();
PostRunStartedDetectSetup(&suricata);
SCPledge();

@ -52,6 +52,7 @@ struct TmSlot_;
* rule reloads even if no packets are read by the capture method. */
#define THV_CAPTURE_INJECT_PKT BIT_U32(11)
#define THV_DEAD BIT_U32(12) /**< thread has been joined with pthread_join() */
#define THV_RUNNING BIT_U32(13) /**< thread is running */
/** \brief Per thread variable structure */
typedef struct ThreadVars_ {

@ -428,7 +428,11 @@ static void *TmThreadsSlotVar(void *td)
StatsSetupPrivate(tv);
TmThreadsSetFlag(tv, THV_INIT_DONE);
// Each 'worker' thread uses this func to process/decode the packet read.
// Each decode method is different to receive methods in that they do not
// enter infinite loops. They use this as the core loop. As a result, at this
// point the worker threads can be considered both initialized and running.
TmThreadsSetFlag(tv, THV_INIT_DONE | THV_RUNNING);
s = (TmSlot *)tv->tm_slots;
@ -1033,7 +1037,6 @@ ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
tv->id = TmThreadsRegisterThread(tv, tv->type);
}
return tv;
}
@ -1773,10 +1776,113 @@ void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
void TmThreadContinue(ThreadVars *tv)
{
TmThreadsUnsetFlag(tv, THV_PAUSE);
return;
}
/**
* \brief Waits for all threads to be in a running state
*
* \retval TM_ECODE_OK if all are running or error if a thread failed
*/
TmEcode TmThreadWaitOnThreadRunning(void)
{
uint16_t RX_num = 0;
uint16_t W_num = 0;
uint16_t FM_num = 0;
uint16_t FR_num = 0;
uint16_t TX_num = 0;
struct timeval start_ts;
struct timeval cur_ts;
gettimeofday(&start_ts, NULL);
again:
SCMutexLock(&tv_root_lock);
for (int i = 0; i < TVT_MAX; i++) {
ThreadVars *tv = tv_root[i];
while (tv != NULL) {
if (TmThreadsCheckFlag(tv, (THV_FAILED | THV_CLOSED | THV_DEAD))) {
SCMutexUnlock(&tv_root_lock);
SCLogError(SC_ERR_THREAD_INIT,
"thread \"%s\" failed to "
"start: flags %04x",
tv->name, SC_ATOMIC_GET(tv->flags));
return TM_ECODE_FAILED;
}
if (!(TmThreadsCheckFlag(tv, THV_RUNNING | THV_RUNNING_DONE))) {
SCMutexUnlock(&tv_root_lock);
/* 60 seconds provided for the thread to transition from
* THV_INIT_DONE to THV_RUNNING */
gettimeofday(&cur_ts, NULL);
if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
SCLogError(SC_ERR_THREAD_INIT,
"thread \"%s\" failed to "
"start in time: flags %04x",
tv->name, SC_ATOMIC_GET(tv->flags));
return TM_ECODE_FAILED;
}
/* sleep a little to give the thread some
* time to start running */
SleepUsec(100);
goto again;
}
if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
RX_num++;
else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
W_num++;
else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
TX_num++;
else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
FM_num++;
else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
FR_num++;
tv = tv->next;
}
}
SCMutexUnlock(&tv_root_lock);
/* Construct a welcome string displaying
* initialized thread types and counts */
uint16_t app_len = 32;
uint16_t buf_len = 256;
char append_str[app_len];
char thread_counts[buf_len];
strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
if (RX_num > 0) {
snprintf(append_str, app_len, "RX: %u ", RX_num);
strlcat(thread_counts, append_str, buf_len);
}
if (W_num > 0) {
snprintf(append_str, app_len, "W: %u ", W_num);
strlcat(thread_counts, append_str, buf_len);
}
if (TX_num > 0) {
snprintf(append_str, app_len, "TX: %u ", TX_num);
strlcat(thread_counts, append_str, buf_len);
}
if (FM_num > 0) {
snprintf(append_str, app_len, "FM: %u ", FM_num);
strlcat(thread_counts, append_str, buf_len);
}
if (FR_num > 0) {
snprintf(append_str, app_len, "FR: %u ", FR_num);
strlcat(thread_counts, append_str, buf_len);
}
snprintf(append_str, app_len, " Engine started.");
strlcat(thread_counts, append_str, buf_len);
SCLogNotice("%s", thread_counts);
return TM_ECODE_OK;
}
/**
* \brief Unpauses all threads present in tv_root
*/
@ -1823,12 +1929,6 @@ void TmThreadCheckThreadState(void)
*/
TmEcode TmThreadWaitOnThreadInit(void)
{
uint16_t RX_num = 0;
uint16_t W_num = 0;
uint16_t FM_num = 0;
uint16_t FR_num = 0;
uint16_t TX_num = 0;
struct timeval start_ts;
struct timeval cur_ts;
gettimeofday(&start_ts, NULL);
@ -1877,55 +1977,11 @@ again:
return TM_ECODE_FAILED;
}
if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
RX_num++;
else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
W_num++;
else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
TX_num++;
else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
FM_num++;
else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
FR_num++;
tv = tv->next;
}
}
SCMutexUnlock(&tv_root_lock);
/* Construct a welcome string displaying
* initialized thread types and counts */
uint16_t app_len = 32;
uint16_t buf_len = 256;
char append_str[app_len];
char thread_counts[buf_len];
strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
if (RX_num > 0) {
snprintf(append_str, app_len, "RX: %u ", RX_num);
strlcat(thread_counts, append_str, buf_len);
}
if (W_num > 0) {
snprintf(append_str, app_len, "W: %u ", W_num);
strlcat(thread_counts, append_str, buf_len);
}
if (TX_num > 0) {
snprintf(append_str, app_len, "TX: %u ", TX_num);
strlcat(thread_counts, append_str, buf_len);
}
if (FM_num > 0) {
snprintf(append_str, app_len, "FM: %u ", FM_num);
strlcat(thread_counts, append_str, buf_len);
}
if (FR_num > 0) {
snprintf(append_str, app_len, "FR: %u ", FR_num);
strlcat(thread_counts, append_str, buf_len);
}
snprintf(append_str, app_len, " Engine started.");
strlcat(thread_counts, append_str, buf_len);
SCLogNotice("%s", thread_counts);
return TM_ECODE_OK;
}

@ -123,6 +123,8 @@ void TmThreadDisableReceiveThreads(void);
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);
TmEcode TmThreadWaitOnThreadRunning(void);
static inline void TmThreadsCleanDecodePQ(PacketQueueNoLock *pq)
{
while (1) {

@ -1130,7 +1130,8 @@ static TmEcode UnixManager(ThreadVars *th_v, void *thread_data)
th_v->cap_flags = 0;
SCDropCaps(th_v);
TmThreadsSetFlag(th_v, THV_INIT_DONE);
TmThreadsSetFlag(th_v, THV_INIT_DONE | THV_RUNNING);
while (1) {
ret = UnixMain(&command);
if (ret == 0) {

Loading…
Cancel
Save