flow/recycler: bring back pthread_cond_t sleep

Bug #4379.
pull/7533/head
Victor Julien 4 years ago committed by Victor Julien
parent 633e6cf09e
commit f271fb4575

@ -93,6 +93,8 @@ SC_ATOMIC_EXTERN(unsigned int, flow_flags);
SCCtrlCondT flow_manager_ctrl_cond; SCCtrlCondT flow_manager_ctrl_cond;
SCCtrlMutex flow_manager_ctrl_mutex; SCCtrlMutex flow_manager_ctrl_mutex;
SCCtrlCondT flow_recycler_ctrl_cond;
SCCtrlMutex flow_recycler_ctrl_mutex;
void FlowTimeoutsInit(void) void FlowTimeoutsInit(void)
{ {
@ -300,11 +302,13 @@ static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCount
FlowQueuePrivateAppendFlow(&recycle, f); FlowQueuePrivateAppendFlow(&recycle, f);
if (recycle.len == 100) { if (recycle.len == 100) {
FlowQueueAppendPrivate(&flow_recycle_q, &recycle); FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
FlowWakeupFlowRecyclerThread();
} }
cnt++; cnt++;
} }
if (recycle.len) { if (recycle.len) {
FlowQueueAppendPrivate(&flow_recycle_q, &recycle); FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
FlowWakeupFlowRecyclerThread();
} }
return cnt; return cnt;
} }
@ -586,9 +590,11 @@ static uint32_t FlowCleanupHash(void)
FBLOCK_UNLOCK(fb); FBLOCK_UNLOCK(fb);
if (local_queue.len >= 25) { if (local_queue.len >= 25) {
FlowQueueAppendPrivate(&flow_recycle_q, &local_queue); FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
FlowWakeupFlowRecyclerThread();
} }
} }
FlowQueueAppendPrivate(&flow_recycle_q, &local_queue); FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
FlowWakeupFlowRecyclerThread();
return cnt; return cnt;
} }
@ -1052,6 +1058,7 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
{ {
FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data; FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
BUG_ON(ftd == NULL); BUG_ON(ftd == NULL);
const bool time_is_live = TimeModeIsLive();
uint64_t recycled_cnt = 0; uint64_t recycled_cnt = 0;
struct timeval ts; struct timeval ts;
memset(&ts, 0, sizeof(ts)); memset(&ts, 0, sizeof(ts));
@ -1084,7 +1091,30 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
break; break;
} }
usleep(250); const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY);
if (emerg || !time_is_live) {
usleep(250);
} else {
struct timeval cond_tv;
gettimeofday(&cond_tv, NULL);
cond_tv.tv_sec += 1;
struct timespec cond_time = FROM_TIMEVAL(cond_tv);
SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
while (1) {
int rc = SCCtrlCondTimedwait(
&flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time);
if (rc == ETIMEDOUT || rc < 0) {
break;
}
if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
break;
}
if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) {
break;
}
}
SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
}
SCLogDebug("woke up..."); SCLogDebug("woke up...");
@ -1120,6 +1150,9 @@ void FlowRecyclerThreadSpawn()
} }
flowrec_number = (uint32_t)setting; flowrec_number = (uint32_t)setting;
SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL);
SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL);
SCLogConfig("using %u flow recycler threads", flowrec_number); SCLogConfig("using %u flow recycler threads", flowrec_number);
for (uint32_t u = 0; u < flowrec_number; u++) { for (uint32_t u = 0; u < flowrec_number; u++) {
@ -1161,6 +1194,7 @@ void FlowDisableFlowRecyclerThread(void)
/* make sure all flows are processed */ /* make sure all flows are processed */
do { do {
FlowWakeupFlowRecyclerThread();
usleep(10); usleep(10);
} while (FlowRecyclerReadyToShutdown() == false); } while (FlowRecyclerReadyToShutdown() == false);
@ -1194,6 +1228,7 @@ again:
{ {
if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
SCMutexUnlock(&tv_root_lock); SCMutexUnlock(&tv_root_lock);
FlowWakeupFlowRecyclerThread();
/* sleep outside lock */ /* sleep outside lock */
SleepMsec(1); SleepMsec(1);
goto again; goto again;

@ -28,6 +28,9 @@
extern SCCtrlCondT flow_manager_ctrl_cond; extern SCCtrlCondT flow_manager_ctrl_cond;
extern SCCtrlMutex flow_manager_ctrl_mutex; extern SCCtrlMutex flow_manager_ctrl_mutex;
#define FlowWakeupFlowManagerThread() SCCtrlCondSignal(&flow_manager_ctrl_cond) #define FlowWakeupFlowManagerThread() SCCtrlCondSignal(&flow_manager_ctrl_cond)
extern SCCtrlCondT flow_recycler_ctrl_cond;
extern SCCtrlMutex flow_recycler_ctrl_mutex;
#define FlowWakeupFlowRecyclerThread() SCCtrlCondSignal(&flow_recycler_ctrl_cond)
#define FlowTimeoutsReset() FlowTimeoutsInit() #define FlowTimeoutsReset() FlowTimeoutsInit()
void FlowTimeoutsInit(void); void FlowTimeoutsInit(void);

Loading…
Cancel
Save