diff --git a/src/tm-threads.c b/src/tm-threads.c index 9df10ff937..bc866aa215 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1521,6 +1521,64 @@ void TmThreadKillThread(ThreadVars *tv) return; } +/** \internal + * + * \brief make sure that all packet threads are done processing their + * in-flight packets + */ +static void TmThreadDrainPacketThreads(void) +{ + /* value in seconds */ +#define THREAD_KILL_MAX_WAIT_TIME 60 + /* value in microseconds */ +#define WAIT_TIME 1000 + + uint64_t total_wait_time = 0; + + ThreadVars *tv = NULL; + +again: + if (total_wait_time > (THREAD_KILL_MAX_WAIT_TIME * 1000000)) { + SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads " + "to process their packets in time"); + return; + } + + SCMutexLock(&tv_root_lock); + + /* all receive threads are part of packet processing threads */ + tv = tv_root[TVT_PPT]; + + while (tv) { + if (tv->inq != NULL) { + /* we wait till we dry out all the inq packets, before we + * kill this thread. Do note that you should have disabled + * packet acquire by now using TmThreadDisableReceiveThreads()*/ + if (!(strlen(tv->inq->name) == strlen("packetpool") && + strcasecmp(tv->inq->name, "packetpool") == 0)) { + PacketQueue *q = &trans_q[tv->inq->id]; + if (q->len != 0) { + SCMutexUnlock(&tv_root_lock); + + total_wait_time += WAIT_TIME; + + /* don't sleep while holding a lock */ + usleep(WAIT_TIME); + goto again; + } + } + } + + tv = tv->next; + } + + SCMutexUnlock(&tv_root_lock); + return; + +#undef THREAD_KILL_MAX_WAIT_TIME +#undef WAIT_TIME +} + /** * \brief Disable all threads having the specified TMs. * @@ -1618,6 +1676,11 @@ again: SCMutexUnlock(&tv_root_lock); + /* finally wait for all packet threads to have + * processed all of their 'live' packets so we + * don't process the last live packets together + * with FFR packets */ + TmThreadDrainPacketThreads(); return; } @@ -1635,6 +1698,8 @@ void TmThreadDisablePacketThreads(void) ThreadVars *tv = NULL; + /* first drain all packet threads of their packets */ + TmThreadDrainPacketThreads(); again: SCMutexLock(&tv_root_lock);