@ -428,7 +428,11 @@ static void *TmThreadsSlotVar(void *td)
StatsSetupPrivate ( tv ) ;
TmThreadsSetFlag ( tv , THV_INIT_DONE ) ;
// Each 'worker' thread uses this func to process/decode the packet read.
// Each decode method is different to receive methods in that they do not
// enter infinite loops. They use this as the core loop. As a result, at this
// point the worker threads can be considered both initialized and running.
TmThreadsSetFlag ( tv , THV_INIT_DONE | THV_RUNNING ) ;
s = ( TmSlot * ) tv - > tm_slots ;
@ -1033,7 +1037,6 @@ ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
tv - > id = TmThreadsRegisterThread ( tv , tv - > type ) ;
}
return tv ;
}
@ -1773,10 +1776,113 @@ void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
void TmThreadContinue ( ThreadVars * tv )
{
TmThreadsUnsetFlag ( tv , THV_PAUSE ) ;
return ;
}
/**
* \ brief Waits for all threads to be in a running state
*
* \ retval TM_ECODE_OK if all are running or error if a thread failed
*/
TmEcode TmThreadWaitOnThreadRunning ( void )
{
uint16_t RX_num = 0 ;
uint16_t W_num = 0 ;
uint16_t FM_num = 0 ;
uint16_t FR_num = 0 ;
uint16_t TX_num = 0 ;
struct timeval start_ts ;
struct timeval cur_ts ;
gettimeofday ( & start_ts , NULL ) ;
again :
SCMutexLock ( & tv_root_lock ) ;
for ( int i = 0 ; i < TVT_MAX ; i + + ) {
ThreadVars * tv = tv_root [ i ] ;
while ( tv ! = NULL ) {
if ( TmThreadsCheckFlag ( tv , ( THV_FAILED | THV_CLOSED | THV_DEAD ) ) ) {
SCMutexUnlock ( & tv_root_lock ) ;
SCLogError ( SC_ERR_THREAD_INIT ,
" thread \" %s \" failed to "
" start: flags %04x " ,
tv - > name , SC_ATOMIC_GET ( tv - > flags ) ) ;
return TM_ECODE_FAILED ;
}
if ( ! ( TmThreadsCheckFlag ( tv , THV_RUNNING | THV_RUNNING_DONE ) ) ) {
SCMutexUnlock ( & tv_root_lock ) ;
/* 60 seconds provided for the thread to transition from
* THV_INIT_DONE to THV_RUNNING */
gettimeofday ( & cur_ts , NULL ) ;
if ( ( cur_ts . tv_sec - start_ts . tv_sec ) > 60 ) {
SCLogError ( SC_ERR_THREAD_INIT ,
" thread \" %s \" failed to "
" start in time: flags %04x " ,
tv - > name , SC_ATOMIC_GET ( tv - > flags ) ) ;
return TM_ECODE_FAILED ;
}
/* sleep a little to give the thread some
* time to start running */
SleepUsec ( 100 ) ;
goto again ;
}
if ( strncmp ( thread_name_autofp , tv - > name , strlen ( thread_name_autofp ) ) = = 0 )
RX_num + + ;
else if ( strncmp ( thread_name_workers , tv - > name , strlen ( thread_name_workers ) ) = = 0 )
W_num + + ;
else if ( strncmp ( thread_name_verdict , tv - > name , strlen ( thread_name_verdict ) ) = = 0 )
TX_num + + ;
else if ( strncmp ( thread_name_flow_mgr , tv - > name , strlen ( thread_name_flow_mgr ) ) = = 0 )
FM_num + + ;
else if ( strncmp ( thread_name_flow_rec , tv - > name , strlen ( thread_name_flow_rec ) ) = = 0 )
FR_num + + ;
tv = tv - > next ;
}
}
SCMutexUnlock ( & tv_root_lock ) ;
/* Construct a welcome string displaying
* initialized thread types and counts */
uint16_t app_len = 32 ;
uint16_t buf_len = 256 ;
char append_str [ app_len ] ;
char thread_counts [ buf_len ] ;
strlcpy ( thread_counts , " Threads created -> " , strlen ( " Threads created -> " ) + 1 ) ;
if ( RX_num > 0 ) {
snprintf ( append_str , app_len , " RX: %u " , RX_num ) ;
strlcat ( thread_counts , append_str , buf_len ) ;
}
if ( W_num > 0 ) {
snprintf ( append_str , app_len , " W: %u " , W_num ) ;
strlcat ( thread_counts , append_str , buf_len ) ;
}
if ( TX_num > 0 ) {
snprintf ( append_str , app_len , " TX: %u " , TX_num ) ;
strlcat ( thread_counts , append_str , buf_len ) ;
}
if ( FM_num > 0 ) {
snprintf ( append_str , app_len , " FM: %u " , FM_num ) ;
strlcat ( thread_counts , append_str , buf_len ) ;
}
if ( FR_num > 0 ) {
snprintf ( append_str , app_len , " FR: %u " , FR_num ) ;
strlcat ( thread_counts , append_str , buf_len ) ;
}
snprintf ( append_str , app_len , " Engine started. " ) ;
strlcat ( thread_counts , append_str , buf_len ) ;
SCLogNotice ( " %s " , thread_counts ) ;
return TM_ECODE_OK ;
}
/**
* \ brief Unpauses all threads present in tv_root
*/
@ -1823,12 +1929,6 @@ void TmThreadCheckThreadState(void)
*/
TmEcode TmThreadWaitOnThreadInit ( void )
{
uint16_t RX_num = 0 ;
uint16_t W_num = 0 ;
uint16_t FM_num = 0 ;
uint16_t FR_num = 0 ;
uint16_t TX_num = 0 ;
struct timeval start_ts ;
struct timeval cur_ts ;
gettimeofday ( & start_ts , NULL ) ;
@ -1877,55 +1977,11 @@ again:
return TM_ECODE_FAILED ;
}
if ( strncmp ( thread_name_autofp , tv - > name , strlen ( thread_name_autofp ) ) = = 0 )
RX_num + + ;
else if ( strncmp ( thread_name_workers , tv - > name , strlen ( thread_name_workers ) ) = = 0 )
W_num + + ;
else if ( strncmp ( thread_name_verdict , tv - > name , strlen ( thread_name_verdict ) ) = = 0 )
TX_num + + ;
else if ( strncmp ( thread_name_flow_mgr , tv - > name , strlen ( thread_name_flow_mgr ) ) = = 0 )
FM_num + + ;
else if ( strncmp ( thread_name_flow_rec , tv - > name , strlen ( thread_name_flow_rec ) ) = = 0 )
FR_num + + ;
tv = tv - > next ;
}
}
SCMutexUnlock ( & tv_root_lock ) ;
/* Construct a welcome string displaying
* initialized thread types and counts */
uint16_t app_len = 32 ;
uint16_t buf_len = 256 ;
char append_str [ app_len ] ;
char thread_counts [ buf_len ] ;
strlcpy ( thread_counts , " Threads created -> " , strlen ( " Threads created -> " ) + 1 ) ;
if ( RX_num > 0 ) {
snprintf ( append_str , app_len , " RX: %u " , RX_num ) ;
strlcat ( thread_counts , append_str , buf_len ) ;
}
if ( W_num > 0 ) {
snprintf ( append_str , app_len , " W: %u " , W_num ) ;
strlcat ( thread_counts , append_str , buf_len ) ;
}
if ( TX_num > 0 ) {
snprintf ( append_str , app_len , " TX: %u " , TX_num ) ;
strlcat ( thread_counts , append_str , buf_len ) ;
}
if ( FM_num > 0 ) {
snprintf ( append_str , app_len , " FM: %u " , FM_num ) ;
strlcat ( thread_counts , append_str , buf_len ) ;
}
if ( FR_num > 0 ) {
snprintf ( append_str , app_len , " FR: %u " , FR_num ) ;
strlcat ( thread_counts , append_str , buf_len ) ;
}
snprintf ( append_str , app_len , " Engine started. " ) ;
strlcat ( thread_counts , append_str , buf_len ) ;
SCLogNotice ( " %s " , thread_counts ) ;
return TM_ECODE_OK ;
}