diff --git a/src/flow-manager.c b/src/flow-manager.c index 71bf2e0a49..415106944e 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -591,6 +591,150 @@ void FlowManagerThreadSpawn() return; } +/** \brief Thread that manages timed out flows. + * + * \param td ThreadVars casted to void ptr + */ +void *FlowRecyclerThread(void *td) +{ + /* block usr2. usr2 to be handled by the main thread only */ + UtilSignalBlock(SIGUSR2); + + ThreadVars *th_v = (ThreadVars *)td; + struct timeval ts; + struct timespec cond_time; + int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC; + int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC; + + if (th_v->thread_setup_flags != 0) + TmThreadSetupOptions(th_v); + + memset(&ts, 0, sizeof(ts)); + + /* set the thread name */ + if (SCSetThreadName(th_v->name) < 0) { + SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name"); + } else { + SCLogDebug("%s started...", th_v->name); + } + + /* Set the threads capability */ + th_v->cap_flags = 0; + SCDropCaps(th_v); + + TmThreadsSetFlag(th_v, THV_INIT_DONE); + while (1) + { + if (TmThreadsCheckFlag(th_v, THV_PAUSE)) { + TmThreadsSetFlag(th_v, THV_PAUSED); + TmThreadTestThreadUnPaused(th_v); + TmThreadsUnsetFlag(th_v, THV_PAUSED); + } + + /* Get the time */ + memset(&ts, 0, sizeof(ts)); + TimeGet(&ts); + SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec); + + uint32_t len = 0; + FQLOCK_LOCK(&flow_recycle_q); + len = flow_recycle_q.len; + FQLOCK_UNLOCK(&flow_recycle_q); + + SCLogDebug("%u flows to recycle", len); + + if (TmThreadsCheckFlag(th_v, THV_KILL)) { + SCPerfSyncCounters(th_v); + break; + } + + cond_time.tv_sec = time(NULL) + flow_update_delay_sec; + cond_time.tv_nsec = flow_update_delay_nsec; + SCCtrlMutexLock(&flow_recycler_ctrl_mutex); + SCCtrlCondTimedwait(&flow_recycler_ctrl_cond, + &flow_recycler_ctrl_mutex, &cond_time); + SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex); + + SCLogDebug("woke up..."); + + SCPerfSyncCountersIfSignalled(th_v); + } + + TmThreadsSetFlag(th_v, THV_RUNNING_DONE); + TmThreadWaitForFlag(th_v, THV_DEINIT); + + TmThreadsSetFlag(th_v, THV_CLOSED); + pthread_exit((void *) 0); + return NULL; +} + +/** \brief spawn the flow recycler thread */ +void FlowRecyclerThreadSpawn() +{ + ThreadVars *tv_flowmgr = NULL; + + SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL); + SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL); + + tv_flowmgr = TmThreadCreateMgmtThread("FlowRecyclerThread", + FlowRecyclerThread, 0); + + TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET); + + if (tv_flowmgr == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(1); + } + if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(1); + } + + return; +} + +/** + * \brief Used to kill flow recycler thread(s). + * + * \todo Kinda hackish since it uses the tv name to identify flow recycler + * thread. We need an all weather identification scheme. + */ +void FlowKillFlowRecyclerThread(void) +{ + ThreadVars *tv = NULL; + int cnt = 0; + + SCCtrlCondSignal(&flow_recycler_ctrl_cond); + + SCMutexLock(&tv_root_lock); + + /* flow manager thread(s) is/are a part of mgmt threads */ + tv = tv_root[TVT_MGMT]; + + while (tv != NULL) { + if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) { + TmThreadsSetFlag(tv, THV_KILL); + TmThreadsSetFlag(tv, THV_DEINIT); + + /* be sure it has shut down */ + while (!TmThreadsCheckFlag(tv, THV_CLOSED)) { + usleep(100); + } + cnt++; + } + tv = tv->next; + } + + /* not possible, unless someone decides to rename FlowManagerThread */ + if (cnt == 0) { + SCMutexUnlock(&tv_root_lock); + abort(); + } + + SCMutexUnlock(&tv_root_lock); + return; +} + #ifdef UNITTESTS /** diff --git a/src/flow-manager.h b/src/flow-manager.h index 090e74fbc3..3ad5c9b63a 100644 --- a/src/flow-manager.h +++ b/src/flow-manager.h @@ -33,4 +33,13 @@ void FlowManagerThreadSpawn(void); void FlowKillFlowManagerThread(void); void FlowMgrRegisterTests (void); +/** flow recycler scheduling condition */ +SCCtrlCondT flow_recycler_ctrl_cond; +SCCtrlMutex flow_recycler_ctrl_mutex; +#define FlowWakeupFlowRecyclerThread() \ + SCCtrlCondSignal(&flow_recycler_ctrl_cond) + +void FlowRecyclerThreadSpawn(void); +void FlowKillFlowRecyclerThread(void); + #endif /* __FLOW_MANAGER_H__ */ diff --git a/src/suricata.c b/src/suricata.c index 5ec53a4366..85d0019f41 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -2282,6 +2282,7 @@ int main(int argc, char **argv) } /* Spawn the flow manager thread */ FlowManagerThreadSpawn(); + FlowRecyclerThreadSpawn(); StreamTcpInitConfig(STREAM_VERBOSE); SCPerfSpawnThreads(); @@ -2400,6 +2401,7 @@ int main(int argc, char **argv) if (suri.run_mode != RUNMODE_UNIX_SOCKET) { SCPerfReleaseResources(); + FlowKillFlowRecyclerThread(); FlowShutdown(); StreamTcpFreeConfig(STREAM_VERBOSE); }