flow: prepare flow forced reuse logging

Most flows are marked for clean up by the flow manager, which then
passes them to the recycler. The recycler logs and cleans up. However,
under resource stress conditions, the packet threads can recycle
existing flow directly. So here the recycler has no role to play, as
the flow is immediately used.

For this reason, the packet threads need to be able to invoke the
flow logger directly.

The flow logging thread ctx will stored in the DecodeThreadVars
stucture. Therefore, this patch makes the DecodeThreadVars an argument
to FlowHandlePacket.
pull/1058/head
Victor Julien 11 years ago
parent bd490736c2
commit de034f1867

@ -192,7 +192,7 @@ int DecodeICMPV4(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt,
/* ICMP ICMP_DEST_UNREACH influence TCP/UDP flows */
if (ICMPV4_DEST_UNREACH_IS_VALID(p)) {
FlowHandlePacket(tv, p);
FlowHandlePacket(tv, dtv, p);
}
}
}

@ -337,7 +337,7 @@ int DecodeICMPV6(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p,
#endif
/* Flow is an integral part of us */
FlowHandlePacket(tv, p);
FlowHandlePacket(tv, dtv, p);
return TM_ECODE_OK;
}

@ -74,7 +74,7 @@ int DecodeSCTP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, u
#endif
/* Flow is an integral part of us */
FlowHandlePacket(tv, p);
FlowHandlePacket(tv, dtv, p);
return TM_ECODE_OK;
}

@ -203,7 +203,7 @@ int DecodeTCP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui
#endif
/* Flow is an integral part of us */
FlowHandlePacket(tv, p);
FlowHandlePacket(tv, dtv, p);
return TM_ECODE_OK;
}

