diff --git a/src/threadvars.h b/src/threadvars.h index 0e04af2ca9..f8277b7758 100644 --- a/src/threadvars.h +++ b/src/threadvars.h @@ -87,6 +87,9 @@ typedef struct ThreadVars_ { void *(*tm_func)(void *); struct TmSlot_ *tm_slots; + /** stream packet queue for flow time out injection */ + struct PacketQueue_ *stream_pq; + uint8_t thread_setup_flags; /** the type of thread as defined in tm-threads.h (TVT_PPT, TVT_MGMT) */ diff --git a/src/tm-threads.c b/src/tm-threads.c index f9dd12c07f..bb8ab9be90 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -270,6 +270,16 @@ void *TmThreadsSlotPktAcqLoop(void *td) SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL); memset(&slot->slot_post_pq, 0, sizeof(PacketQueue)); SCMutexInit(&slot->slot_post_pq.mutex_q, NULL); + + /* get the 'pre qeueue' from module before the stream module */ + if (slot->slot_next != NULL && slot->slot_next->tm_id == TMM_STREAMTCP) { + SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq); + tv->stream_pq = &slot->slot_post_pq; + /* if the stream module is the first, get the threads input queue */ + } else if (slot == (TmSlot *)tv->tm_slots && slot->tm_id == TMM_STREAMTCP) { + tv->stream_pq = &trans_q[tv->inq->id]; + SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq); + } } tv->sc_perf_pca = SCPerfGetAllCountersArray(&tv->sc_perf_pctx); @@ -314,14 +324,19 @@ void *TmThreadsSlotPktAcqLoop(void *td) goto error; } } + + BUG_ON(slot->slot_pre_pq.len); + BUG_ON(slot->slot_post_pq.len); } + tv->stream_pq = NULL; SCLogDebug("%s ending", tv->name); TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); return NULL; error: + tv->stream_pq = NULL; pthread_exit((void *) -1); return NULL; } @@ -378,6 +393,19 @@ void *TmThreadsSlotVar(void *td) SCMutexInit(&s->slot_pre_pq.mutex_q, NULL); memset(&s->slot_post_pq, 0, sizeof(PacketQueue)); SCMutexInit(&s->slot_post_pq.mutex_q, NULL); + + /* special case: we need to access the stream queue + * from the flow timeout code */ + + /* get the 'pre qeueue' from module before the stream module */ + if (s->slot_next != NULL && s->slot_next->tm_id == TMM_STREAMTCP) { + SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq); + tv->stream_pq = &s->slot_pre_pq; + /* if the stream module is the first, get the threads input queue */ + } else if (s == (TmSlot *)tv->tm_slots && s->tm_id == TMM_STREAMTCP) { + tv->stream_pq = &trans_q[tv->inq->id]; + SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq); + } } tv->sc_perf_pca = SCPerfGetAllCountersArray(&tv->sc_perf_pctx); @@ -465,14 +493,18 @@ void *TmThreadsSlotVar(void *td) goto error; } } + BUG_ON(s->slot_pre_pq.len); + BUG_ON(s->slot_post_pq.len); } SCLogDebug("%s ending", tv->name); + tv->stream_pq = NULL; TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); return NULL; error: + tv->stream_pq = NULL; pthread_exit((void *) -1); return NULL; }