diff --git a/src/tm-threads.c b/src/tm-threads.c index d9c5160784..21b40bad6e 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -447,6 +447,17 @@ static void *TmThreadsSlotVar(void *td) /* input a packet */ p = tv->tmqh_in(tv); + /* if we didn't get a packet see if we need to do some housekeeping */ + if (unlikely(p == NULL)) { + if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) { + p = PacketGetFromQueueOrAlloc(); + if (p != NULL) { + p->flags |= PKT_PSEUDO_STREAM_END; + PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT); + } + } + } + if (p != NULL) { /* run the thread module(s) */ r = TmThreadsSlotVarRun(tv, p, s); diff --git a/src/tm-threads.h b/src/tm-threads.h index 76e13d9795..91cf9bbb47 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -159,7 +159,7 @@ static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet * manager. * \param s pipeline to run on these packets. */ -static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv) +static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv) { PacketQueue *pq = tv->stream_pq_local; if (pq && pq->len > 0) { @@ -176,6 +176,9 @@ static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv) } tv->tmqh_out(tv, extra_p); } + return true; + } else { + return false; } } @@ -221,18 +224,34 @@ static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, Packet *p) } } +/** \brief handle capture timeout + * When a capture method times out we check for house keeping + * tasks in the capture thread. + * + * \param p packet. Capture method may have taken a packet from + * the pool prior to the timing out call. We will then + * use that packet. Otherwise we can get our own. + */ static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p) { if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) { - TmThreadsCaptureInjectPacket(tv, p); - } else { - TmThreadsHandleInjectedPackets(tv); + TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */ + return; - /* packet could have been passed to us that we won't use - * return it to the pool. */ - if (p != NULL) - tv->tmqh_out(tv, p); + } else { + if (TmThreadsHandleInjectedPackets(tv) == false) { + /* see if we have to do some house keeping */ + if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) { + TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */ + return; + } + } } + + /* packet could have been passed to us that we won't use + * return it to the pool. */ + if (p != NULL) + tv->tmqh_out(tv, p); } void TmThreadsListThreads(void);