From 8e444f177207c34ab3ed1a367ac548f01b9470be Mon Sep 17 00:00:00 2001 From: Gurvinder Singh Date: Sat, 3 Apr 2010 11:54:20 +0200 Subject: [PATCH] stream and application layer improvements --- src/app-layer-detect-proto.c | 47 +- src/app-layer-htp.c | 12 + src/app-layer-parser.c | 5 - src/stream-tcp-private.h | 77 +- src/stream-tcp-reassemble.c | 1478 ++++++++++++++++++++++++++++++++-- src/stream-tcp-reassemble.h | 2 + src/stream-tcp.c | 6 + src/stream.c | 2 +- src/stream.h | 3 + 9 files changed, 1551 insertions(+), 81 deletions(-) diff --git a/src/app-layer-detect-proto.c b/src/app-layer-detect-proto.c index c3a867ed06..dfdb0d132a 100644 --- a/src/app-layer-detect-proto.c +++ b/src/app-layer-detect-proto.c @@ -54,8 +54,12 @@ typedef struct AlpProtoDetectDirection_ { uint32_t id; uint16_t map[ALP_DETECT_MAX]; /**< a mapping between condition id's and protocol */ - uint16_t max_depth; /**< max depth of all patterns, so we can + uint16_t max_len; /**< max length of all patterns, so we can limit the search */ + uint16_t min_len; /**< min length of all patterns, so we can + tell the stream engine to feed data + to app layer as soon as it has min + size data */ } AlpProtoDetectDirection; typedef struct AlpProtoDetectCtx_ { @@ -86,6 +90,8 @@ void AlpProtoInit(AlpProtoDetectCtx *ctx) { ctx->toserver.id = 0; ctx->toclient.id = 0; + ctx->toclient.min_len = INSPECT_BYTES; + ctx->toserver.min_len = INSPECT_BYTES; } void AlpProtoTestDestroy(AlpProtoDetectCtx *ctx) { @@ -131,8 +137,13 @@ void AlpProtoAdd(AlpProtoDetectCtx *ctx, uint16_t ip_proto, uint16_t al_proto, c dir->map[dir->id] = al_proto; dir->id++; - if (depth > dir->max_depth) - dir->max_depth = depth; + if (depth > dir->max_len) + dir->max_len = depth; + + /* set the min_len for the stream engine to set the min smsg size for app + layer*/ + if (depth < dir->min_len) + dir->min_len = depth; /* no longer need the cd */ DetectContentFree(cd); @@ -189,9 +200,10 @@ void AlpProtoFinalizeGlobal(AlpProtoDetectCtx *ctx) { exit(EXIT_FAILURE); #endif - /* tell the stream reassembler we only want chunks of size max_depth */ - StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOCLIENT, ctx->toclient.max_depth); - StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOSERVER, ctx->toserver.max_depth); + /* tell the stream reassembler, that initially we only want chunks of size + min_len */ + StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOCLIENT, ctx->toclient.min_len); + StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOSERVER, ctx->toserver.min_len); } void AppLayerDetectProtoThreadInit(void) { @@ -305,8 +317,8 @@ uint16_t AppLayerDetectGetProto(AlpProtoDetectCtx *ctx, AlpProtoDetectThreadCtx /* see if we can limit the data we inspect */ uint16_t searchlen = buflen; - if (searchlen > dir->max_depth) - searchlen = dir->max_depth; + if (searchlen > dir->max_len) + searchlen = dir->max_len; uint16_t proto = ALPROTO_UNKNOWN; uint32_t cnt = 0; @@ -448,21 +460,24 @@ int AppLayerHandleMsg(AlpProtoDetectThreadCtx *dp_ctx, StreamMsg *smsg) /* store the proto and setup the L7 data array */ StreamL7DataPtrInit(ssn); ssn->alproto = alproto; + ssn->flags |= STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED; r = AppLayerParse(smsg->flow, alproto, smsg->flags, smsg->data.data, smsg->data.data_len); } else { - SCLogDebug("ALPROTO_UNKNOWN flow %p", smsg->flow); - - TcpSession *ssn = smsg->flow->protoctx; - if (ssn != NULL) { - if (smsg->flags & STREAM_TOCLIENT) { - StreamTcpSetSessionNoReassemblyFlag(ssn, 1); - } else if (smsg->flags & STREAM_TOSERVER) { + if (smsg->flags & STREAM_TOSERVER) { + if (smsg->data.data_len >= alp_proto_ctx.toserver.max_len) { + ssn->flags |= STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED; + SCLogDebug("ALPROTO_UNKNOWN flow %p", smsg->flow); StreamTcpSetSessionNoReassemblyFlag(ssn, 0); } + } else if (smsg->flags & STREAM_TOCLIENT) { + if (smsg->data.data_len >= alp_proto_ctx.toclient.max_len) { + ssn->flags |= STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED; + SCLogDebug("ALPROTO_UNKNOWN flow %p", smsg->flow); + StreamTcpSetSessionNoReassemblyFlag(ssn, 1); + } } - } } else { SCLogDebug("stream data (len %" PRIu32 " (%" PRIu32 ")), alproto " diff --git a/src/app-layer-htp.c b/src/app-layer-htp.c index bcd53cf6a1..fa5e00d0d3 100644 --- a/src/app-layer-htp.c +++ b/src/app-layer-htp.c @@ -156,6 +156,12 @@ static int HTPHandleRequestData(Flow *f, void *htp_state, HtpState *hstate = (HtpState *)htp_state; + if (hstate->connp->in_status == STREAM_STATE_ERROR) { + SCLogError(SC_ERR_ALPARSER, "Inbound parser is in error state, no" + " need to feed data to libhtp"); + SCReturnInt(-1); + } + /* Unset the body inspection (the callback should * reactivate it if necessary) */ hstate->flags &= ~HTP_FLAG_NEW_BODY_SET; @@ -242,6 +248,12 @@ static int HTPHandleResponseData(Flow *f, void *htp_state, HtpState *hstate = (HtpState *)htp_state; + if (hstate->connp->out_status == STREAM_STATE_ERROR) { + SCLogError(SC_ERR_ALPARSER, "Outbound parser is in error state, no" + " need to feed data to libhtp"); + SCReturnInt(-1); + } + /* Unset the body inspection (the callback should * reactivate it if necessary) */ hstate->flags &= ~HTP_FLAG_NEW_BODY_SET; diff --git a/src/app-layer-parser.c b/src/app-layer-parser.c index c8805e20a7..37c4338a7e 100644 --- a/src/app-layer-parser.c +++ b/src/app-layer-parser.c @@ -1077,11 +1077,6 @@ static int AppLayerParserTest01 (void) goto end; } - if (ssn.aldata != NULL) { - printf("App Layer state has not been cleared\n"); - result = 0; - goto end; - } end: StreamL7DataPtrFree(&ssn); StreamTcpFreeConfig(TRUE); diff --git a/src/stream-tcp-private.h b/src/stream-tcp-private.h index 4232999cd5..7a3743cb5e 100644 --- a/src/stream-tcp-private.h +++ b/src/stream-tcp-private.h @@ -7,6 +7,7 @@ typedef struct TcpSegment_ { uint16_t payload_len; /* actual size of the payload */ uint32_t seq; uint16_t pool_size; /* size of the memory */ + uint8_t flags; struct TcpSegment_ *next; struct TcpSegment_ *prev; } TcpSegment; @@ -30,6 +31,10 @@ typedef struct TcpStream_ { uint8_t os_policy; /**< target based OS policy used for reassembly and handling packets*/ uint16_t flags; /**< Flag specific to the stream e.g. Timestamp */ TcpSegment *seg_list_tail; /**< Last segment in the reassembled stream seg list*/ + uint32_t tmp_ra_base_seq; /**< Temporary reassembled seq, to be used until + app layer protocol has not been detected, + beacuse every smsg needs to contain all the + initial segments too */ } TcpStream; /* from /usr/include/netinet/tcp.h */ @@ -49,20 +54,64 @@ enum TCP_CLOSED, }; -#define STREAMTCP_FLAG_MIDSTREAM 0x0001 /**< Flag for mid stream session*/ -#define STREAMTCP_FLAG_MIDSTREAM_ESTABLISHED 0x0002 /**< Flag for mid stream established session*/ -#define STREAMTCP_FLAG_MIDSTREAM_SYNACK 0x0004 /**flags |= STREAMTCP_FLAG_PAUSE_TOSERVER_REASSEMBLY) : + (ssn->flags |= STREAMTCP_FLAG_PAUSE_TOCLIENT_REASSEMBLY); +} + +/** \brief Unpause the reassembling for the given stream direction for given TCP + * session. + * + * \param ssn TCP Session to set the flag in + * \param direction direction to set the flag in: 1 toserver, 0 toclient + */ + +void StreamTcpReassembleUnPause (TcpSession *ssn, char direction) +{ + direction ? (ssn->flags &= ~STREAMTCP_FLAG_PAUSE_TOSERVER_REASSEMBLY) : + (ssn->flags &= ~STREAMTCP_FLAG_PAUSE_TOCLIENT_REASSEMBLY); +} + int StreamTcpReassembleHandleSegmentUpdateACK (TcpReassemblyThreadCtx *ra_ctx, TcpSession *ssn, TcpStream *stream, Packet *p) { SCEnter(); - if (stream->seg_list == NULL) + if (stream->seg_list == NULL) { + SCLogDebug("no segments in the list to reassemble !!"); SCReturnInt(0); + } - SCLogDebug("start p %p", p); + /* check if reassembling has been paused for the moment or not */ + if (PKT_IS_TOSERVER(p)) { + if (ssn->flags & STREAMTCP_FLAG_PAUSE_TOCLIENT_REASSEMBLY) { + SCLogDebug("toclient reassembling has been paused, so no" + " reassembling at the moment !!"); + SCReturnInt(0); + } + } else { + if (ssn->flags & STREAMTCP_FLAG_PAUSE_TOSERVER_REASSEMBLY) { + SCLogDebug("toserver reassembling has been paused, so no" + " reassembling at the moment !!"); + SCReturnInt(0); + } + } - StreamMsg *smsg = NULL; - uint16_t smsg_offset = 0; - uint16_t payload_offset = 0; - uint16_t payload_len = 0; - TcpSegment *seg = stream->seg_list; - uint32_t next_seq = stream->ra_base_seq + 1; + uint32_t ra_base_seq; + + /* check if we have detected the app layer protocol or not. If it has been + detected then, process data normally, as we have sent one smsg from + toserver side already to the app layer */ + if (!(ssn->flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED)) { + /* Do not perform reassembling of data from server, until the app layer + proto has been detected and we have sent atleast one smsg from client + data to app layer */ + if (PKT_IS_TOSERVER(p)) { + SCLogDebug("we didn't detected the app layer protocol till " + "yet, so not doing toclient reassembling"); + SCReturnInt(0); + /* unset the queue init flag, as app layer protocol has not been + detected till yet and we need to send the initial smsg again to app + layer */ + } if (PKT_IS_TOCLIENT(p)) { + ra_ctx->stream_q->flags &= ~STREAMQUEUE_FLAG_INIT; + } + /* initialize the tmp_ra_base_seq for each new run */ + stream->tmp_ra_base_seq = stream->ra_base_seq; + ra_base_seq = stream->tmp_ra_base_seq; + /* if app layer protocol has been detected, then restore the reassembled + seq. to the value till reassembling has been done and unset the queue + init flag permanently for this tcp session */ + } else if (stream->tmp_ra_base_seq > stream->ra_base_seq) { + stream->ra_base_seq = stream->tmp_ra_base_seq; + ra_ctx->stream_q->flags &= ~STREAMQUEUE_FLAG_INIT; + ra_base_seq = stream->ra_base_seq; + SCLogDebug("the app layer protocol has been detected"); + /* set the ra_bas_seq to stream->ra_base_seq as now app layer protocol + has been detected */ + } else { + ra_base_seq = stream->ra_base_seq; + } /* check if we have enough data to send to L7 */ if (StreamTcpReassembleCheckLimit(ssn,stream,p) == 0) { @@ -1349,16 +1420,53 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpReassemblyThreadCtx *ra_ctx, SCReturnInt(0); } + SCLogDebug("start p %p", p); + + StreamMsg *smsg = NULL; + uint16_t smsg_offset = 0; + uint16_t payload_offset = 0; + uint16_t payload_len = 0; + TcpSegment *seg = stream->seg_list; + uint32_t next_seq = ra_base_seq + 1; + /* loop through the segments and fill one or more msgs */ for (; seg != NULL && SEQ_LT(seg->seq, stream->last_ack);) { SCLogDebug("seg %p", seg); + /* if app layer protocol has been detected, then remove all the segments + which has been previously processed and reassembled */ + if ((ssn->flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED) && + (seg->flags & SEGMENTTCP_FLAG_PROCESSED)) + { + SCLogDebug("segment(%p) of length %"PRIu16" has been processed," + " so return it to pool", seg, seg->payload_len); + TcpSegment *next_seg = seg->next; + + if (seg->prev == NULL) { + stream->seg_list = seg->next; + if (stream->seg_list != NULL) + stream->seg_list->prev = NULL; + } else { + seg->prev->next = seg->next; + if (seg->next != NULL) + seg->next->prev = seg->prev; + } + + if (stream->seg_list_tail == seg) + stream->seg_list_tail = next_seg; + seg->flags &= ~SEGMENTTCP_FLAG_PROCESSED; + StreamTcpSegmentReturntoPool(seg); + seg = next_seg; + continue; + + } + /* If packets are fully before ra_base_seq, skip them. We do this * because we've reassembled up to the ra_base_seq point already, * so we won't do anything with segments before it anyway. */ SCLogDebug("checking for pre ra_base_seq %"PRIu32" seg %p seq %"PRIu32"" " len %"PRIu16", combined %"PRIu32" and stream->last_ack " - "%"PRIu32"", stream->ra_base_seq, seg, seg->seq, + "%"PRIu32"", ra_base_seq, seg, seg->seq, seg->payload_len, seg->seq+seg->payload_len, stream->last_ack); /* Remove the segments which are either completely before the @@ -1367,12 +1475,12 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpReassemblyThreadCtx *ra_ctx, As we are copying until the stream->last_ack only */ /** \todo we should probably not even insert them into the seglist */ - if (SEQ_LEQ((seg->seq + seg->payload_len), (stream->ra_base_seq+1)) || - SEQ_LEQ(stream->last_ack, (stream->ra_base_seq + - (stream->ra_base_seq - seg->seq)))) + if (SEQ_LEQ((seg->seq + seg->payload_len), (ra_base_seq+1)) || + SEQ_LEQ(stream->last_ack, (ra_base_seq + + (ra_base_seq - seg->seq)))) { SCLogDebug("removing pre ra_base_seq %"PRIu32" seg %p seq %"PRIu32"" - " len %"PRIu16"", stream->ra_base_seq, seg, seg->seq, + " len %"PRIu16"", ra_base_seq, seg, seg->seq, seg->payload_len); TcpSegment *next_seg = seg->next; @@ -1408,9 +1516,30 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpReassemblyThreadCtx *ra_ctx, /* pass on pre existing smsg (if any) */ if (smsg != NULL && smsg->data.data_len > 0) { - StreamMsgPutInQueue(ra_ctx->stream_q, smsg); - smsg = NULL; + /* if app layer protocol has not been detected till yet, + then check did we have sent message to app layer already + or not. If not then sent the message and set flag that first + message has been sent. No more data till proto has not + been detected */ + if (!(ssn->flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED)) { + if (!(ra_ctx->stream_q->flags & STREAMQUEUE_FLAG_INIT)) { + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + smsg = NULL; + ra_ctx->stream_q->flags |= STREAMQUEUE_FLAG_INIT; + SCLogDebug("queueing the stream data and setting the" + " queue init flag"); + } else { + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + smsg = NULL; + } + stream->tmp_ra_base_seq = ra_base_seq; + } else { + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + stream->ra_base_seq = ra_base_seq; + smsg = NULL; + } } + if (smsg == NULL) { smsg = StreamMsgGetFromPool(); if (smsg == NULL) { @@ -1418,13 +1547,23 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpReassemblyThreadCtx *ra_ctx, return -1; } } + /* we need to update the ra_base_seq, if app layer proto has + been detected and we are setting new stream message. Otherwise + every smsg will be with flag STREAM_START set, which we + don't want :-) */ + if (ssn->flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED) { + stream->ra_base_seq = ra_base_seq; + } else { + stream->tmp_ra_base_seq = ra_base_seq; + } + StreamTcpSetupMsg(ssn, stream, p, smsg); /* We have missed the packet and end host has ack'd it, so * IDS should advance it's ra_base_seq and should not consider this * packet any longer, even if it is retransmitted, as end host will * drop it anyway */ - stream->ra_base_seq = seg->seq - 1; + ra_base_seq = seg->seq - 1; smsg->flags |= STREAM_GAP; smsg->gap.gap_size = gap_len; @@ -1435,10 +1574,10 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpReassemblyThreadCtx *ra_ctx, } /* if the segment ends beyond ra_base_seq we need to consider it */ - if (SEQ_GT((seg->seq + seg->payload_len), stream->ra_base_seq)) { + if (SEQ_GT((seg->seq + seg->payload_len), ra_base_seq)) { SCLogDebug("seg->seq %" PRIu32 ", seg->payload_len %" PRIu32 ", " "stream->ra_base_seq %" PRIu32 "", seg->seq, - seg->payload_len, stream->ra_base_seq); + seg->payload_len, ra_base_seq); if (smsg == NULL) { smsg = StreamMsgGetFromPool(); if (smsg == NULL) { @@ -1447,16 +1586,17 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpReassemblyThreadCtx *ra_ctx, } smsg_offset = 0; + StreamTcpSetupMsg(ssn, stream, p, smsg); } /* handle segments partly before ra_base_seq */ - if (SEQ_GT(stream->ra_base_seq, seg->seq)) { - payload_offset = stream->ra_base_seq - seg->seq; + if (SEQ_GT(ra_base_seq, seg->seq)) { + payload_offset = ra_base_seq - seg->seq; if (SEQ_LT(stream->last_ack, (seg->seq + seg->payload_len))) { - if (SEQ_LT(stream->last_ack, stream->ra_base_seq)) { + if (SEQ_LT(stream->last_ack, ra_base_seq)) { payload_len = (stream->last_ack - seg->seq); } else { payload_len = (stream->last_ack - seg->seq) - @@ -1494,14 +1634,35 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpReassemblyThreadCtx *ra_ctx, memcpy(smsg->data.data + smsg_offset, seg->payload + payload_offset, copy_size); smsg_offset += copy_size; - stream->ra_base_seq += copy_size; - SCLogDebug("stream->ra_base_seq %"PRIu32"", stream->ra_base_seq); + ra_base_seq += copy_size; + SCLogDebug("stream->ra_base_seq %"PRIu32"", ra_base_seq); + smsg->data.data_len += copy_size; /* queue the smsg if it's full */ if (smsg->data.data_len == sizeof (smsg->data.data)) { - StreamMsgPutInQueue(ra_ctx->stream_q, smsg); - smsg = NULL; + /* if app layer protocol has not been detected till yet, + then check did we have sent message to app layer already + or not. If not then sent the message and set flag that first + message has been sent. No more data till proto has not + been detected */ + if (!(ssn->flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED)) { + if (!(ra_ctx->stream_q->flags & STREAMQUEUE_FLAG_INIT)) { + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + smsg = NULL; + ra_ctx->stream_q->flags |= STREAMQUEUE_FLAG_INIT; + SCLogDebug("queueing the stream data and setting the" + " queue init flag"); + } else { + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + smsg = NULL; + } + stream->tmp_ra_base_seq = ra_base_seq; + } else { + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + stream->ra_base_seq = ra_base_seq; + smsg = NULL; + } } /* if the payload len is bigger than what we copied, we handle the @@ -1551,17 +1712,35 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpReassemblyThreadCtx *ra_ctx, memcpy(smsg->data.data + smsg_offset, seg->payload + payload_offset, copy_size); smsg_offset += copy_size; - stream->ra_base_seq += copy_size; - - SCLogDebug("stream->ra_base_seq %"PRIu32"", - stream->ra_base_seq); + ra_base_seq += copy_size; + SCLogDebug("stream->ra_base_seq %"PRIu32"", ra_base_seq); smsg->data.data_len += copy_size; SCLogDebug("copied payload_offset %" PRIu32 ", " "smsg_offset %" PRIu32 ", copy_size %" PRIu32 "", payload_offset, smsg_offset, copy_size); if (smsg->data.data_len == sizeof (smsg->data.data)) { - StreamMsgPutInQueue(ra_ctx->stream_q,smsg); - smsg = NULL; + /* if app layer protocol has not been detected till yet, + then check did we have sent message to app layer already + or not. If not then sent the message and set flag that first + message has been sent. No more data till proto has not + been detected */ + if (!(ssn->flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED)) { + if (!(ra_ctx->stream_q->flags & STREAMQUEUE_FLAG_INIT)) { + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + smsg = NULL; + ra_ctx->stream_q->flags |= STREAMQUEUE_FLAG_INIT; + SCLogDebug("queueing the stream data and setting" + " the queue init flag"); + } else { + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + smsg = NULL; + } + stream->tmp_ra_base_seq = ra_base_seq; + } else { + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + stream->ra_base_seq = ra_base_seq; + smsg = NULL; + } } /* see if we have segment payload left to process */ @@ -1585,27 +1764,53 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpReassemblyThreadCtx *ra_ctx, /* done with this segment, return it to the pool */ TcpSegment *next_seg = seg->next; next_seq = seg->seq + seg->payload_len; - SCLogDebug("removing seg %p, seg->next %p", seg, seg->next); - if (SCLogDebugEnabled()) { - BUG_ON(seg->prev != NULL); /**< BUG if we aren't the top of the list */ - } - stream->seg_list = seg->next; - if (stream->seg_list != NULL) - stream->seg_list->prev = NULL; - - /* Update seg_list_tail, in case it also points to this segment*/ - if (stream->seg_list_tail == seg) - stream->seg_list_tail = next_seg; + if (ssn->flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED) { + /* untill app layer proto has not been detected, there will be + previous segments in the list. */ + if (SCLogDebugEnabled()) + { + BUG_ON(seg->prev != NULL); /**< BUG if we aren't the top of the + list */ + } + stream->seg_list = seg->next; + if (stream->seg_list != NULL) + stream->seg_list->prev = NULL; - StreamTcpSegmentReturntoPool(seg); + /* Update seg_list_tail, in case it also points to this segment*/ + if (stream->seg_list_tail == seg) + stream->seg_list_tail = next_seg; + SCLogDebug("removing seg %p, seg->next %p", seg, seg->next); + StreamTcpSegmentReturntoPool(seg); + } else { + seg->flags |= SEGMENTTCP_FLAG_PROCESSED; + /* if we have sent smsg to app layer and protocol has not been + detected, then we need to wait before processing more segments */ + if (ra_ctx->stream_q->flags & STREAMQUEUE_FLAG_INIT) + break; + } seg = next_seg; } /* put the partly filled smsg in the queue to the l7 handler */ if (smsg != NULL) { - StreamMsgPutInQueue(ra_ctx->stream_q,smsg); - smsg = NULL; + if (!(ssn->flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED)) { + if (!(ra_ctx->stream_q->flags & STREAMQUEUE_FLAG_INIT)) { + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + smsg = NULL; + ra_ctx->stream_q->flags |= STREAMQUEUE_FLAG_INIT; + SCLogDebug("queueing the stream data and setting the queue init" + " flag"); + } else { + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + smsg = NULL; + } + stream->tmp_ra_base_seq = ra_base_seq; + } else { + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + smsg = NULL; + stream->ra_base_seq = ra_base_seq; + } } SCReturnInt(0); @@ -1639,6 +1844,7 @@ int StreamTcpReassembleProcessAppLayer(TcpReassemblyThreadCtx *ra_ctx) if (AppLayerHandleMsg(&ra_ctx->dp_ctx, smsg) != 0) r = -1; } + } while (ra_ctx->stream_q->len > 0); } @@ -3073,6 +3279,7 @@ static int StreamTcpTestMissedPacket (TcpReassemblyThreadCtx *ra_ctx, f.dst = dst; f.sp = sp; f.dp = dp; + f.protoctx = ssn; p.flow = &f; tcph.th_win = htons(5480); @@ -3320,6 +3527,12 @@ static int StreamTcpReassembleTest28 (void) { goto end; } + /* Process stream smsgs we may have in queue */ + if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + flowflags = FLOW_PKT_TOSERVER; StreamTcpCreateTestPacket(payload, 0x42, 3, 4); /*BBB*/ seq = 12; @@ -3398,6 +3611,12 @@ static int StreamTcpReassembleTest29 (void) { goto end; } + /* Process stream smsgs we may have in queue */ + if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + flowflags = FLOW_PKT_TOSERVER; StreamTcpCreateTestPacket(payload, 0x42, 3, 4); /*BBB*/ seq = 15; @@ -3477,6 +3696,12 @@ static int StreamTcpReassembleTest30 (void) { goto end; } + /* Process stream smsgs we may have in queue */ + if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + flowflags = FLOW_PKT_TOSERVER; StreamTcpCreateTestPacket(payload, 0x42, 3, 4); /*BBB*/ seq = 12; @@ -3495,6 +3720,12 @@ static int StreamTcpReassembleTest30 (void) { goto end; } + /* Process stream smsgs we may have in queue */ + if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + th_flag = TH_FIN|TH_ACK; seq = 18; ack = 20; @@ -4001,6 +4232,1157 @@ static int StreamTcpReassembleTest37(void) { return 1; } +/** + * \test Test to make sure we don't send the smsg from toclient to app layer + * until the app layer protocol has been detected and one smsg from + * toserver side has been sent to app layer. + * + * \retval On success it returns 1 and on failure 0. + */ + +static int StreamTcpReassembleTest38 (void) { + int ret = 0; + Packet p; + Flow f; + TCPHdr tcph; + Port sp; + Port dp; + Address src; + Address dst; + struct in_addr in; + TcpSession ssn; + + memset(&p, 0, sizeof (Packet)); + memset(&f, 0, sizeof (Flow)); + memset(&tcph, 0, sizeof (TCPHdr)); + memset(&src, 0, sizeof(Address)); + memset(&dst, 0, sizeof(Address)); + memset(&ssn, 0, sizeof(TcpSession)); + + StreamTcpInitConfig(TRUE); + TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); + AppLayerDetectProtoThreadInit(); + + uint8_t httpbuf1[] = "POST / HTTP/1.0\r\nUser-Agent: Victor/1.0\r\n\r\n"; + uint32_t httplen1 = sizeof(httpbuf1) - 1; /* minus the \0 */ + + uint8_t httpbuf2[] = "HTTP/1.0 200 OK\r\nServer: VictorServer/1.0\r\n\r\n"; + uint32_t httplen2 = sizeof(httpbuf2) - 1; /* minus the \0 */ + + inet_pton(AF_INET, "1.2.3.4", &in); + src.family = AF_INET; + src.addr_data32[0] = in.s_addr; + inet_pton(AF_INET, "1.2.3.5", &in); + dst.family = AF_INET; + dst.addr_data32[0] = in.s_addr; + sp = 200; + dp = 220; + + ssn.server.ra_base_seq = 9; + ssn.server.isn = 9; + ssn.server.last_ack = 60; + ssn.client.ra_base_seq = 9; + ssn.client.isn = 9; + ssn.client.last_ack = 60; + ssn.alproto = ALPROTO_UNKNOWN; + + f.src = src; + f.dst = dst; + f.sp = sp; + f.dp = dp; + f.protoctx = &ssn; + p.flow = &f; + + tcph.th_win = htons(5480); + tcph.th_seq = htonl(10); + tcph.th_ack = htonl(20); + tcph.th_flags = TH_ACK|TH_PUSH; + p.tcph = &tcph; + p.flowflags = FLOW_PKT_TOCLIENT; + + p.payload = httpbuf2; + p.payload_len = httplen2; + ssn.state = TCP_ESTABLISHED; + + TcpStream *s = NULL; + s = &ssn.server; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len > 0) { + printf("there shouldn't be any stream smsgs in the queue\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOSERVER; + p.payload = httpbuf1; + p.payload_len = httplen1; + tcph.th_seq = htonl(10); + tcph.th_ack = htonl(55); + s = &ssn.client; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len > 0) { + printf("there shouldn't be any stream smsgs in the queue, as we didn't" + " processed any smsg from toserver side till yet\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOCLIENT; + p.payload = httpbuf2; + p.payload_len = httplen2; + tcph.th_seq = htonl(55); + tcph.th_ack = htonl(53); + s = &ssn.server; + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len == 0) { + printf("there should be a stream smsgs in the queue\n"); + goto end; + /* Process stream smsgs we may have in queue */ + } else if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOSERVER; + p.payload = httpbuf1; + p.payload_len = httplen1; + tcph.th_seq = htonl(53); + tcph.th_ack = htonl(100); + s = &ssn.client; + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len == 0) { + printf("there should be a stream smsgs in the queue, as we have detected" + " the app layer protocol and one smsg from toserver side has " + "been sent\n"); + goto end; + /* Process stream smsgs we may have in queue */ + } else if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + + ret = 1; +end: + StreamTcpFreeConfig(TRUE); + StreamTcpReassembleFreeThreadCtx(ra_ctx); + return ret; +} + +/** + * \test Test to make sure that we don't return the segments until the app + * layer proto has been detected and after that remove the processed + * segments. + * + * \retval On success it returns 1 and on failure 0. + */ + +static int StreamTcpReassembleTest39 (void) { + int ret = 0; + Packet p; + Flow f; + TCPHdr tcph; + Port sp; + Port dp; + Address src; + Address dst; + struct in_addr in; + TcpSession ssn; + + memset(&p, 0, sizeof (Packet)); + memset(&f, 0, sizeof (Flow)); + memset(&tcph, 0, sizeof (TCPHdr)); + memset(&src, 0, sizeof(Address)); + memset(&dst, 0, sizeof(Address)); + memset(&ssn, 0, sizeof(TcpSession)); + + StreamTcpInitConfig(TRUE); + TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); + AppLayerDetectProtoThreadInit(); + StreamMsgQueueSetMinChunkLen(FLOW_PKT_TOSERVER, 7); + StreamMsgQueueSetMinChunkLen(FLOW_PKT_TOCLIENT, 7); + + uint8_t httpbuf1[] = "POST / HTTP/1.0\r\nUser-Agent: Victor/1.0\r\n\r\n"; + uint32_t httplen1 = sizeof(httpbuf1) - 1; /* minus the \0 */ + + uint8_t httpbuf2[] = "HTTP/1.0 200 OK\r\nServer: VictorServer/1.0\r\n\r\n"; + uint32_t httplen2 = sizeof(httpbuf2) - 1; /* minus the \0 */ + + ssn.server.ra_base_seq = 9; + ssn.server.isn = 9; + ssn.server.last_ack = 60; + ssn.client.ra_base_seq = 9; + ssn.client.isn = 9; + ssn.client.last_ack = 60; + ssn.alproto = ALPROTO_UNKNOWN; + + inet_pton(AF_INET, "1.2.3.4", &in); + src.family = AF_INET; + src.addr_data32[0] = in.s_addr; + inet_pton(AF_INET, "1.2.3.5", &in); + dst.family = AF_INET; + dst.addr_data32[0] = in.s_addr; + sp = 200; + dp = 220; + + f.src = src; + f.dst = dst; + f.sp = sp; + f.dp = dp; + f.protoctx = &ssn; + p.flow = &f; + + tcph.th_win = htons(5480); + tcph.th_seq = htonl(10); + tcph.th_ack = htonl(20); + tcph.th_flags = TH_ACK|TH_PUSH; + p.tcph = &tcph; + p.flowflags = FLOW_PKT_TOCLIENT; + + p.payload = httpbuf2; + p.payload_len = httplen2; + ssn.state = TCP_ESTABLISHED; + + TcpStream *s = NULL; + s = &ssn.server; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len > 0) { + printf("there shouldn't be any stream smsgs in the queue\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOSERVER; + p.payload = httpbuf1; + p.payload_len = httplen1; + tcph.th_seq = htonl(10); + tcph.th_ack = htonl(55); + s = &ssn.client; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len > 0) { + printf("there shouldn't be any stream smsgs in the queue, as we didn't" + " processed any smsg from toserver side till yet\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOCLIENT; + p.payload = httpbuf2; + p.payload_len = httplen2; + tcph.th_seq = htonl(55); + tcph.th_ack = htonl(53); + s = &ssn.server; + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len == 0) { + printf("there should be a stream smsgs in the queue\n"); + goto end; + /* Process stream smsgs we may have in queue */ + } else if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + + /* check is have the segment in the list and flagged or not */ + if (ssn.client.seg_list == NULL || + !(ssn.client.seg_list->flags & SEGMENTTCP_FLAG_PROCESSED)) + { + printf("the list is NULL or the processed segment has not been flaged\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOSERVER; + p.payload = httpbuf1; + p.payload_len = httplen1; + tcph.th_seq = htonl(53); + tcph.th_ack = htonl(100); + s = &ssn.client; + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len == 0 && + !(ssn.flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED)) { + printf("there should be a stream smsgs in the queue, as we have detected" + " the app layer protocol and one smsg from toserver side has " + "been sent\n"); + goto end; + /* Process stream smsgs we may have in queue */ + } else if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOCLIENT; + p.payload = httpbuf2; + p.payload_len = httplen2; + tcph.th_seq = htonl(100); + tcph.th_ack = htonl(96); + s = &ssn.server; + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* check if the segment in the list is flagged or not */ + if ((ssn.client.seg_list != NULL) && + (ssn.flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED)) { + printf("the segments should have been removed and there should be " + "no more segments in the list as they have been sent to app layer\n"); + goto end; + } + + ret = 1; +end: + StreamTcpFreeConfig(TRUE); + StreamTcpReassembleFreeThreadCtx(ra_ctx); + return ret; +} + +/** + * \test Test to make sure that we sent all the segments from the initial + * segments to app layer until we have detected the app layer proto. + * + * \retval On success it returns 1 and on failure 0. + */ + +static int StreamTcpReassembleTest40 (void) { + int ret = 0; + Packet p; + Flow f; + TCPHdr tcph; + Port sp; + Port dp; + Address src; + Address dst; + struct in_addr in; + TcpSession ssn; + + memset(&p, 0, sizeof (Packet)); + memset(&f, 0, sizeof (Flow)); + memset(&tcph, 0, sizeof (TCPHdr)); + memset(&src, 0, sizeof(Address)); + memset(&dst, 0, sizeof(Address)); + memset(&ssn, 0, sizeof(TcpSession)); + + StreamTcpInitConfig(TRUE); + TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); + AppLayerDetectProtoThreadInit(); + + uint8_t httpbuf1[] = "P"; + uint32_t httplen1 = sizeof(httpbuf1) - 1; /* minus the \0 */ + uint8_t httpbuf3[] = "O"; + uint32_t httplen3 = sizeof(httpbuf3) - 1; /* minus the \0 */ + uint8_t httpbuf4[] = "S"; + uint32_t httplen4 = sizeof(httpbuf4) - 1; /* minus the \0 */ + uint8_t httpbuf5[] = "T\r\n"; + uint32_t httplen5 = sizeof(httpbuf5) - 1; /* minus the \0 */ + + uint8_t httpbuf2[] = "HTTP/1.0 200 OK\r\nServer: VictorServer/1.0\r\n\r\n"; + uint32_t httplen2 = sizeof(httpbuf2) - 1; /* minus the \0 */ + + ssn.server.ra_base_seq = 9; + ssn.server.isn = 9; + ssn.server.last_ack = 10; + ssn.client.ra_base_seq = 9; + ssn.client.isn = 9; + ssn.client.last_ack = 10; + ssn.alproto = ALPROTO_UNKNOWN; + + inet_pton(AF_INET, "1.2.3.4", &in); + src.family = AF_INET; + src.addr_data32[0] = in.s_addr; + inet_pton(AF_INET, "1.2.3.5", &in); + dst.family = AF_INET; + dst.addr_data32[0] = in.s_addr; + sp = 200; + dp = 220; + + f.src = src; + f.dst = dst; + f.sp = sp; + f.dp = dp; + f.protoctx = &ssn; + p.flow = &f; + + tcph.th_win = htons(5480); + tcph.th_seq = htonl(10); + tcph.th_ack = htonl(10); + tcph.th_flags = TH_ACK|TH_PUSH; + p.tcph = &tcph; + p.flowflags = FLOW_PKT_TOSERVER; + + p.payload = httpbuf1; + p.payload_len = httplen1; + ssn.state = TCP_ESTABLISHED; + + TcpStream *s = NULL; + s = &ssn.client; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len > 0) { + printf("there shouldn't be any stream smsgs in the queue, as we didn't" + " processed any smsg from toserver side till yet\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOCLIENT; + p.payload = httpbuf2; + p.payload_len = httplen2; + tcph.th_seq = htonl(10); + tcph.th_ack = htonl(11); + s = &ssn.server; + ssn.server.last_ack = 11; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Process stream smsgs we may have in queue */ + if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOSERVER; + p.payload = httpbuf3; + p.payload_len = httplen3; + tcph.th_seq = htonl(11); + tcph.th_ack = htonl(55); + s = &ssn.client; + ssn.client.last_ack = 55; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOCLIENT; + p.payload = httpbuf2; + p.payload_len = httplen2; + tcph.th_seq = htonl(55); + tcph.th_ack = htonl(12); + s = &ssn.server; + ssn.server.last_ack = 12; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* check is have the segment in the list and flagged or not */ + if (ssn.client.seg_list == NULL || + !(ssn.client.seg_list->flags & SEGMENTTCP_FLAG_PROCESSED)) + { + printf("the list is NULL or the processed segment has not been flaged\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len == 0) { + printf("there should be a stream smsgs in the queue, as we have detected" + " the app layer protocol and one smsg from toserver side has " + "been sent\n"); + goto end; + /* Process stream smsgs we may have in queue */ + } else if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOSERVER; + p.payload = httpbuf4; + p.payload_len = httplen4; + tcph.th_seq = htonl(12); + tcph.th_ack = htonl(100); + s = &ssn.client; + ssn.client.last_ack = 100; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOCLIENT; + p.payload = httpbuf2; + p.payload_len = httplen2; + tcph.th_seq = htonl(100); + tcph.th_ack = htonl(13); + s = &ssn.server; + ssn.server.last_ack = 13; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len == 0) { + printf("there should be a stream smsgs in the queue, as we have detected" + " the app layer protocol and one smsg from toserver side has " + "been sent\n"); + goto end; + /* Process stream smsgs we may have in queue */ + } else if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOSERVER; + p.payload = httpbuf5; + p.payload_len = httplen5; + tcph.th_seq = htonl(13); + tcph.th_ack = htonl(145); + s = &ssn.client; + ssn.client.last_ack = 145; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOCLIENT; + p.payload = httpbuf2; + p.payload_len = httplen2; + tcph.th_seq = htonl(145); + tcph.th_ack = htonl(16); + s = &ssn.server; + ssn.server.last_ack = 16; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len == 0) { + printf("there should be a stream smsgs in the queue, as we have detected" + " the app layer protocol and one smsg from toserver side has " + "been sent\n"); + goto end; + /* Process stream smsgs we may have in queue */ + } else if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + + if (ssn.alproto != ALPROTO_HTTP) { + printf("app layer proto has not been detected\n"); + goto end; + } + + ret = 1; +end: + StreamTcpFreeConfig(TRUE); + StreamTcpReassembleFreeThreadCtx(ra_ctx); + return ret; +} + +/** + * \test Test to make sure we don't send more than one smsg from toserver to + * app layer until the app layer protocol has not been detected. After + * protocol has been detected the processed segments should be returned + * to pool. + * + * \retval On success it returns 1 and on failure 0. + */ + +static int StreamTcpReassembleTest41 (void) { + int ret = 0; + Packet p; + Flow f; + TCPHdr tcph; + Port sp; + Port dp; + Address src; + Address dst; + struct in_addr in; + TcpSession ssn; + + memset(&p, 0, sizeof (Packet)); + memset(&f, 0, sizeof (Flow)); + memset(&tcph, 0, sizeof (TCPHdr)); + memset(&src, 0, sizeof(Address)); + memset(&dst, 0, sizeof(Address)); + memset(&ssn, 0, sizeof(TcpSession)); + + StreamTcpInitConfig(TRUE); + TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); + AppLayerDetectProtoThreadInit(); + + uint8_t httpbuf1[] = "GET / HTTP/1.0\r\nUser-Agent: Victor/1.0" + "W2dyb3VwMV0NCnBob25lMT1wMDB3ODgyMTMxMzAyMTINCmxvZ2lu" + "MT0NCnBhc3N3b3JkMT0NCnBob25lMj1wMDB3ODgyMTMxMzAyMTIN" + "CmxvZ2luMj0NCnBhc3N3b3JkMj0NCnBob25lMz0NCmxvZ2luMz0N" + "CnBhc3N3b3JkMz0NCnBob25lND0NCmxvZ2luND0NCnBhc3N3b3Jk" + "ND0NCnBob25lNT0NCmxvZ2luNT0NCnBhc3N3b3JkNT0NCnBob25l" + "Nj0NCmxvZ2luNj0NCnBhc3N3b3JkNj0NCmNhbGxfdGltZTE9MzIN" + "CmNhbGxfdGltZTI9MjMyDQpkYXlfbGltaXQ9NQ0KbW9udGhfbGlt" + "aXQ9MTUNCltncm91cDJdDQpwaG9uZTE9DQpsb2dpbjE9DQpwYXNz" + "d29yZDE9DQpwaG9uZTI9DQpsb2dpbjI9DQpwYXNzd29yZDI9DQpw" + "aG9uZT"; + uint32_t httplen1 = sizeof(httpbuf1) - 1; /* minus the \0 */ + + uint8_t httpbuf3[] = "psb2dpbjM9DQpwYXNzd29yZDM9DQpwaG9uZTQ9DQps" + "b2dpbjQ9DQpwYXNzd29yZDQ9DQpwaG9uZTU9DQpsb2dpbjU9DQpw" + "YXNzd29yZDU9DQpwaG9uZTY9DQpsb2dpbjY9DQpwYXNzd29yZDY9" + "DQpjYWxsX3RpbWUxPQ0KY2FsbF90aW1lMj0NCmRheV9saW1pdD0N" + "\r\n\r\n"; + uint32_t httplen3 = sizeof(httpbuf3) - 1; /* minus the \0 */ + + uint8_t httpbuf2[] = "HTTP/1.0 200 OK\r\nServer: VictorServer/1.0\r\n\r\n"; + uint32_t httplen2 = sizeof(httpbuf2) - 1; /* minus the \0 */ + + ssn.server.ra_base_seq = 9; + ssn.server.isn = 9; + ssn.server.last_ack = 600; + ssn.client.ra_base_seq = 9; + ssn.client.isn = 9; + ssn.client.last_ack = 600; + ssn.alproto = ALPROTO_UNKNOWN; + + inet_pton(AF_INET, "1.2.3.4", &in); + src.family = AF_INET; + src.addr_data32[0] = in.s_addr; + inet_pton(AF_INET, "1.2.3.5", &in); + dst.family = AF_INET; + dst.addr_data32[0] = in.s_addr; + sp = 200; + dp = 220; + + f.src = src; + f.dst = dst; + f.sp = sp; + f.dp = dp; + f.protoctx = &ssn; + p.flow = &f; + + tcph.th_win = htons(5480); + tcph.th_seq = htonl(10); + tcph.th_ack = htonl(20); + tcph.th_flags = TH_ACK|TH_PUSH; + p.tcph = &tcph; + p.flowflags = FLOW_PKT_TOCLIENT; + + p.payload = httpbuf2; + p.payload_len = httplen2; + ssn.state = TCP_ESTABLISHED; + + TcpStream *s = NULL; + s = &ssn.server; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len > 0) { + printf("there shouldn't be any stream smsgs in the queue\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOSERVER; + p.payload = httpbuf1; + p.payload_len = httplen1; + tcph.th_seq = htonl(10); + tcph.th_ack = htonl(55); + s = &ssn.client; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOSERVER; + p.payload = httpbuf3; + p.payload_len = httplen3; + tcph.th_seq = htonl(522); + tcph.th_ack = htonl(100); + s = &ssn.client; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len > 0) { + printf("there shouldn't be any stream smsgs in the queue, as we didn't" + " processed any smsg from toserver side till yet\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOCLIENT; + p.payload = httpbuf2; + p.payload_len = httplen2; + tcph.th_seq = htonl(55); + tcph.th_ack = htonl(522); + s = &ssn.server; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len == 0) { + printf("there should be a stream smsgs in the queue\n"); + goto end; + /* Process stream smsgs we may have in queue */ + } else if (ra_ctx->stream_q->len > 1) { + printf("there should be only one stream smsgs in the queue\n"); + goto end; + } else if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOCLIENT; + p.payload = httpbuf2; + p.payload_len = httplen2; + tcph.th_seq = htonl(100); + tcph.th_ack = htonl(522); + s = &ssn.server; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + if (ssn.client.seg_list != NULL) { + printf("seg_list should be null\n"); + goto end; + } + + ret = 1; +end: + StreamTcpFreeConfig(TRUE); + StreamTcpReassembleFreeThreadCtx(ra_ctx); + return ret; +} + +/** + * \test Test to make sure that Pause/Unpause API is working. + * + * \retval On success it returns 1 and on failure 0. + */ + +static int StreamTcpReassembleTest42 (void) { + int ret = 0; + Packet p; + Flow f; + TCPHdr tcph; + Port sp; + Port dp; + Address src; + Address dst; + struct in_addr in; + TcpSession ssn; + + memset(&p, 0, sizeof (Packet)); + memset(&f, 0, sizeof (Flow)); + memset(&tcph, 0, sizeof (TCPHdr)); + memset(&src, 0, sizeof(Address)); + memset(&dst, 0, sizeof(Address)); + memset(&ssn, 0, sizeof(TcpSession)); + + StreamTcpInitConfig(TRUE); + TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); + AppLayerDetectProtoThreadInit(); + + uint8_t httpbuf1[] = "POST / HTTP/1.0\r\nUser-Agent: Victor/1.0\r\n\r\n"; + uint32_t httplen1 = sizeof(httpbuf1) - 1; /* minus the \0 */ + + uint8_t httpbuf2[] = "HTTP/1.0 200 OK\r\nServer: VictorServer/1.0\r\n\r\n"; + uint32_t httplen2 = sizeof(httpbuf2) - 1; /* minus the \0 */ + + ssn.server.ra_base_seq = 9; + ssn.server.isn = 9; + ssn.server.last_ack = 60; + ssn.client.ra_base_seq = 9; + ssn.client.isn = 9; + ssn.client.last_ack = 60; + ssn.alproto = ALPROTO_UNKNOWN; + + inet_pton(AF_INET, "1.2.3.4", &in); + src.family = AF_INET; + src.addr_data32[0] = in.s_addr; + inet_pton(AF_INET, "1.2.3.5", &in); + dst.family = AF_INET; + dst.addr_data32[0] = in.s_addr; + sp = 200; + dp = 220; + + f.src = src; + f.dst = dst; + f.sp = sp; + f.dp = dp; + f.protoctx = &ssn; + p.flow = &f; + + tcph.th_win = htons(5480); + tcph.th_seq = htonl(10); + tcph.th_ack = htonl(20); + tcph.th_flags = TH_ACK|TH_PUSH; + p.tcph = &tcph; + p.flowflags = FLOW_PKT_TOCLIENT; + + p.payload = httpbuf2; + p.payload_len = httplen2; + ssn.state = TCP_ESTABLISHED; + + TcpStream *s = NULL; + s = &ssn.server; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len > 0) { + printf("there shouldn't be any stream smsgs in the queue\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOSERVER; + p.payload = httpbuf1; + p.payload_len = httplen1; + tcph.th_seq = htonl(10); + tcph.th_ack = htonl(55); + s = &ssn.client; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len > 0) { + printf("there shouldn't be any stream smsgs in the queue, as we didn't" + " processed any smsg from toserver side till yet\n"); + goto end; + } + + /* pause the reassembling */ + StreamTcpReassemblePause(&ssn, 1); + + p.flowflags = FLOW_PKT_TOCLIENT; + p.payload = httpbuf2; + p.payload_len = httplen2; + tcph.th_seq = htonl(55); + tcph.th_ack = htonl(53); + s = &ssn.server; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len > 0) { + printf("there shouldn't be any stream smsgs in the queue, as we have" + " paused the reassembling\n"); + goto end; + } + + /* Unpause the reassembling */ + StreamTcpReassembleUnPause(&ssn, 1); + + p.flowflags = FLOW_PKT_TOCLIENT; + p.payload = httpbuf2; + p.payload_len = httplen2; + tcph.th_seq = htonl(100); + tcph.th_ack = htonl(53); + s = &ssn.server; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len == 0) { + printf("there should be a stream smsgs in the queue, as reassembling has" + " been unpaused now\n"); + goto end; + /* Process stream smsgs we may have in queue */ + } else if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + + ret = 1; +end: + StreamTcpFreeConfig(TRUE); + StreamTcpReassembleFreeThreadCtx(ra_ctx); + return ret; +} + +/** + * \test Test to make sure that Pause/Unpause API is working. + * + * \retval On success it returns 1 and on failure 0. + */ + +static int StreamTcpReassembleTest43 (void) { + int ret = 0; + Packet p; + Flow f; + TCPHdr tcph; + Port sp; + Port dp; + Address src; + Address dst; + struct in_addr in; + TcpSession ssn; + + memset(&p, 0, sizeof (Packet)); + memset(&f, 0, sizeof (Flow)); + memset(&tcph, 0, sizeof (TCPHdr)); + memset(&src, 0, sizeof(Address)); + memset(&dst, 0, sizeof(Address)); + memset(&ssn, 0, sizeof(TcpSession)); + + StreamTcpInitConfig(TRUE); + TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); + AppLayerDetectProtoThreadInit(); + + uint8_t httpbuf1[] = "/ HTTP/1.0\r\nUser-Agent: Victor/1.0"; + + uint32_t httplen1 = sizeof(httpbuf1) - 1; /* minus the \0 */ + + uint8_t httpbuf2[] = "HTTP/1.0 200 OK\r\nServer: VictorServer/1.0\r\n\r\n"; + uint32_t httplen2 = sizeof(httpbuf2) - 1; /* minus the \0 */ + + uint8_t httpbuf3[] = "W2dyb3VwMV0NCnBob25lMT1wMDB3ODgyMTMxMzAyMTINCmxvZ2lu" + "MT0NCnBhc3N3b3JkMT0NCnBob25lMj1wMDB3ODgyMTMxMzAyMTIN" + "CmxvZ2luMj0NCnBhc3N3b3JkMj0NCnBob25lMz0NCmxvZ2luMz0N" + "CnBhc3N3b3JkMz0NCnBob25lND0NCmxvZ2luND0NCnBhc3N3b3Jk" + "ND0NCnBob25lNT0NCmxvZ2luNT0NCnBhc3N3b3JkNT0NCnBob25l" + "Nj0NCmxvZ2luNj0NCnBhc3N3b3JkNj0NCmNhbGxfdGltZTE9MzIN" + "CmNhbGxfdGltZTI9MjMyDQpkYXlfbGltaXQ9NQ0KbW9udGhfbGlt" + "aXQ9MTUNCltncm91cDJdDQpwaG9uZTE9DQpsb2dpbjE9DQpwYXNz" + "d29yZDE9DQpwaG9uZTI9DQpsb2dpbjI9DQpwYXNzd29yZDI9DQpw" + "aG9uZT\r\n\r\n"; + uint32_t httplen3 = sizeof(httpbuf3) - 1; /* minus the \0 */ + + ssn.server.ra_base_seq = 9; + ssn.server.isn = 9; + ssn.server.last_ack = 600; + ssn.client.ra_base_seq = 9; + ssn.client.isn = 9; + ssn.client.last_ack = 600; + ssn.alproto = ALPROTO_UNKNOWN; + + /* Check the minimum init smsg length. It should be equal to the min length + of given signature in toserver direction. */ + if (StreamMsgQueueGetMinInitChunkLen(FLOW_PKT_TOSERVER) != 2) { + printf("the min init length should be equal to 2, not %"PRIu16"\n", + StreamMsgQueueGetMinInitChunkLen(FLOW_PKT_TOSERVER)); + goto end; + } + + /* Check the minimum init smsg length. It should be equal to the min length + of given signature in toclient direction. */ + if (StreamMsgQueueGetMinInitChunkLen(FLOW_PKT_TOCLIENT) != 2) { + printf("the min init length should be equal to 2, not %"PRIu16"\n", + StreamMsgQueueGetMinInitChunkLen(FLOW_PKT_TOCLIENT)); + goto end; + } + + inet_pton(AF_INET, "1.2.3.4", &in); + src.family = AF_INET; + src.addr_data32[0] = in.s_addr; + inet_pton(AF_INET, "1.2.3.5", &in); + dst.family = AF_INET; + dst.addr_data32[0] = in.s_addr; + sp = 200; + dp = 220; + + f.src = src; + f.dst = dst; + f.sp = sp; + f.dp = dp; + f.protoctx = &ssn; + p.flow = &f; + + tcph.th_win = htons(5480); + tcph.th_seq = htonl(10); + tcph.th_ack = htonl(10); + tcph.th_flags = TH_ACK|TH_PUSH; + p.tcph = &tcph; + p.flowflags = FLOW_PKT_TOCLIENT; + + p.payload = httpbuf2; + p.payload_len = httplen2; + ssn.state = TCP_ESTABLISHED; + + TcpStream *s = NULL; + s = &ssn.server; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len > 0) { + printf("there shouldn't be any stream smsgs in the queue\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOSERVER; + p.payload = httpbuf1; + p.payload_len = httplen1; + tcph.th_seq = htonl(10); + tcph.th_ack = htonl(55); + s = &ssn.client; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len > 0) { + printf("there shouldn't be any stream smsgs in the queue, as we didn't" + " processed any smsg from toserver side till yet\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOCLIENT; + p.payload = httpbuf2; + p.payload_len = httplen2; + tcph.th_seq = htonl(55); + tcph.th_ack = htonl(44); + s = &ssn.server; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len == 0) { + printf("there should be a stream smsgs in the queue\n"); + goto end; + /* Process stream smsgs we may have in queue */ + } else if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + + if (ssn.flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED) { + printf("app layer detected flag is set, it shouldn't be\n"); + goto end; + } + + /* This packets induces a packet gap and also shows why we need to + process the current segment completely, even if it results in sending more + than one smsg to the app layer. If we don't send more than one smsg in + this case, then the first segment of lentgh 34 bytes will be sent to + app layer and protocol can not be detected in that message and moreover + the segment lentgh is less than the max. signature size for protocol + detection, so this will keep looping !! */ + p.flowflags = FLOW_PKT_TOSERVER; + p.payload = httpbuf3; + p.payload_len = httplen3; + tcph.th_seq = htonl(54); + tcph.th_ack = htonl(100); + s = &ssn.client; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len > 0) { + printf("there shouldn't be any stream smsgs in the queue, as we didn't" + " detected the app layer protocol till yet\n"); + goto end; + } + + p.flowflags = FLOW_PKT_TOCLIENT; + p.payload = httpbuf2; + p.payload_len = httplen2; + tcph.th_seq = htonl(100); + tcph.th_ack = htonl(53); + s = &ssn.server; + + if (StreamTcpReassembleHandleSegment(ra_ctx, &ssn, s, &p) == -1) { + printf("failed in segments reassembly, while processing toserver packet\n"); + goto end; + } + + /* Check if we have stream smsgs in queue */ + if (ra_ctx->stream_q->len == 0) { + printf("there should be a stream smsgs in the queue, as reassembling has" + " been unpaused now\n"); + goto end; + /* Process stream smsgs we may have in queue */ + } else if (StreamTcpReassembleProcessAppLayer(ra_ctx) < 0) { + printf("failed in processing stream smsgs\n"); + goto end; + } + + /* the flag should be set, as the smsg scanned size has crossed the max. + signature size for app proto detection */ + if (! (ssn.flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED)) { + printf("app layer detected flag is not set, it should be\n"); + goto end; + } + + ret = 1; +end: + StreamTcpFreeConfig(TRUE); + StreamTcpReassembleFreeThreadCtx(ra_ctx); + return ret; +} + #endif /* UNITTESTS */ /** \brief The Function Register the Unit tests to test the reassembly engine @@ -4046,5 +5428,11 @@ void StreamTcpReassembleRegisterTests(void) { UtRegisterTest("StreamTcpReassembleTest35 -- Bug56 test", StreamTcpReassembleTest35, 1); UtRegisterTest("StreamTcpReassembleTest36 -- Bug57 test", StreamTcpReassembleTest36, 1); UtRegisterTest("StreamTcpReassembleTest37 -- Bug76 test", StreamTcpReassembleTest37, 1); + UtRegisterTest("StreamTcpReassembleTest38 -- app proto test", StreamTcpReassembleTest38, 1); + UtRegisterTest("StreamTcpReassembleTest39 -- app proto test", StreamTcpReassembleTest39, 1); + UtRegisterTest("StreamTcpReassembleTest40 -- app proto test", StreamTcpReassembleTest40, 1); + UtRegisterTest("StreamTcpReassembleTest41 -- app proto test", StreamTcpReassembleTest41, 1); + UtRegisterTest("StreamTcpReassembleTest42 -- pause/unpause reassembly test", StreamTcpReassembleTest42, 1); + UtRegisterTest("StreamTcpReassembleTest43 -- min smsg size test", StreamTcpReassembleTest43, 1); #endif /* UNITTESTS */ } diff --git a/src/stream-tcp-reassemble.h b/src/stream-tcp-reassemble.h index 576b36209c..9e519ccff9 100644 --- a/src/stream-tcp-reassemble.h +++ b/src/stream-tcp-reassemble.h @@ -54,6 +54,8 @@ void StreamL7DataPtrFree(TcpSession *); void StreamTcpSetSessionNoReassemblyFlag (TcpSession *, char ); void StreamTcpSetOSPolicy(TcpStream *, Packet *); +void StreamTcpReassemblePause (TcpSession *, char ); +void StreamTcpReassembleUnPause (TcpSession *, char ); #endif /* __STREAM_TCP_REASSEMBLE_H__ */ diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 1bf4983b64..ca76b50a8e 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -2964,6 +2964,10 @@ static int StreamTcpTest02 (void) { StreamTcpThread stt; uint8_t payload[4]; TCPHdr tcph; + TcpReassemblyThreadCtx ra_ctx; + StreamMsgQueue stream_q; + memset(&stream_q, 0, sizeof(StreamMsgQueue)); + memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx)); memset (&p, 0, sizeof(Packet)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); @@ -2975,6 +2979,8 @@ static int StreamTcpTest02 (void) { p.tcph = &tcph; p.flowflags = FLOW_PKT_TOSERVER; int ret = 0; + ra_ctx.stream_q = &stream_q; + stt.ra_ctx = &ra_ctx; StreamTcpInitConfig(TRUE); diff --git a/src/stream.c b/src/stream.c index fcf277a792..4f68f70fae 100644 --- a/src/stream.c +++ b/src/stream.c @@ -144,7 +144,7 @@ void StreamMsgQueuesInit(void) { stream_msg_pool = PoolInit(5000,250,StreamMsgAlloc,NULL,StreamMsgFree); if (stream_msg_pool == NULL) - exit(1); /* XXX */ + exit(EXIT_FAILURE); /* XXX */ } void StreamMsgQueuesDeinit(char quiet) { diff --git a/src/stream.h b/src/stream.h index 78271c805a..ceea6a4f63 100644 --- a/src/stream.h +++ b/src/stream.h @@ -13,6 +13,8 @@ #define MSG_DATA_SIZE 512 +#define STREAMQUEUE_FLAG_INIT 0x01 + typedef struct StreamMsg_ { uint32_t id; /* unique stream id */ uint8_t flags; /* msg flags */ @@ -42,6 +44,7 @@ typedef struct StreamMsgQueue_ { uint16_t len; SCMutex mutex_q; SCCondT cond_q; + uint8_t flags; #ifdef DBG_PERF uint16_t dbg_maxlen; #endif /* DBG_PERF */