diff --git a/src/flow-hash.c b/src/flow-hash.c index 1fea10a67d..47e7711a23 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -507,6 +507,121 @@ static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p) return f; } +Flow *FlowGetFlowFromHashByPacket(const Packet *p) +{ + Flow *f = NULL; + + /* get the key to our bucket */ + uint32_t key = FlowGetKey(p); + /* get our hash bucket and lock it */ + FlowBucket *fb = &flow_hash[key]; + FBLOCK_LOCK(fb); + + SCLogDebug("fb %p fb->head %p", fb, fb->head); + + f = FlowGetNew(NULL, NULL, p); + if (f != NULL) { + /* flow is locked */ + if (fb->head == NULL) { + fb->head = f; + fb->tail = f; + } else { + f->hprev = fb->tail; + fb->tail->hnext = f; + fb->tail = f; + } + + /* got one, now lock, initialize and return */ + FlowInit(f, p); + f->fb = fb; + /* update the last seen timestamp of this flow */ + COPY_TIMESTAMP(&p->ts,&f->lastts); + + } + FBLOCK_UNLOCK(fb); + return f; +} + +/** \brief Lookup flow based on packet + * + * Find the flow belonging to this packet. If not found, no new flow + * is set up. + * + * \param p packet to lookup the flow for + * + * \retval f flow or NULL if not found + */ +Flow *FlowLookupFlowFromHash(const Packet *p) +{ + Flow *f = NULL; + + /* get the key to our bucket */ + uint32_t key = FlowGetKey(p); + /* get our hash bucket and lock it */ + FlowBucket *fb = &flow_hash[key]; + FBLOCK_LOCK(fb); + + SCLogDebug("fb %p fb->head %p", fb, fb->head); + + /* see if the bucket already has a flow */ + if (fb->head == NULL) { + FBLOCK_UNLOCK(fb); + return NULL; + } + + /* ok, we have a flow in the bucket. Let's find out if it is our flow */ + f = fb->head; + + /* see if this is the flow we are looking for */ + if (FlowCompare(f, p) == 0) { + while (f) { + FlowHashCountIncr; + + f = f->hnext; + + if (f == NULL) { + FBLOCK_UNLOCK(fb); + return NULL; + } + + if (FlowCompare(f, p) != 0) { + /* we found our flow, lets put it on top of the + * hash list -- this rewards active flows */ + if (f->hnext) { + f->hnext->hprev = f->hprev; + } + if (f->hprev) { + f->hprev->hnext = f->hnext; + } + if (f == fb->tail) { + fb->tail = f->hprev; + } + + f->hnext = fb->head; + f->hprev = NULL; + fb->head->hprev = f; + fb->head = f; + + /* found our flow, lock & return */ + FLOWLOCK_WRLOCK(f); + /* update the last seen timestamp of this flow */ + COPY_TIMESTAMP(&p->ts,&f->lastts); + + FBLOCK_UNLOCK(fb); + return f; + } + } + } + + /* lock & return */ + FLOWLOCK_WRLOCK(f); + /* update the last seen timestamp of this flow */ + COPY_TIMESTAMP(&p->ts,&f->lastts); + + FBLOCK_UNLOCK(fb); + return f; +} + /** \brief Get Flow for packet * * Hash retrieval function for flows. Looks up the hash bucket containing the diff --git a/src/flow.h b/src/flow.h index 2ef95d724f..66df3561f7 100644 --- a/src/flow.h +++ b/src/flow.h @@ -575,9 +575,11 @@ int FlowClearMemory(Flow *,uint8_t ); AppProto FlowGetAppProtocol(Flow *f); void *FlowGetAppState(Flow *f); - void FlowHandlePacketUpdateRemove(Flow *f, Packet *p); void FlowHandlePacketUpdate(Flow *f, Packet *p); +Flow *FlowGetFlowFromHashByPacket(const Packet *p); +Flow *FlowLookupFlowFromHash(const Packet *p); + #endif /* __FLOW_H__ */ diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 6f339ae13e..a24c86543f 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -4687,6 +4687,211 @@ static inline int StreamTcpValidateChecksum(Packet *p) return ret; } +/** \internal + * \brief check if a packet is a valid stream started + * \retval bool true/false */ +static int TcpSessionPacketIsStreamStarter(const Packet *p) +{ + uint8_t flag = TH_SYN; + + //SCLogInfo("Want: %02x, have %02x", flag, p->tcph->th_flags); + + if (p->tcph->th_flags == flag) { + SCLogDebug("packet %"PRIu64" is a stream starter: %02x", p->pcap_cnt, p->tcph->th_flags); + return 1; + } + return 0; +} + +/** \internal + * \brief Check if Flow and TCP SSN allow this flow/tuple to be reused + * \retval bool true yes reuse, false no keep tracking old ssn */ +int TcpSessionReuseDoneEnough(Packet *p, const TcpSession *ssn) +{ + if (FlowGetPacketDirection(p->flow, p) == TOSERVER) { + if (ssn == NULL) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p null. No reuse.", p->pcap_cnt, ssn); + return 0; + } + if (SEQ_EQ(ssn->client.isn, TCP_GET_SEQ(p))) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p. Packet SEQ == Stream ISN. Retransmission. Don't reuse.", p->pcap_cnt, ssn); + return 0; + } + if (ssn->state >= TCP_LAST_ACK) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p state >= TCP_LAST_ACK (%u). Reuse.", p->pcap_cnt, ssn, ssn->state); + return 1; + } + if (ssn->state == TCP_NONE) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p state == TCP_NONE (%u). Reuse.", p->pcap_cnt, ssn, ssn->state); + return 1; + } + if (ssn->state < TCP_LAST_ACK) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p state < TCP_LAST_ACK (%u). Don't reuse.", p->pcap_cnt, ssn, ssn->state); + return 0; + } + + } else { + if (ssn == NULL) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p null. Reuse.", p->pcap_cnt, ssn); + return 1; + } + if (ssn->state >= TCP_LAST_ACK) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p state >= TCP_LAST_ACK (%u). Reuse.", p->pcap_cnt, ssn, ssn->state); + return 1; + } + if (ssn->state == TCP_NONE) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p state == TCP_NONE (%u). Reuse.", p->pcap_cnt, ssn, ssn->state); + return 1; + } + if (ssn->state < TCP_LAST_ACK) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p state < TCP_LAST_ACK (%u). Don't reuse.", p->pcap_cnt, ssn, ssn->state); + return 0; + } + } + + + SCLogDebug("default: how did we get here?"); + return 0; +} + +/** \brief Handle TCP reuse of tuple + * + * Logic: + * 1. see if packet could trigger a new session + * 2. see if the flow/ssn is in a state where we want to support the reuse + * 3. disconnect packet from the old flow + * -> at this point new packets can still find the old flow + * -> as the flow's reference count != 0, it can't disappear + * 4. setup a new flow unconditionally + * 5. attach packet to new flow + * 6. tag old flow as FLOW_TCP_REUSED + * -> NEW packets won't find it + * -> existing packets in our queues may still reference it + * 7. dereference the old flow (reference cnt *may* now be 0, + * if no other packets reference it) + * + * The packets that still hold a reference to the old flow are updated + * by HandleFlowReuseApplyToPacket() + */ +static void TcpSessionReuseHandle(Packet *p) { + if (likely(TcpSessionPacketIsStreamStarter(p) == 0)) + return; + + int reuse = 0; + FLOWLOCK_RDLOCK(p->flow); + reuse = TcpSessionReuseDoneEnough(p, p->flow->protoctx); + if (!reuse) { + SCLogDebug("steam starter packet %"PRIu64", but state not " + "ready to be reused", p->pcap_cnt); + FLOWLOCK_UNLOCK(p->flow); + return; + } + + /* ok, this packet needs a new flow */ + + /* first, get a reference to the old flow */ + Flow *old_f = NULL; + FlowReference(&old_f, p->flow); + + /* get some settings that we move over to the new flow */ + FlowThreadId thread_id = old_f->thread_id; + int autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid); + + /* disconnect the packet from the old flow */ + FlowHandlePacketUpdateRemove(p->flow, p); + FLOWLOCK_UNLOCK(p->flow); + FlowDeReference(&p->flow); // < can't disappear while usecnt >0 + + /* Can't tag flow as reused yet, would be a race condition: + * new packets will not get old flow because of FLOW_TCP_REUSED, + * so new flow may be created. This new flow could be handled in + * a different thread. */ + + /* Get a flow. It will be either a locked flow or NULL */ + Flow *new_f = FlowGetFlowFromHashByPacket(p); + if (new_f == NULL) { + FlowDeReference(&old_f); // < can't disappear while usecnt >0 + return; + } + + /* update flow and packet */ + FlowHandlePacketUpdate(new_f, p); + BUG_ON(new_f != p->flow); + + /* copy flow balancing settings */ + new_f->thread_id = thread_id; + SC_ATOMIC_SET(new_f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid); + + FLOWLOCK_UNLOCK(new_f); + + /* tag original flow that it's now unused */ + FLOWLOCK_WRLOCK(old_f); + SCLogDebug("old flow %p tagged with FLOW_TCP_REUSED by packet %"PRIu64"!", old_f, p->pcap_cnt); + old_f->flags |= FLOW_TCP_REUSED; + FLOWLOCK_UNLOCK(old_f); + FlowDeReference(&old_f); // < can't disappear while usecnt >0 + + SCLogDebug("new flow %p set up for packet %"PRIu64"!", p->flow, p->pcap_cnt); +} + +/** \brief Handle packets that reference the wrong flow because of TCP reuse + * + * In the case of TCP reuse we can have many packets that were assigned + * a flow by the capture/decode threads before the stream engine decided + * that a new flow was needed for these packets. + * When HandleFlowReuse creates a new flow, the packets already processed + * by the flow engine will still reference the old flow. + * + * This function detects this case and replaces the flow for those packets. + * It's a fairly expensive operation, but it should be rare as it's only + * done for packets that were already in the engine when the TCP reuse + * case was handled. New packets are assigned the correct flow by the + * flow engine. + */ +static void TcpSessionReuseHandleApplyToPacket(Packet *p) +{ + int need_flow_replace = 0; + + FLOWLOCK_WRLOCK(p->flow); + if (p->flow->flags & FLOW_TCP_REUSED) { + SCLogDebug("packet %"PRIu64" attached to outdated flow and ssn", p->pcap_cnt); + need_flow_replace = 1; + } + + if (likely(need_flow_replace == 0)) { + /* Work around a race condition: if HandleFlowReuse has inserted a new flow, + * it will not have seen both sides of the session yet. The packet we have here + * may be the first that got the flow directly from the hash right after the + * flow was added. In this case it won't have FLOW_PKT_ESTABLISHED flag set. */ + if ((p->flow->flags & FLOW_TO_DST_SEEN) && (p->flow->flags & FLOW_TO_SRC_SEEN)) { + p->flowflags |= FLOW_PKT_ESTABLISHED; + SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags |= FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt); + } else { + SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags NOT FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt); + } + SCLogDebug("packet %"PRIu64" attached to regular flow %p and ssn", p->pcap_cnt, p->flow); + FLOWLOCK_UNLOCK(p->flow); + return; + } + + /* disconnect packet from old flow */ + FlowHandlePacketUpdateRemove(p->flow, p); + FLOWLOCK_UNLOCK(p->flow); + FlowDeReference(&p->flow); // < can't disappear while usecnt >0 + + /* find the new flow that does belong to this packet */ + Flow *new_f = FlowLookupFlowFromHash(p); + if (new_f == NULL) { + // TODO reset packet flag wrt flow: direction, HAS_FLOW etc + p->flags &= ~PKT_HAS_FLOW; + return; + } + FlowHandlePacketUpdate(new_f, p); + BUG_ON(new_f != p->flow); + FLOWLOCK_UNLOCK(new_f); + SCLogDebug("packet %"PRIu64" switched over to new flow %p!", p->pcap_cnt, p->flow); +} + TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { StreamTcpThread *stt = (StreamTcpThread *)data; @@ -4709,6 +4914,20 @@ TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe p->flags |= PKT_IGNORE_CHECKSUM; } +// TODO autofp only somehow + /* "autofp" handling of TCP session/flow reuse */ + if (!(p->flags & PKT_PSEUDO_STREAM_END)) { + /* apply previous reuses to this packet */ + TcpSessionReuseHandleApplyToPacket(p); + if (p->flow == NULL) + return ret; + + /* after that, check for 'new' reuse */ + TcpSessionReuseHandle(p); + if (p->flow == NULL) + return ret; + } + AppLayerProfilingReset(stt->ra_ctx->app_tctx); FLOWLOCK_WRLOCK(p->flow);