@ -85,12 +85,12 @@ int DecodeUDP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui
if (unlikely(DecodeTeredo(tv, dtv, p, p->payload, p->payload_len, pq) == TM_ECODE_OK)) {
/* Here we have a Teredo packet and don't need to handle app
* layer */
FlowHandlePacket(tv, p);
FlowHandlePacket(tv, dtv, p);
return TM_ECODE_OK;
}
/* Flow is an integral part of us */
FlowHandlePacket(tv, p);
FlowHandlePacket(tv, dtv, p);
/* handle the app layer part of the UDP packet payload */
if (unlikely(p->flow != NULL)) {

@ -573,6 +573,9 @@ typedef struct DecodeThreadVars_
int vlan_disabled;
/* thread data for flow logging api */
void *output_flow_thread_data;
/** stats/counters */
uint16_t counter_pkts;
uint16_t counter_bytes;

@ -42,12 +42,16 @@
#include "util-hash-lookup3.h"
#include "conf.h"
#include "output.h"
#include "output-flow.h"
#define FLOW_DEFAULT_FLOW_PRUNE 5
SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx);
SC_ATOMIC_EXTERN(unsigned int, flow_flags);
static Flow *FlowGetUsedFlow(void);
static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv);
#ifdef FLOW_DEBUG_STATS
#define FLOW_DEBUG_STATS_PROTO_ALL 0
@ -422,9 +426,12 @@ static inline int FlowCreateCheck(const Packet *p)
* Get a new flow. We're checking memcap first and will try to make room
* if the memcap is reached.
*
* \param tv thread vars
* \param dtv decode thread vars (for flow log api thread data)
*
* \retval f *LOCKED* flow on succes, NULL on error.
*/
static Flow *FlowGetNew(const Packet *p)
static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p)
{
Flow *f = NULL;
@ -447,7 +454,7 @@ static Flow *FlowGetNew(const Packet *p)
FlowWakeupFlowManagerThread();
}
f = FlowGetUsedFlow();
f = FlowGetUsedFlow(tv, dtv);
if (f == NULL) {
/* very rare, but we can fail. Just giving up */
return NULL;
@ -473,7 +480,7 @@ static Flow *FlowGetNew(const Packet *p)
return f;
}
/* FlowGetFlowFromHash
/** \brief Get Flow for packet
*
* Hash retrieval function for flows. Looks up the hash bucket containing the
* flow pointer. Then compares the packet with the found flow to see if it is
@ -485,9 +492,12 @@ static Flow *FlowGetNew(const Packet *p)
*
* The p->flow pointer is updated to point to the flow.
*
* returns a *LOCKED* flow or NULL
* \param tv thread vars
* \param dtv decode thread vars (for flow log api thread data)
*
* \retval f *LOCKED* flow or NULL
*/
Flow *FlowGetFlowFromHash(const Packet *p)
Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p)
{
Flow *f = NULL;
FlowHashCountInit;
@ -504,7 +514,7 @@ Flow *FlowGetFlowFromHash(const Packet *p)
/* see if the bucket already has a flow */
if (fb->head == NULL) {
f = FlowGetNew(p);
f = FlowGetNew(tv, dtv, p);
if (f == NULL) {
FBLOCK_UNLOCK(fb);
FlowHashCountUpdate;
@ -538,7 +548,7 @@ Flow *FlowGetFlowFromHash(const Packet *p)
f = f->hnext;
if (f == NULL) {
f = pf->hnext = FlowGetNew(p);
f = pf->hnext = FlowGetNew(tv, dtv, p);
if (f == NULL) {
FBLOCK_UNLOCK(fb);
FlowHashCountUpdate;
@ -603,9 +613,12 @@ Flow *FlowGetFlowFromHash(const Packet *p)
* top each time since that would clear the top of the hash leading to longer
* and longer search times under high pressure (observed).
*
* \param tv thread vars
* \param dtv decode thread vars (for flow log api thread data)
*
* \retval f flow or NULL
*/
static Flow *FlowGetUsedFlow(void)
static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
{
uint32_t idx = SC_ATOMIC_GET(flow_prune_idx) % flow_config.hash_size;
uint32_t cnt = flow_config.hash_size;
@ -653,6 +666,10 @@ static Flow *FlowGetUsedFlow(void)
f->fb = NULL;
FBLOCK_UNLOCK(fb);
/* invoke flow log api */
if (dtv && dtv->output_flow_thread_data)
(void)OutputFlowLog(tv, dtv->output_flow_thread_data, f);
FlowClearMemory(f, f->protomap);
FLOWLOCK_UNLOCK(f);

@ -68,7 +68,7 @@ typedef struct FlowBucket_ {
/* prototypes */
Flow *FlowGetFlowFromHash(const Packet *);
Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *);
/** enable to print stats on hash lookups in flow-debug.log */
//#define FLOW_DEBUG_STATS

@ -232,14 +232,15 @@ static inline int FlowUpdateSeenFlag(const Packet *p)
* This is called for every packet.
*
* \param tv threadvars
* \param dtv decode thread vars (for flow output api thread data)
* \param p packet to handle flow for
*/
void FlowHandlePacket(ThreadVars *tv, Packet *p)
void FlowHandlePacket(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
{
/* Get this packet's flow from the hash. FlowHandlePacket() will setup
* a new flow if nescesary. If we get NULL, we're out of flow memory.
* The returned flow is locked. */
Flow *f = FlowGetFlowFromHash(p);
Flow *f = FlowGetFlowFromHash(tv, dtv, p);
if (f == NULL)
return;

@ -401,7 +401,7 @@ typedef struct FlowProto_ {
int (*GetProtoState)(void *);
} FlowProto;
void FlowHandlePacket (ThreadVars *, Packet *);
void FlowHandlePacket (ThreadVars *, DecodeThreadVars *, Packet *);
void FlowInitConfig (char);
void FlowPrintQueueInfo (void);
void FlowShutdown(void);

@ -831,7 +831,7 @@ uint32_t UTHBuildPacketOfFlows(uint32_t start, uint32_t end, uint8_t dir) {
p->src.addr_data32[0] = i + 1;
p->dst.addr_data32[0] = i;
}
FlowHandlePacket(NULL, p);
FlowHandlePacket(NULL, NULL, p);
if (p->flow != NULL)
SC_ATOMIC_RESET(p->flow->use_cnt);

Loading…
Cancel
Save