From 9df8e1c98451ddd689e4ad274729deceebcb5c16 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Mon, 11 Nov 2019 11:11:55 +0100 Subject: [PATCH] threading: add shortcut to flowworker --- src/threadvars.h | 4 ++++ src/tm-threads.c | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/src/threadvars.h b/src/threadvars.h index e7b20f38b9..0b0a7acf06 100644 --- a/src/threadvars.h +++ b/src/threadvars.h @@ -82,6 +82,10 @@ typedef struct ThreadVars_ { /** slot functions */ void *(*tm_func)(void *); struct TmSlot_ *tm_slots; + /** pointer to the flowworker in the pipeline. Used as starting point + * for injected packets. Can be NULL if the flowworker is not part + * of this thread. */ + struct TmSlot_ *tm_flowworker; /** Stream packet queue for flow time out injection. Either a pointer to the * workers input queue or to stream_pq_local */ diff --git a/src/tm-threads.c b/src/tm-threads.c index 94e8f2a8f6..b592950a9b 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -301,6 +301,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td) /* if the flowworker module is the first, get the threads input queue */ if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { tv->stream_pq = &trans_q[tv->inq->id]; + tv->tm_flowworker = slot; SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); /* setup a queue */ } else if (slot->tm_id == TMM_FLOWWORKER) { @@ -309,6 +310,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td) FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue"); SCMutexInit(&tv->stream_pq_local->mutex_q, NULL); tv->stream_pq = tv->stream_pq_local; + tv->tm_flowworker = slot; SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq); } } @@ -427,6 +429,7 @@ static void *TmThreadsSlotPktAcqLoopAFL(void *td) /* if the flowworker module is the first, get the threads input queue */ if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { tv->stream_pq = &trans_q[tv->inq->id]; + tv->tm_flowworker = slot; SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); /* setup a queue */ } else if (slot->tm_id == TMM_FLOWWORKER) { @@ -435,6 +438,7 @@ static void *TmThreadsSlotPktAcqLoopAFL(void *td) FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue"); SCMutexInit(&tv->stream_pq_local->mutex_q, NULL); tv->stream_pq = tv->stream_pq_local; + tv->tm_flowworker = slot; SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq); } } @@ -541,6 +545,7 @@ static void *TmThreadsSlotVar(void *td) /* if the flowworker module is the first, get the threads input queue */ if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) { tv->stream_pq = &trans_q[tv->inq->id]; + tv->tm_flowworker = s; SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); /* setup a queue */ } else if (s->tm_id == TMM_FLOWWORKER) { @@ -549,6 +554,7 @@ static void *TmThreadsSlotVar(void *td) FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue"); SCMutexInit(&tv->stream_pq_local->mutex_q, NULL); tv->stream_pq = tv->stream_pq_local; + tv->tm_flowworker = s; SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq); } }