Give easy access for thread stream packet queue

Access it from ThreadVars. This allows for easy injection of packets
into the stream engine.
pull/1267/head
Victor Julien 10 years ago
parent 489ee20560
commit a260cba32b

@ -87,6 +87,9 @@ typedef struct ThreadVars_ {
void *(*tm_func)(void *);
struct TmSlot_ *tm_slots;
/** stream packet queue for flow time out injection */
struct PacketQueue_ *stream_pq;
uint8_t thread_setup_flags;
/** the type of thread as defined in tm-threads.h (TVT_PPT, TVT_MGMT) */

@ -270,6 +270,16 @@ void *TmThreadsSlotPktAcqLoop(void *td)
SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
/* get the 'pre qeueue' from module before the stream module */
if (slot->slot_next != NULL && slot->slot_next->tm_id == TMM_STREAMTCP) {
SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
tv->stream_pq = &slot->slot_post_pq;
/* if the stream module is the first, get the threads input queue */
} else if (slot == (TmSlot *)tv->tm_slots && slot->tm_id == TMM_STREAMTCP) {
tv->stream_pq = &trans_q[tv->inq->id];
SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
}
}
tv->sc_perf_pca = SCPerfGetAllCountersArray(&tv->sc_perf_pctx);
@ -314,14 +324,19 @@ void *TmThreadsSlotPktAcqLoop(void *td)
goto error;
}
}
BUG_ON(slot->slot_pre_pq.len);
BUG_ON(slot->slot_post_pq.len);
}
tv->stream_pq = NULL;
SCLogDebug("%s ending", tv->name);
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
return NULL;
error:
tv->stream_pq = NULL;
pthread_exit((void *) -1);
return NULL;
}
@ -378,6 +393,19 @@ void *TmThreadsSlotVar(void *td)
SCMutexInit(&s->slot_pre_pq.mutex_q, NULL);
memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
SCMutexInit(&s->slot_post_pq.mutex_q, NULL);
/* special case: we need to access the stream queue
* from the flow timeout code */
/* get the 'pre qeueue' from module before the stream module */
if (s->slot_next != NULL && s->slot_next->tm_id == TMM_STREAMTCP) {
SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq);
tv->stream_pq = &s->slot_pre_pq;
/* if the stream module is the first, get the threads input queue */
} else if (s == (TmSlot *)tv->tm_slots && s->tm_id == TMM_STREAMTCP) {
tv->stream_pq = &trans_q[tv->inq->id];
SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq);
}
}
tv->sc_perf_pca = SCPerfGetAllCountersArray(&tv->sc_perf_pctx);
@ -465,14 +493,18 @@ void *TmThreadsSlotVar(void *td)
goto error;
}
}
BUG_ON(s->slot_pre_pq.len);
BUG_ON(s->slot_post_pq.len);
}
SCLogDebug("%s ending", tv->name);
tv->stream_pq = NULL;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
return NULL;
error:
tv->stream_pq = NULL;
pthread_exit((void *) -1);
return NULL;
}

Loading…
Cancel
Save