flow: introduce FlowRecycler stub

FlowRecycler thread stub. Start/stop code.
pull/1058/head
Victor Julien 12 years ago
parent e892d99827
commit 94cb52897b

@ -591,6 +591,150 @@ void FlowManagerThreadSpawn()
return;
}
/** \brief Thread that manages timed out flows.
*
* \param td ThreadVars casted to void ptr
*/
void *FlowRecyclerThread(void *td)
{
/* block usr2. usr2 to be handled by the main thread only */
UtilSignalBlock(SIGUSR2);
ThreadVars *th_v = (ThreadVars *)td;
struct timeval ts;
struct timespec cond_time;
int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
if (th_v->thread_setup_flags != 0)
TmThreadSetupOptions(th_v);
memset(&ts, 0, sizeof(ts));
/* set the thread name */
if (SCSetThreadName(th_v->name) < 0) {
SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
} else {
SCLogDebug("%s started...", th_v->name);
}
/* Set the threads capability */
th_v->cap_flags = 0;
SCDropCaps(th_v);
TmThreadsSetFlag(th_v, THV_INIT_DONE);
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
TmThreadsSetFlag(th_v, THV_PAUSED);
TmThreadTestThreadUnPaused(th_v);
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}
/* Get the time */
memset(&ts, 0, sizeof(ts));
TimeGet(&ts);
SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
uint32_t len = 0;
FQLOCK_LOCK(&flow_recycle_q);
len = flow_recycle_q.len;
FQLOCK_UNLOCK(&flow_recycle_q);
SCLogDebug("%u flows to recycle", len);
if (TmThreadsCheckFlag(th_v, THV_KILL)) {
SCPerfSyncCounters(th_v);
break;
}
cond_time.tv_sec = time(NULL) + flow_update_delay_sec;
cond_time.tv_nsec = flow_update_delay_nsec;
SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
SCCtrlCondTimedwait(&flow_recycler_ctrl_cond,
&flow_recycler_ctrl_mutex, &cond_time);
SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
SCLogDebug("woke up...");
SCPerfSyncCountersIfSignalled(th_v);
}
TmThreadsSetFlag(th_v, THV_RUNNING_DONE);
TmThreadWaitForFlag(th_v, THV_DEINIT);
TmThreadsSetFlag(th_v, THV_CLOSED);
pthread_exit((void *) 0);
return NULL;
}
/** \brief spawn the flow recycler thread */
void FlowRecyclerThreadSpawn()
{
ThreadVars *tv_flowmgr = NULL;
SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL);
SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL);
tv_flowmgr = TmThreadCreateMgmtThread("FlowRecyclerThread",
FlowRecyclerThread, 0);
TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
if (tv_flowmgr == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
return;
}
/**
* \brief Used to kill flow recycler thread(s).
*
* \todo Kinda hackish since it uses the tv name to identify flow recycler
* thread. We need an all weather identification scheme.
*/
void FlowKillFlowRecyclerThread(void)
{
ThreadVars *tv = NULL;
int cnt = 0;
SCCtrlCondSignal(&flow_recycler_ctrl_cond);
SCMutexLock(&tv_root_lock);
/* flow manager thread(s) is/are a part of mgmt threads */
tv = tv_root[TVT_MGMT];
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) {
TmThreadsSetFlag(tv, THV_KILL);
TmThreadsSetFlag(tv, THV_DEINIT);
/* be sure it has shut down */
while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
usleep(100);
}
cnt++;
}
tv = tv->next;
}
/* not possible, unless someone decides to rename FlowManagerThread */
if (cnt == 0) {
SCMutexUnlock(&tv_root_lock);
abort();
}
SCMutexUnlock(&tv_root_lock);
return;
}
#ifdef UNITTESTS
/**

@ -33,4 +33,13 @@ void FlowManagerThreadSpawn(void);
void FlowKillFlowManagerThread(void);
void FlowMgrRegisterTests (void);
/** flow recycler scheduling condition */
SCCtrlCondT flow_recycler_ctrl_cond;
SCCtrlMutex flow_recycler_ctrl_mutex;
#define FlowWakeupFlowRecyclerThread() \
SCCtrlCondSignal(&flow_recycler_ctrl_cond)
void FlowRecyclerThreadSpawn(void);
void FlowKillFlowRecyclerThread(void);
#endif /* __FLOW_MANAGER_H__ */

@ -2282,6 +2282,7 @@ int main(int argc, char **argv)
}
/* Spawn the flow manager thread */
FlowManagerThreadSpawn();
FlowRecyclerThreadSpawn();
StreamTcpInitConfig(STREAM_VERBOSE);
SCPerfSpawnThreads();
@ -2400,6 +2401,7 @@ int main(int argc, char **argv)
if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
SCPerfReleaseResources();
FlowKillFlowRecyclerThread();
FlowShutdown();
StreamTcpFreeConfig(STREAM_VERBOSE);
}

Loading…
Cancel
Save