diff --git a/src/decode-icmpv4.c b/src/decode-icmpv4.c index 8b4402e6ff..6493550d99 100644 --- a/src/decode-icmpv4.c +++ b/src/decode-icmpv4.c @@ -192,7 +192,7 @@ int DecodeICMPV4(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, /* ICMP ICMP_DEST_UNREACH influence TCP/UDP flows */ if (ICMPV4_DEST_UNREACH_IS_VALID(p)) { - FlowHandlePacket(tv, p); + FlowHandlePacket(tv, dtv, p); } } } diff --git a/src/decode-icmpv6.c b/src/decode-icmpv6.c index 7819f25498..0a58db5488 100644 --- a/src/decode-icmpv6.c +++ b/src/decode-icmpv6.c @@ -337,7 +337,7 @@ int DecodeICMPV6(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, #endif /* Flow is an integral part of us */ - FlowHandlePacket(tv, p); + FlowHandlePacket(tv, dtv, p); return TM_ECODE_OK; } diff --git a/src/decode-sctp.c b/src/decode-sctp.c index 178ed8bd69..6fd8be8d61 100644 --- a/src/decode-sctp.c +++ b/src/decode-sctp.c @@ -74,7 +74,7 @@ int DecodeSCTP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, u #endif /* Flow is an integral part of us */ - FlowHandlePacket(tv, p); + FlowHandlePacket(tv, dtv, p); return TM_ECODE_OK; } diff --git a/src/decode-tcp.c b/src/decode-tcp.c index 11ba600f33..47c066160d 100644 --- a/src/decode-tcp.c +++ b/src/decode-tcp.c @@ -203,7 +203,7 @@ int DecodeTCP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui #endif /* Flow is an integral part of us */ - FlowHandlePacket(tv, p); + FlowHandlePacket(tv, dtv, p); return TM_ECODE_OK; } diff --git a/src/decode-udp.c b/src/decode-udp.c index e1d52082e3..5b83768f9e 100644 --- a/src/decode-udp.c +++ b/src/decode-udp.c @@ -85,12 +85,12 @@ int DecodeUDP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui if (unlikely(DecodeTeredo(tv, dtv, p, p->payload, p->payload_len, pq) == TM_ECODE_OK)) { /* Here we have a Teredo packet and don't need to handle app * layer */ - FlowHandlePacket(tv, p); + FlowHandlePacket(tv, dtv, p); return TM_ECODE_OK; } /* Flow is an integral part of us */ - FlowHandlePacket(tv, p); + FlowHandlePacket(tv, dtv, p); /* handle the app layer part of the UDP packet payload */ if (unlikely(p->flow != NULL)) { diff --git a/src/decode.h b/src/decode.h index fa48f4ac89..dd60e78876 100644 --- a/src/decode.h +++ b/src/decode.h @@ -573,6 +573,9 @@ typedef struct DecodeThreadVars_ int vlan_disabled; + /* thread data for flow logging api */ + void *output_flow_thread_data; + /** stats/counters */ uint16_t counter_pkts; uint16_t counter_bytes; diff --git a/src/flow-hash.c b/src/flow-hash.c index 52bbd8ebcd..607e00c532 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -42,12 +42,16 @@ #include "util-hash-lookup3.h" +#include "conf.h" +#include "output.h" +#include "output-flow.h" + #define FLOW_DEFAULT_FLOW_PRUNE 5 SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx); SC_ATOMIC_EXTERN(unsigned int, flow_flags); -static Flow *FlowGetUsedFlow(void); +static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv); #ifdef FLOW_DEBUG_STATS #define FLOW_DEBUG_STATS_PROTO_ALL 0 @@ -422,9 +426,12 @@ static inline int FlowCreateCheck(const Packet *p) * Get a new flow. We're checking memcap first and will try to make room * if the memcap is reached. * + * \param tv thread vars + * \param dtv decode thread vars (for flow log api thread data) + * * \retval f *LOCKED* flow on succes, NULL on error. */ -static Flow *FlowGetNew(const Packet *p) +static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p) { Flow *f = NULL; @@ -447,7 +454,7 @@ static Flow *FlowGetNew(const Packet *p) FlowWakeupFlowManagerThread(); } - f = FlowGetUsedFlow(); + f = FlowGetUsedFlow(tv, dtv); if (f == NULL) { /* very rare, but we can fail. Just giving up */ return NULL; @@ -473,7 +480,7 @@ static Flow *FlowGetNew(const Packet *p) return f; } -/* FlowGetFlowFromHash +/** \brief Get Flow for packet * * Hash retrieval function for flows. Looks up the hash bucket containing the * flow pointer. Then compares the packet with the found flow to see if it is @@ -485,9 +492,12 @@ static Flow *FlowGetNew(const Packet *p) * * The p->flow pointer is updated to point to the flow. * - * returns a *LOCKED* flow or NULL + * \param tv thread vars + * \param dtv decode thread vars (for flow log api thread data) + * + * \retval f *LOCKED* flow or NULL */ -Flow *FlowGetFlowFromHash(const Packet *p) +Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p) { Flow *f = NULL; FlowHashCountInit; @@ -504,7 +514,7 @@ Flow *FlowGetFlowFromHash(const Packet *p) /* see if the bucket already has a flow */ if (fb->head == NULL) { - f = FlowGetNew(p); + f = FlowGetNew(tv, dtv, p); if (f == NULL) { FBLOCK_UNLOCK(fb); FlowHashCountUpdate; @@ -538,7 +548,7 @@ Flow *FlowGetFlowFromHash(const Packet *p) f = f->hnext; if (f == NULL) { - f = pf->hnext = FlowGetNew(p); + f = pf->hnext = FlowGetNew(tv, dtv, p); if (f == NULL) { FBLOCK_UNLOCK(fb); FlowHashCountUpdate; @@ -603,9 +613,12 @@ Flow *FlowGetFlowFromHash(const Packet *p) * top each time since that would clear the top of the hash leading to longer * and longer search times under high pressure (observed). * + * \param tv thread vars + * \param dtv decode thread vars (for flow log api thread data) + * * \retval f flow or NULL */ -static Flow *FlowGetUsedFlow(void) +static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv) { uint32_t idx = SC_ATOMIC_GET(flow_prune_idx) % flow_config.hash_size; uint32_t cnt = flow_config.hash_size; @@ -653,6 +666,10 @@ static Flow *FlowGetUsedFlow(void) f->fb = NULL; FBLOCK_UNLOCK(fb); + /* invoke flow log api */ + if (dtv && dtv->output_flow_thread_data) + (void)OutputFlowLog(tv, dtv->output_flow_thread_data, f); + FlowClearMemory(f, f->protomap); FLOWLOCK_UNLOCK(f); diff --git a/src/flow-hash.h b/src/flow-hash.h index 0de1a64346..a5635b06ce 100644 --- a/src/flow-hash.h +++ b/src/flow-hash.h @@ -68,7 +68,7 @@ typedef struct FlowBucket_ { /* prototypes */ -Flow *FlowGetFlowFromHash(const Packet *); +Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *); /** enable to print stats on hash lookups in flow-debug.log */ //#define FLOW_DEBUG_STATS diff --git a/src/flow.c b/src/flow.c index 3ba14d7464..516e684e94 100644 --- a/src/flow.c +++ b/src/flow.c @@ -232,14 +232,15 @@ static inline int FlowUpdateSeenFlag(const Packet *p) * This is called for every packet. * * \param tv threadvars + * \param dtv decode thread vars (for flow output api thread data) * \param p packet to handle flow for */ -void FlowHandlePacket(ThreadVars *tv, Packet *p) +void FlowHandlePacket(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p) { /* Get this packet's flow from the hash. FlowHandlePacket() will setup * a new flow if nescesary. If we get NULL, we're out of flow memory. * The returned flow is locked. */ - Flow *f = FlowGetFlowFromHash(p); + Flow *f = FlowGetFlowFromHash(tv, dtv, p); if (f == NULL) return; diff --git a/src/flow.h b/src/flow.h index d1896d2019..1cec317603 100644 --- a/src/flow.h +++ b/src/flow.h @@ -401,7 +401,7 @@ typedef struct FlowProto_ { int (*GetProtoState)(void *); } FlowProto; -void FlowHandlePacket (ThreadVars *, Packet *); +void FlowHandlePacket (ThreadVars *, DecodeThreadVars *, Packet *); void FlowInitConfig (char); void FlowPrintQueueInfo (void); void FlowShutdown(void); diff --git a/src/util-unittest-helper.c b/src/util-unittest-helper.c index bac2083e21..f2c96b08e3 100644 --- a/src/util-unittest-helper.c +++ b/src/util-unittest-helper.c @@ -831,7 +831,7 @@ uint32_t UTHBuildPacketOfFlows(uint32_t start, uint32_t end, uint8_t dir) { p->src.addr_data32[0] = i + 1; p->dst.addr_data32[0] = i; } - FlowHandlePacket(NULL, p); + FlowHandlePacket(NULL, NULL, p); if (p->flow != NULL) SC_ATOMIC_RESET(p->flow->use_cnt);