diff --git a/src/flow-manager.c b/src/flow-manager.c index 072a3cb74a..e085fffa6b 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -280,9 +280,9 @@ static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCount if (f->proto == IPPROTO_TCP && !(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) && - !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f)) { + !FlowIsBypassed(f) && FlowNeedsReassembly(f)) { /* Send the flow to its thread */ - FlowForceReassemblyForFlow(f); + FlowSendToLocalThread(f); FLOWLOCK_UNLOCK(f); /* flow ownership is already passed to the worker thread */ diff --git a/src/flow-timeout.c b/src/flow-timeout.c index 01d60fe251..87ec7e1686 100644 --- a/src/flow-timeout.c +++ b/src/flow-timeout.c @@ -66,17 +66,18 @@ /** * \internal - * \brief Pseudo packet setup for flow forced reassembly. + * \brief Pseudo packet setup to finish a flow when needed. * + * \param p a dummy pseudo packet from packet pool. Not all pseudo + * packets need to force reassembly, in which case we just + * set dummy ack/seq values. * \param direction Direction of the packet. 0 indicates toserver and 1 * indicates toclient. * \param f Pointer to the flow. * \param ssn Pointer to the tcp session. - * \param dummy Indicates to create a dummy pseudo packet. Not all pseudo - * packets need to force reassembly, in which case we just - * set dummy ack/seq values. + * \retval pseudo packet with everything set up */ -static inline Packet *FlowForceReassemblyPseudoPacketSetup( +static inline Packet *FlowPseudoPacketSetup( Packet *p, int direction, Flow *f, const TcpSession *ssn) { const int orig_dir = direction; @@ -263,7 +264,7 @@ error: return NULL; } -Packet *FlowForceReassemblyPseudoPacketGet(int direction, Flow *f, const TcpSession *ssn) +Packet *FlowPseudoPacketGet(int direction, Flow *f, const TcpSession *ssn) { PacketPoolWait(); Packet *p = PacketPoolGetPacket(); @@ -273,7 +274,7 @@ Packet *FlowForceReassemblyPseudoPacketGet(int direction, Flow *f, const TcpSess PACKET_PROFILING_START(p); - return FlowForceReassemblyPseudoPacketSetup(p, direction, f, ssn); + return FlowPseudoPacketSetup(p, direction, f, ssn); } /** @@ -284,7 +285,7 @@ Packet *FlowForceReassemblyPseudoPacketGet(int direction, Flow *f, const TcpSess * \retval false no * \retval true yes */ -bool FlowForceReassemblyNeedReassembly(Flow *f) +bool FlowNeedsReassembly(Flow *f) { if (f == NULL || f->protoctx == NULL) { @@ -330,7 +331,7 @@ bool FlowForceReassemblyNeedReassembly(Flow *f) /** * \internal - * \brief Forces reassembly for flow if it needs it. + * \brief Sends the flow to its respective thread's flow queue. * * The function requires flow to be locked beforehand. * @@ -339,10 +340,8 @@ bool FlowForceReassemblyNeedReassembly(Flow *f) * flag is set, choose the second thread_id (to client/source). * * \param f Pointer to the flow. - * - * \retval 0 This flow doesn't need any reassembly processing; 1 otherwise. */ -void FlowForceReassemblyForFlow(Flow *f) +void FlowSendToLocalThread(Flow *f) { // Choose the thread_id based on whether the flow has been // reversed. @@ -352,7 +351,8 @@ void FlowForceReassemblyForFlow(Flow *f) /** * \internal - * \brief Forces reassembly for flows that need it. + * \brief Remove flows from the hash bucket as they have more work to be done in + * in the detection engine. * * When this function is called we're running in virtually dead engine, * so locking the flows is not strictly required. The reasons it is still @@ -362,10 +362,8 @@ void FlowForceReassemblyForFlow(Flow *f) * - allow us to aggressively check using debug validation assertions * - be robust in case of future changes * - locking overhead is negligible when no other thread fights us - * - * \param q The queue to process flows from. */ -static inline void FlowForceReassemblyForHash(void) +static inline void FlowRemoveHash(void) { for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) { FlowBucket *fb = &flow_hash[idx]; @@ -392,10 +390,10 @@ static inline void FlowForceReassemblyForHash(void) /* in case of additional work, we pull the flow out of the * hash and xfer ownership to the injected packet(s) */ - if (FlowForceReassemblyNeedReassembly(f)) { + if (FlowNeedsReassembly(f)) { RemoveFromHash(f, prev_f); f->flow_end_flags |= FLOW_END_FLAG_SHUTDOWN; - FlowForceReassemblyForFlow(f); + FlowSendToLocalThread(f); FLOWLOCK_UNLOCK(f); f = next_f; continue; @@ -412,10 +410,11 @@ static inline void FlowForceReassemblyForHash(void) } /** - * \brief Force reassembly for all the flows that have unprocessed segments. + * \brief Clean up all the flows that have unprocessed segments and have + * some work to do in the detection engine. */ -void FlowForceReassembly(void) +void FlowWorkToDoCleanup(void) { - /* Carry out flow reassembly for unattended flows */ - FlowForceReassemblyForHash(); + /* Carry out cleanup of unattended flows */ + FlowRemoveHash(); } diff --git a/src/flow-timeout.h b/src/flow-timeout.h index 467fd821cd..60b07be8b8 100644 --- a/src/flow-timeout.h +++ b/src/flow-timeout.h @@ -26,9 +26,9 @@ #include "stream-tcp-private.h" -void FlowForceReassemblyForFlow(Flow *f); -bool FlowForceReassemblyNeedReassembly(Flow *f); -void FlowForceReassembly(void); -Packet *FlowForceReassemblyPseudoPacketGet(int direction, Flow *f, const TcpSession *ssn); +void FlowSendToLocalThread(Flow *f); +bool FlowNeedsReassembly(Flow *f); +void FlowWorkToDoCleanup(void); +Packet *FlowPseudoPacketGet(int direction, Flow *f, const TcpSession *ssn); #endif /* SURICATA_FLOW_TIMEOUT_H */ diff --git a/src/flow-worker.c b/src/flow-worker.c index 1983111a1b..61a3811f09 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -120,7 +120,7 @@ static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *d /* insert a pseudo packet in the toserver direction */ if (client == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) { - Packet *p = FlowForceReassemblyPseudoPacketGet(0, f, ssn); + Packet *p = FlowPseudoPacketGet(0, f, ssn); if (p != NULL) { PKT_SET_SRC(p, PKT_SRC_FFR); if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NONE) { @@ -134,7 +134,7 @@ static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *d /* handle toclient */ if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) { - Packet *p = FlowForceReassemblyPseudoPacketGet(1, f, ssn); + Packet *p = FlowPseudoPacketGet(1, f, ssn); if (p != NULL) { PKT_SET_SRC(p, PKT_SRC_FFR); p->flowflags |= FLOW_PKT_LAST_PSEUDO; @@ -166,7 +166,7 @@ static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, FlowTimeout if (f->proto == IPPROTO_TCP) { if (!(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) && - !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) && f->ffr != 0) { + !FlowIsBypassed(f) && FlowNeedsReassembly(f) && f->ffr != 0) { /* read detect thread in case we're doing a reload */ void *detect_thread = SC_ATOMIC_GET(fw->detect_thread); int cnt = FlowFinish(tv, f, fw, detect_thread); diff --git a/src/suricata.c b/src/suricata.c index 0ff1bdbb83..f7a03a208c 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -2273,14 +2273,14 @@ void PostRunDeinit(const int runmode, struct timeval *start_time) if (runmode == RUNMODE_UNIX_SOCKET) return; - /* needed by FlowForceReassembly */ + /* needed by FlowWorkToDoCleanup */ PacketPoolInit(); /* handle graceful shutdown of the flow engine, it's helper * threads and the packet threads */ FlowDisableFlowManagerThread(); TmThreadDisableReceiveThreads(); - FlowForceReassembly(); + FlowWorkToDoCleanup(); TmThreadDisablePacketThreads(); SCPrintElapsedTime(start_time); FlowDisableFlowRecyclerThread();