From 261881fce23ab5a4f19a60ad1e15339793f755bb Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Tue, 17 Dec 2013 16:57:48 +0100 Subject: [PATCH] stream: remove per thread queue for stream msgs StreamMsgs would be stored in a per thread queue before being attached to the tcp ssn. This is unnecessary, so this patch removes this queue and puts the smsgs into the ssn directly. Large patch as it affects a lot of tests. --- src/app-layer.c | 73 +------ src/app-layer.h | 12 -- src/flow-timeout.c | 10 - src/stream-tcp-reassemble.c | 405 ++++++++++++++---------------------- src/stream-tcp-reassemble.h | 2 - src/stream-tcp.c | 14 -- 6 files changed, 159 insertions(+), 357 deletions(-) diff --git a/src/app-layer.c b/src/app-layer.c index e88f966581..6f2b85cd9a 100644 --- a/src/app-layer.c +++ b/src/app-layer.c @@ -352,80 +352,17 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, } /** - * \brief Attach a stream message to the TCP session for inspection - * in the detection engine. + * \brief Handle a app layer UDP message + * + * If the protocol is yet unknown, the proto detection code is run first. * * \param dp_ctx Thread app layer detect context - * \param smsg Stream message + * \param f unlocked flow + * \param p UDP packet * * \retval 0 ok * \retval -1 error */ -int AppLayerHandleTCPMsg(StreamMsg *smsg, TcpSession *ssn) -{ - SCEnter(); - - StreamMsg *cur; - -#ifdef PRINT - printf("=> Stream Data (raw reassembly) -- start %s%s\n", - smsg->flags & STREAM_TOCLIENT ? "toclient" : "", - smsg->flags & STREAM_TOSERVER ? "toserver" : ""); - PrintRawDataFp(stdout, smsg->data, smsg->data_len); - printf("=> Stream Data -- end\n"); -#endif - SCLogDebug("smsg %p", smsg); - - if (ssn != NULL) { - SCLogDebug("storing smsg %p in the tcp session", smsg); - - /* store the smsg in the tcp stream */ - if (smsg->flags & STREAM_TOSERVER) { - SCLogDebug("storing smsg in the to_server"); - - /* put the smsg in the stream list */ - if (ssn->toserver_smsg_head == NULL) { - ssn->toserver_smsg_head = smsg; - ssn->toserver_smsg_tail = smsg; - smsg->next = NULL; - smsg->prev = NULL; - } else { - cur = ssn->toserver_smsg_tail; - cur->next = smsg; - smsg->prev = cur; - smsg->next = NULL; - ssn->toserver_smsg_tail = smsg; - } - } else { - SCLogDebug("storing smsg in the to_client"); - - /* put the smsg in the stream list */ - if (ssn->toclient_smsg_head == NULL) { - ssn->toclient_smsg_head = smsg; - ssn->toclient_smsg_tail = smsg; - smsg->next = NULL; - smsg->prev = NULL; - } else { - cur = ssn->toclient_smsg_tail; - cur->next = smsg; - smsg->prev = cur; - smsg->next = NULL; - ssn->toclient_smsg_tail = smsg; - } - } - - } else { /* no ssn ptr */ - /* if there is no ssn ptr we won't - * be inspecting this msg in detect - * so return it to the pool. */ - - /* return the used message to the queue */ - StreamMsgReturnToPool(smsg); - } - - SCReturnInt(0); -} - int AppLayerHandleUdp(AppLayerThreadCtx *tctx, Packet *p, Flow *f) { SCEnter(); diff --git a/src/app-layer.h b/src/app-layer.h index a5e49d94c1..eae16bfa74 100644 --- a/src/app-layer.h +++ b/src/app-layer.h @@ -45,18 +45,6 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, uint8_t *data, uint32_t data_len, uint8_t flags); -/** - * \brief Attach a stream message to the TCP session for inspection - * in the detection engine. - * - * \param app_layer_tctx Pointer to the app layer thread context. - * \param smsg Stream message. - * - * \retval 0 On success. - * \retval -1 On failure. - */ -int AppLayerHandleTCPMsg(StreamMsg *smsg, TcpSession *ssn); - /** * \brief Handles an udp chunk. */ diff --git a/src/flow-timeout.c b/src/flow-timeout.c index 56c3832cdb..78f173b452 100644 --- a/src/flow-timeout.c +++ b/src/flow-timeout.c @@ -563,11 +563,6 @@ static inline void FlowForceReassemblyForHash(void) stt->ra_ctx, ssn, &ssn->server, reassemble_p, NULL); FlowDeReference(&reassemble_p->flow); - if (StreamTcpReassembleProcessAppLayer(stt->ra_ctx, ssn) < 0) { - SCLogDebug("shutdown flow timeout " - "StreamTcpReassembleProcessAppLayer() erroring " - "over something"); - } } /* oh oh! We have some unattended toclient segments */ if (server_ok == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_REASSEMBLY) { @@ -581,11 +576,6 @@ static inline void FlowForceReassemblyForHash(void) stt->ra_ctx, ssn, &ssn->client, reassemble_p, NULL); FlowDeReference(&reassemble_p->flow); - if (StreamTcpReassembleProcessAppLayer(stt->ra_ctx, ssn) < 0) { - SCLogDebug("shutdown flow timeout " - "StreamTcpReassembleProcessAppLayer() erroring " - "over something"); - } } FLOWLOCK_UNLOCK(f); diff --git a/src/stream-tcp-reassemble.c b/src/stream-tcp-reassemble.c index 28e0b31fb5..d5a9dbcc2e 100644 --- a/src/stream-tcp-reassemble.c +++ b/src/stream-tcp-reassemble.c @@ -363,7 +363,6 @@ TcpReassemblyThreadCtx *StreamTcpReassembleInitThreadCtx(void) return NULL; memset(ra_ctx, 0x00, sizeof(TcpReassemblyThreadCtx)); - ra_ctx->stream_q = StreamMsgQueueGetNew(); ra_ctx->app_tctx = AppLayerGetCtxThread(); @@ -373,16 +372,6 @@ TcpReassemblyThreadCtx *StreamTcpReassembleInitThreadCtx(void) void StreamTcpReassembleFreeThreadCtx(TcpReassemblyThreadCtx *ra_ctx) { SCEnter(); - if (ra_ctx->stream_q != NULL) { - StreamMsg *smsg; - while ((smsg = StreamMsgGetFromQueue(ra_ctx->stream_q)) != NULL) { - StreamMsgReturnToPool(smsg); - } - - StreamMsgQueueFree(ra_ctx->stream_q); - } - - ra_ctx->stream_q = NULL; AppLayerDestroyCtxThread(ra_ctx->app_tctx); SCFree(ra_ctx); SCReturn; @@ -1612,6 +1601,55 @@ static uint32_t StreamTcpReassembleCheckDepth(TcpStream *stream, SCReturnUInt(0); } +static void StreamTcpStoreStreamChunk(TcpSession *ssn, StreamMsg *smsg, const Packet *p, int streaminline) { + uint8_t direction = 0; + + if ((!streaminline && (p->flowflags & FLOW_PKT_TOSERVER)) || + ( streaminline && (p->flowflags & FLOW_PKT_TOCLIENT))) + { + direction = STREAM_TOCLIENT; + SCLogDebug("stream chunk is to_client"); + } else { + direction = STREAM_TOSERVER; + SCLogDebug("stream chunk is to_server"); + } + + /* store the smsg in the tcp stream */ + if (direction == STREAM_TOSERVER) { + SCLogDebug("storing smsg in the to_server"); + + /* put the smsg in the stream list */ + if (ssn->toserver_smsg_head == NULL) { + ssn->toserver_smsg_head = smsg; + ssn->toserver_smsg_tail = smsg; + smsg->next = NULL; + smsg->prev = NULL; + } else { + StreamMsg *cur = ssn->toserver_smsg_tail; + cur->next = smsg; + smsg->prev = cur; + smsg->next = NULL; + ssn->toserver_smsg_tail = smsg; + } + } else { + SCLogDebug("storing smsg in the to_client"); + + /* put the smsg in the stream list */ + if (ssn->toclient_smsg_head == NULL) { + ssn->toclient_smsg_head = smsg; + ssn->toclient_smsg_tail = smsg; + smsg->next = NULL; + smsg->prev = NULL; + } else { + StreamMsg *cur = ssn->toclient_smsg_tail; + cur->next = smsg; + smsg->prev = cur; + smsg->next = NULL; + ssn->toclient_smsg_tail = smsg; + } + } +} + /** * \brief Insert a packets TCP data into the stream reassembly engine. * @@ -1739,17 +1777,6 @@ static void StreamTcpSetupMsg(TcpSession *ssn, TcpStream *stream, Packet *p, SCLogDebug("setting STREAM_EOF"); smsg->flags |= STREAM_EOF; } - - if ((!StreamTcpInlineMode() && (p->flowflags & FLOW_PKT_TOSERVER)) || - ( StreamTcpInlineMode() && (p->flowflags & FLOW_PKT_TOCLIENT))) - { - smsg->flags |= STREAM_TOCLIENT; - SCLogDebug("stream mesage is to_client"); - } else { - smsg->flags |= STREAM_TOSERVER; - SCLogDebug("stream mesage is to_server"); - } - smsg->data_len = 0; SCLogDebug("smsg %p", smsg); @@ -2292,7 +2319,7 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx, if (SEQ_GT(seg->seq, next_seq)) { /* pass on pre existing smsg (if any) */ if (smsg != NULL && smsg->data_len > 0) { - StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + StreamTcpStoreStreamChunk(ssn, smsg, p, 1); stream->ra_raw_base_seq = ra_base_seq; smsg = NULL; } @@ -2376,7 +2403,7 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx, /* queue the smsg if it's full */ if (smsg->data_len == sizeof (smsg->data)) { - StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + StreamTcpStoreStreamChunk(ssn, smsg, p, 1); stream->ra_raw_base_seq = ra_base_seq; smsg = NULL; } @@ -2437,7 +2464,7 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx, "smsg_offset %" PRIu32 ", copy_size %" PRIu32 "", payload_offset, smsg_offset, copy_size); if (smsg->data_len == sizeof (smsg->data)) { - StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + StreamTcpStoreStreamChunk(ssn, smsg, p, 1); stream->ra_raw_base_seq = ra_base_seq; smsg = NULL; } @@ -2476,7 +2503,7 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx, /* put the partly filled smsg in the queue */ if (smsg != NULL) { - StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + StreamTcpStoreStreamChunk(ssn, smsg, p, 1); smsg = NULL; stream->ra_raw_base_seq = ra_base_seq; } @@ -3051,7 +3078,7 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx, SCReturnInt(-1); } StreamTcpSetupMsg(ssn, stream, p, smsg); - StreamMsgPutInQueue(ra_ctx->stream_q,smsg); + StreamTcpStoreStreamChunk(ssn, smsg, p, 0); } else { SCLogDebug("no segments in the list to reassemble"); @@ -3150,7 +3177,7 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx, or not. If not then sent the message and set flag that first message has been sent. No more data till proto has not been detected */ - StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + StreamTcpStoreStreamChunk(ssn, smsg, p, 0); stream->ra_raw_base_seq = ra_base_seq; smsg = NULL; } @@ -3258,7 +3285,7 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx, /* queue the smsg if it's full */ if (smsg->data_len == sizeof (smsg->data)) { - StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + StreamTcpStoreStreamChunk(ssn, smsg, p, 0); stream->ra_raw_base_seq = ra_base_seq; smsg = NULL; } @@ -3318,7 +3345,7 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx, "smsg_offset %" PRIu32 ", copy_size %" PRIu32 "", payload_offset, smsg_offset, copy_size); if (smsg->data_len == sizeof (smsg->data)) { - StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + StreamTcpStoreStreamChunk(ssn, smsg, p, 0); stream->ra_raw_base_seq = ra_base_seq; smsg = NULL; } @@ -3348,7 +3375,7 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx, /* put the partly filled smsg in the queue to the l7 handler */ if (smsg != NULL) { - StreamMsgPutInQueue(ra_ctx->stream_q, smsg); + StreamTcpStoreStreamChunk(ssn, smsg, p, 0); smsg = NULL; stream->ra_raw_base_seq = ra_base_seq; } @@ -3379,50 +3406,6 @@ int StreamTcpReassembleHandleSegmentUpdateACK (ThreadVars *tv, SCReturnInt(r); } -/** \brief Handle the queue'd smsgs containing reassembled app layer data when - * we're running the app layer handling as part of the stream threads. - * - * \param ra_ctx Reassembly thread ctx, contains the queue with stream msgs - * - * \todo Currently we process all msgs even if we encounter an error in one - * of them. We do this to make sure the thread ctx's queue is emptied. - * Maybe we should just clear & return the msgs in case of error. - * - * \retval 0 ok - * \retval -1 error - */ -int StreamTcpReassembleProcessAppLayer(TcpReassemblyThreadCtx *ra_ctx, TcpSession *ssn) -{ - SCEnter(); - - int r = 0; - if (ra_ctx != NULL && ra_ctx->stream_q && ra_ctx->stream_q->len > 0) { - StreamMsg *smsg = NULL; - do { - smsg = StreamMsgGetFromQueue(ra_ctx->stream_q); - if (smsg != NULL) { - SCLogDebug("smsg %p, next %p, prev %p, q->len %u, " - "smsg->datalen %u, direction %s%s", - smsg, smsg->next, smsg->prev, - ra_ctx->stream_q->len, smsg->data_len, - smsg->flags & STREAM_TOSERVER ? "toserver":"", - smsg->flags & STREAM_TOCLIENT ? "toclient":""); - - //PrintRawDataFp(stderr, smsg->data, smsg->data_len); - - /* Handle the stream msg. No need to use locking, flow is - * already locked at this point. Don't break out of the - * loop if we encounter an error. */ - if (AppLayerHandleTCPMsg(smsg, ssn) != 0) - r = -1; - } - - } while (ra_ctx->stream_q->len > 0); - } - - SCReturnInt(r); -} - int StreamTcpReassembleHandleSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, TcpSession *ssn, TcpStream *stream, Packet *p, PacketQueue *pq) @@ -3694,6 +3677,18 @@ void StreamTcpReassembleTriggerRawReassembly(TcpSession *ssn) { #ifdef UNITTESTS /** unit tests and it's support functions below */ +static uint32_t UtSsnSmsgCnt(TcpSession *ssn, uint8_t direction) { + uint32_t cnt = 0; + StreamMsg *smsg = (direction == STREAM_TOSERVER) ? + ssn->toserver_smsg_head : + ssn->toclient_smsg_head; + while (smsg) { + cnt++; + smsg = smsg->next; + } + return cnt; +} + /** \brief The Function tests the reassembly engine working for different * OSes supported. It includes all the OS cases and send * crafted packets to test the reassembly. @@ -3987,7 +3982,7 @@ int StreamTcpCheckStreamContents(uint8_t *stream_policy, uint16_t sp_size, TcpSt * * \retval On success the function returns 1, on failure 0. */ -static int StreamTcpCheckQueue (uint8_t *stream_contents, StreamMsgQueue *q) { +static int StreamTcpCheckChunks (TcpSession *ssn, uint8_t *stream_contents) { SCEnter(); StreamMsg *msg; @@ -3995,17 +3990,17 @@ static int StreamTcpCheckQueue (uint8_t *stream_contents, StreamMsgQueue *q) { uint8_t j; uint8_t cnt = 0; - if (q == NULL) { - printf("q == NULL, "); + if (ssn == NULL) { + printf("ssn == NULL, "); SCReturnInt(0); } - if (q->len == 0) { - printf("q->len == 0, "); + if (ssn->toserver_smsg_head == NULL) { + printf("ssn->toserver_smsg_head == NULL, "); SCReturnInt(0); } - msg = StreamMsgGetFromQueue(q); + msg = ssn->toserver_smsg_head; while(msg != NULL) { cnt++; j = 0; @@ -4019,11 +4014,7 @@ static int StreamTcpCheckQueue (uint8_t *stream_contents, StreamMsgQueue *q) { SCReturnInt(0); } } - if (q->len > 0) { - msg = StreamMsgGetFromQueue(q); - } else { - SCReturnInt(1); - } + msg = msg->next; } SCReturnInt(1); } @@ -5245,7 +5236,6 @@ static int StreamTcpReassembleTest28 (void) { TcpSession ssn; memset(&ssn, 0, sizeof (TcpSession)); TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); - StreamMsgQueue *q = ra_ctx->stream_q; StreamTcpInitConfig(TRUE); StreamMsgQueueSetMinChunkLen(FLOW_PKT_TOSERVER, 4096); @@ -5276,12 +5266,6 @@ static int StreamTcpReassembleTest28 (void) { goto end; } - /* Process stream smsgs we may have in queue */ - if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) { - printf("failed in processing stream smsgs (3): "); - goto end; - } - flowflags = FLOW_PKT_TOSERVER; StreamTcpCreateTestPacket(payload, 0x42, 3, 4); /*BBB*/ seq = 12; @@ -5300,7 +5284,7 @@ static int StreamTcpReassembleTest28 (void) { goto end; } - if (StreamTcpCheckQueue(check_contents, q) == 0) { + if (StreamTcpCheckChunks(&ssn, check_contents) == 0) { printf("failed in stream matching (6): "); goto end; } @@ -5330,7 +5314,6 @@ static int StreamTcpReassembleTest29 (void) { uint8_t flowflags; uint8_t check_contents[5] = {0x41, 0x41, 0x42, 0x42, 0x42}; TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); - StreamMsgQueue *q = ra_ctx->stream_q; TcpSession ssn; memset(&ssn, 0, sizeof (TcpSession)); @@ -5360,12 +5343,6 @@ static int StreamTcpReassembleTest29 (void) { goto end; } - /* Process stream smsgs we may have in queue */ - if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) { - printf("failed in processing stream smsgs\n"); - goto end; - } - flowflags = FLOW_PKT_TOSERVER; StreamTcpCreateTestPacket(payload, 0x42, 3, 4); /*BBB*/ seq = 15; @@ -5384,7 +5361,7 @@ static int StreamTcpReassembleTest29 (void) { goto end; } - if (StreamTcpCheckQueue(check_contents, q) == 0) { + if (StreamTcpCheckChunks(&ssn, check_contents) == 0) { printf("failed in stream matching: "); goto end; } @@ -5417,7 +5394,6 @@ static int StreamTcpReassembleTest30 (void) { memset(&ssn, 0, sizeof (TcpSession)); TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); - StreamMsgQueue *q = ra_ctx->stream_q; flowflags = FLOW_PKT_TOSERVER; th_flag = TH_ACK|TH_PUSH; @@ -5445,12 +5421,6 @@ static int StreamTcpReassembleTest30 (void) { goto end; } - /* Process stream smsgs we may have in queue */ - if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) { - printf("failed in processing stream smsgs\n"); - goto end; - } - flowflags = FLOW_PKT_TOSERVER; StreamTcpCreateTestPacket(payload, 0x42, 3, 4); /*BBB*/ seq = 12; @@ -5469,12 +5439,6 @@ static int StreamTcpReassembleTest30 (void) { goto end; } - /* Process stream smsgs we may have in queue */ - if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) { - printf("failed in processing stream smsgs\n"); - goto end; - } - th_flag = TH_FIN|TH_ACK; seq = 18; ack = 20; @@ -5494,7 +5458,7 @@ static int StreamTcpReassembleTest30 (void) { goto end; } - if (StreamTcpCheckQueue(check_contents, q) == 0) { + if (StreamTcpCheckChunks(&ssn, check_contents) == 0) { printf("failed in stream matching: "); goto end; } @@ -6141,7 +6105,7 @@ static int StreamTcpReassembleTest38 (void) { } /* Check if we have stream smsgs in queue */ - if (ra_ctx->stream_q->len > 0) { + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) > 0) { printf("there shouldn't be any stream smsgs in the queue (2): "); goto end; } @@ -6159,7 +6123,7 @@ static int StreamTcpReassembleTest38 (void) { } /* Check if we have stream smsgs in queue */ - if (ra_ctx->stream_q->len != 1) { + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) { printf("there should one stream smsg in the queue (6): "); goto end; } @@ -6229,7 +6193,8 @@ static int StreamTcpReassembleTest39 (void) { FLOW_IS_PM_DONE(&f, STREAM_TOCLIENT) || FLOW_IS_PP_DONE(&f, STREAM_TOCLIENT) || ssn->client.seg_list != NULL || ssn->server.seg_list != NULL || - stt->ra_ctx->stream_q->len != 0 || + ssn->toserver_smsg_head != NULL || + ssn->toclient_smsg_head != NULL || ssn->data_first_seen_dir != 0) { printf("failure 1\n"); goto end; @@ -6255,7 +6220,8 @@ static int StreamTcpReassembleTest39 (void) { FLOW_IS_PM_DONE(&f, STREAM_TOCLIENT) || FLOW_IS_PP_DONE(&f, STREAM_TOCLIENT) || ssn->client.seg_list != NULL || ssn->server.seg_list != NULL || - stt->ra_ctx->stream_q->len != 0 || + ssn->toserver_smsg_head != NULL || + ssn->toclient_smsg_head != NULL || ssn->data_first_seen_dir != 0) { printf("failure 2\n"); goto end; @@ -6282,7 +6248,8 @@ static int StreamTcpReassembleTest39 (void) { FLOW_IS_PM_DONE(&f, STREAM_TOCLIENT) || FLOW_IS_PP_DONE(&f, STREAM_TOCLIENT) || ssn->client.seg_list != NULL || ssn->server.seg_list != NULL || - stt->ra_ctx->stream_q->len != 0 || + ssn->toserver_smsg_head != NULL || + ssn->toclient_smsg_head != NULL || ssn->data_first_seen_dir != 0) { printf("failure 3\n"); goto end; @@ -6311,7 +6278,8 @@ static int StreamTcpReassembleTest39 (void) { ssn->client.seg_list == NULL || ssn->client.seg_list->next != NULL || ssn->server.seg_list != NULL || - stt->ra_ctx->stream_q->len != 0 || + ssn->toserver_smsg_head != NULL || + ssn->toclient_smsg_head != NULL || ssn->data_first_seen_dir != STREAM_TOSERVER) { printf("failure 4\n"); goto end; @@ -6340,7 +6308,8 @@ static int StreamTcpReassembleTest39 (void) { ssn->client.seg_list == NULL || ssn->client.seg_list->next != NULL || ssn->server.seg_list != NULL || - stt->ra_ctx->stream_q->len != 0 || + ssn->toserver_smsg_head != NULL || + ssn->toclient_smsg_head != NULL || ssn->data_first_seen_dir != STREAM_TOSERVER) { printf("failure 5\n"); goto end; @@ -6381,7 +6350,8 @@ static int StreamTcpReassembleTest39 (void) { ssn->client.seg_list->next == NULL || ssn->client.seg_list->next->next != NULL || ssn->server.seg_list != NULL || - stt->ra_ctx->stream_q->len != 0 || + ssn->toserver_smsg_head != NULL || + ssn->toclient_smsg_head != NULL || ssn->data_first_seen_dir != STREAM_TOSERVER) { printf("failure 6\n"); goto end; @@ -6843,7 +6813,7 @@ static int StreamTcpReassembleTest40 (void) { } /* Check if we have stream smsgs in queue */ - if (ra_ctx->stream_q->len > 0) { + if (UtSsnSmsgCnt(&ssn, STREAM_TOCLIENT) > 0) { printf("there shouldn't be any stream smsgs in the queue, as we didn't" " processed any smsg from toserver side till yet (2): "); goto end; @@ -6862,12 +6832,6 @@ static int StreamTcpReassembleTest40 (void) { goto end; } - /* Process stream smsgs we may have in queue */ - if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) { - printf("failed in processing stream smsgs (4): "); - goto end; - } - p->flowflags = FLOW_PKT_TOSERVER; p->payload = httpbuf3; p->payload_len = httplen3; @@ -6902,19 +6866,6 @@ static int StreamTcpReassembleTest40 (void) { goto end; } - /* Check if we have stream smsgs in queue */ -#if 0 - if (ra_ctx->stream_q->len == 0) { - printf("there should be a stream smsgs in the queue, as we have detected" - " the app layer protocol and one smsg from toserver side has " - "been sent (8): "); - goto end; - /* Process stream smsgs we may have in queue */ - } else if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) { - printf("failed in processing stream smsgs (9): "); - goto end; - } -#endif p->flowflags = FLOW_PKT_TOSERVER; p->payload = httpbuf4; p->payload_len = httplen4; @@ -6941,19 +6892,6 @@ static int StreamTcpReassembleTest40 (void) { goto end; } - /* Check if we have stream smsgs in queue */ -#if 0 - if (ra_ctx->stream_q->len == 0) { - printf("there should be a stream smsgs in the queue, as we have detected" - " the app layer protocol and one smsg from toserver side has " - "been sent (12): "); - goto end; - /* Process stream smsgs we may have in queue */ - } else if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) { - printf("failed in processing stream smsgs (13): "); - goto end; - } -#endif p->flowflags = FLOW_PKT_TOSERVER; p->payload = httpbuf5; p->payload_len = httplen5; @@ -6981,17 +6919,11 @@ static int StreamTcpReassembleTest40 (void) { } /* Check if we have stream smsgs in queue */ - if (ra_ctx->stream_q->len == 0) { + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) == 0) { printf("there should be a stream smsgs in the queue, as we have detected" " the app layer protocol and one smsg from toserver side has " "been sent (16): "); goto end; - /* Process stream smsgs we may have in queue */ - } - - if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) { - printf("failed in processing stream smsgs (17): "); - goto end; } if (f->alproto != ALPROTO_HTTP) { @@ -7085,7 +7017,7 @@ static int StreamTcpReassembleTest43 (void) { } /* Check if we have stream smsgs in queue */ - if (ra_ctx->stream_q->len > 0) { + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) > 0) { printf("there shouldn't be any stream smsgs in the queue (2): "); goto end; } @@ -7103,7 +7035,7 @@ static int StreamTcpReassembleTest43 (void) { } /* Check if we have stream smsgs in queue */ - if (ra_ctx->stream_q->len > 0) { + if (UtSsnSmsgCnt(&ssn, STREAM_TOCLIENT) > 0) { printf("there shouldn't be any stream smsgs in the queue, as we didn't" " processed any smsg from toserver side till yet (4): "); goto end; @@ -7120,17 +7052,6 @@ static int StreamTcpReassembleTest43 (void) { printf("failed in segments reassembly, while processing toserver packet (5): "); goto end; } -#if 0 - /* Check if we have stream smsgs in queue */ - if (ra_ctx->stream_q->len == 0) { - printf("there should be a stream smsgs in the queue (6): "); - goto end; - /* Process stream smsgs we may have in queue */ - } else if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) { - printf("failed in processing stream smsgs (7): "); - goto end; - } -#endif if (!StreamTcpIsSetStreamFlagAppProtoDetectionCompleted(&ssn.client)) { printf("app layer detected flag isn't set, it should be (8): "); goto end; @@ -7156,7 +7077,7 @@ static int StreamTcpReassembleTest43 (void) { } /* Check if we have stream smsgs in queue */ - if (ra_ctx->stream_q->len > 0) { + if (UtSsnSmsgCnt(&ssn, STREAM_TOCLIENT) > 0) { printf("there shouldn't be any stream smsgs in the queue, as we didn't" " detected the app layer protocol till yet (10): "); goto end; @@ -7173,18 +7094,6 @@ static int StreamTcpReassembleTest43 (void) { printf("failed in segments reassembly, while processing toserver packet (11): "); goto end; } -#if 0 - /* Check if we have stream smsgs in queue */ - if (ra_ctx->stream_q->len == 0) { - printf("there should be a stream smsgs in the queue, as reassembling has" - " been unpaused now (12): "); - goto end; - /* Process stream smsgs we may have in queue */ - } else if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) { - printf("failed in processing stream smsgs (13): "); - goto end; - } -#endif /* the flag should be set, as the smsg scanned size has crossed the max. signature size for app proto detection */ if (!StreamTcpIsSetStreamFlagAppProtoDetectionCompleted(&ssn.client)) { @@ -7566,12 +7475,6 @@ static int StreamTcpReassembleTest47 (void) { "packet\n"); goto end; } - - /* Process stream smsgs we may have in queue */ - if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) { - printf("failed in processing stream smsgs\n"); - goto end; - } } if (f->alproto != ALPROTO_HTTP) { @@ -7635,12 +7538,12 @@ static int StreamTcpReassembleInlineTest01(void) { goto end; } - if (ra_ctx->stream_q->len != 1) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) { + printf("expected a single stream message: "); goto end; } - StreamMsg *smsg = ra_ctx->stream_q->top; + StreamMsg *smsg = ssn.toserver_smsg_head; if (smsg->data_len != 15) { printf("expected data length to be 15, got %u: ", smsg->data_len); goto end; @@ -7713,12 +7616,12 @@ static int StreamTcpReassembleInlineTest02(void) { goto end; } - if (ra_ctx->stream_q->len != 1) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) { + printf("expected a single stream message: "); goto end; } - StreamMsg *smsg = ra_ctx->stream_q->top; + StreamMsg *smsg = ssn.toserver_smsg_head; if (smsg->data_len != 15) { printf("expected data length to be 15, got %u: ", smsg->data_len); goto end; @@ -7744,12 +7647,12 @@ static int StreamTcpReassembleInlineTest02(void) { goto end; } - if (ra_ctx->stream_q->len != 2) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) { + printf("expected a single stream message: "); goto end; } - smsg = ra_ctx->stream_q->top; + smsg = ssn.toserver_smsg_head->next; if (smsg->data_len != 20) { printf("expected data length to be 20, got %u: ", smsg->data_len); goto end; @@ -7826,12 +7729,12 @@ static int StreamTcpReassembleInlineTest03(void) { goto end; } - if (ra_ctx->stream_q->len != 1) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) { + printf("expected a single stream message 1: "); goto end; } - StreamMsg *smsg = ra_ctx->stream_q->top; + StreamMsg *smsg = ssn.toserver_smsg_head; if (smsg->data_len != 15) { printf("expected data length to be 15, got %u: ", smsg->data_len); goto end; @@ -7859,12 +7762,12 @@ static int StreamTcpReassembleInlineTest03(void) { goto end; } - if (ra_ctx->stream_q->len != 2) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) { + printf("expected two stream messages: "); goto end; } - smsg = ra_ctx->stream_q->top; + smsg = ssn.toserver_smsg_head->next; if (smsg->data_len != 15) { printf("expected data length to be 15, got %u: ", smsg->data_len); goto end; @@ -7941,12 +7844,12 @@ static int StreamTcpReassembleInlineTest04(void) { goto end; } - if (ra_ctx->stream_q->len != 1) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) { + printf("expected a single stream message: "); goto end; } - StreamMsg *smsg = ra_ctx->stream_q->top; + StreamMsg *smsg = ssn.toserver_smsg_head; if (smsg->data_len != 15) { printf("expected data length to be 15, got %u: ", smsg->data_len); goto end; @@ -7974,12 +7877,12 @@ static int StreamTcpReassembleInlineTest04(void) { goto end; } - if (ra_ctx->stream_q->len != 2) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) { + printf("expected a single stream message: "); goto end; } - smsg = ra_ctx->stream_q->top; + smsg = ssn.toserver_smsg_head->next; if (smsg->data_len != 16) { printf("expected data length to be 16, got %u: ", smsg->data_len); goto end; @@ -8053,12 +7956,12 @@ static int StreamTcpReassembleInlineTest05(void) { goto end; } - if (ra_ctx->stream_q->len != 2) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) { + printf("expected a single stream message: "); goto end; } - StreamMsg *smsg = ra_ctx->stream_q->top->next; + StreamMsg *smsg = ssn.toserver_smsg_head; if (smsg->data_len != 10) { printf("expected data length to be 10, got %u: ", smsg->data_len); goto end; @@ -8072,7 +7975,7 @@ static int StreamTcpReassembleInlineTest05(void) { goto end; } - smsg = ra_ctx->stream_q->top; + smsg = ssn.toserver_smsg_head->next; if (smsg->data_len != 5) { printf("expected data length to be 5, got %u: ", smsg->data_len); goto end; @@ -8147,12 +8050,12 @@ static int StreamTcpReassembleInlineTest06(void) { goto end; } - if (ra_ctx->stream_q->len != 2) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) { + printf("expected two stream messages: "); goto end; } - StreamMsg *smsg = ra_ctx->stream_q->top->next; + StreamMsg *smsg = ssn.toserver_smsg_head; if (smsg->data_len != 10) { printf("expected data length to be 10, got %u: ", smsg->data_len); goto end; @@ -8166,7 +8069,7 @@ static int StreamTcpReassembleInlineTest06(void) { goto end; } - smsg = ra_ctx->stream_q->top; + smsg = ssn.toserver_smsg_head->next; if (smsg->data_len != 5) { printf("expected data length to be 5, got %u: ", smsg->data_len); goto end; @@ -8194,12 +8097,12 @@ static int StreamTcpReassembleInlineTest06(void) { goto end; } - if (ra_ctx->stream_q->len != 3) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 3) { + printf("expected a single stream message, got %u: ", UtSsnSmsgCnt(&ssn, STREAM_TOSERVER)); goto end; } - smsg = ra_ctx->stream_q->top; + smsg = ssn.toserver_smsg_head->next->next; if (smsg->data_len != 20) { printf("expected data length to be 20, got %u: ", smsg->data_len); goto end; @@ -8278,12 +8181,12 @@ static int StreamTcpReassembleInlineTest07(void) { goto end; } - if (ra_ctx->stream_q->len != 2) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) { + printf("expected a single stream message, got %u: ", UtSsnSmsgCnt(&ssn, STREAM_TOSERVER)); goto end; } - StreamMsg *smsg = ra_ctx->stream_q->top->next; + StreamMsg *smsg = ssn.toserver_smsg_head; if (smsg->data_len != 6) { printf("expected data length to be 6, got %u: ", smsg->data_len); goto end; @@ -8297,7 +8200,7 @@ static int StreamTcpReassembleInlineTest07(void) { goto end; } - smsg = ra_ctx->stream_q->top; + smsg = ssn.toserver_smsg_head->next; if (smsg->data_len != 5) { printf("expected data length to be 5, got %u: ", smsg->data_len); goto end; @@ -8325,12 +8228,12 @@ static int StreamTcpReassembleInlineTest07(void) { goto end; } - if (ra_ctx->stream_q->len != 3) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 3) { + printf("expected a single stream message, got %u: ", UtSsnSmsgCnt(&ssn, STREAM_TOSERVER)); goto end; } - smsg = ra_ctx->stream_q->top; + smsg = ssn.toserver_smsg_head->next->next; if (smsg->data_len != 16) { printf("expected data length to be 16, got %u: ", smsg->data_len); goto end; @@ -8409,12 +8312,12 @@ static int StreamTcpReassembleInlineTest08(void) { goto end; } - if (ra_ctx->stream_q->len != 1) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) { + printf("expected a single stream message: "); goto end; } - StreamMsg *smsg = ra_ctx->stream_q->top; + StreamMsg *smsg = ssn.toserver_smsg_head; if (smsg->data_len != 15) { printf("expected data length to be 15, got %u: ", smsg->data_len); goto end; @@ -8447,12 +8350,12 @@ static int StreamTcpReassembleInlineTest08(void) { goto end; } - if (ra_ctx->stream_q->len != 2) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) { + printf("expected a single stream message, got %u: ", UtSsnSmsgCnt(&ssn, STREAM_TOSERVER)); goto end; } - smsg = ra_ctx->stream_q->top; + smsg = ssn.toserver_smsg_head->next; if (smsg->data_len != 15) { printf("expected data length to be 15, got %u: ", smsg->data_len); goto end; @@ -8542,12 +8445,12 @@ static int StreamTcpReassembleInlineTest09(void) { goto end; } - if (ra_ctx->stream_q->len != 2) { - printf("expected 2 stream message2, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) { + printf("expected 2 stream message2, got %u: ", UtSsnSmsgCnt(&ssn, STREAM_TOSERVER)); goto end; } - StreamMsg *smsg = ra_ctx->stream_q->bot; + StreamMsg *smsg = ssn.toserver_smsg_head; if (smsg->data_len != 10) { printf("expected data length to be 10, got %u (bot): ", smsg->data_len); goto end; @@ -8561,7 +8464,7 @@ static int StreamTcpReassembleInlineTest09(void) { goto end; } - smsg = ra_ctx->stream_q->top; + smsg = ssn.toserver_smsg_head->next; if (smsg->data_len != 5) { printf("expected data length to be 5, got %u (top): ", smsg->data_len); goto end; @@ -8595,12 +8498,12 @@ static int StreamTcpReassembleInlineTest09(void) { goto end; } - if (ra_ctx->stream_q->len != 3) { - printf("expected 3 stream messages, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 3) { + printf("expected 3 stream messages: "); goto end; } - smsg = ra_ctx->stream_q->top; + smsg = ssn.toserver_smsg_head->next->next; if (smsg->data_len != 20) { printf("expected data length to be 20, got %u: ", smsg->data_len); goto end; @@ -8774,12 +8677,12 @@ static int StreamTcpReassembleInsertTest01(void) { goto end; } - if (ra_ctx->stream_q->len != 1) { - printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len); + if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) { + printf("expected a single stream message: "); goto end; } - StreamMsg *smsg = ra_ctx->stream_q->top; + StreamMsg *smsg = ssn.toserver_smsg_head; if (smsg->data_len != 20) { printf("expected data length to be 20, got %u: ", smsg->data_len); goto end; diff --git a/src/stream-tcp-reassemble.h b/src/stream-tcp-reassemble.h index 17a1f9913f..5b89b1a72d 100644 --- a/src/stream-tcp-reassemble.h +++ b/src/stream-tcp-reassemble.h @@ -52,7 +52,6 @@ enum }; typedef struct TcpReassemblyThreadCtx_ { - StreamMsgQueue *stream_q; void *app_tctx; /** TCP segments which are not being reassembled due to memcap was reached */ uint16_t counter_tcp_segment_memcap; @@ -84,7 +83,6 @@ int StreamTcpReassembleInlineAppLayer(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, TcpSession *ssn, TcpStream *stream, Packet *p); -int StreamTcpReassembleProcessAppLayer(TcpReassemblyThreadCtx *, TcpSession *); void StreamTcpCreateTestPacket(uint8_t *, uint8_t, uint8_t, uint8_t); diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 26eb59cc10..4199af0df0 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -4369,11 +4369,6 @@ int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt, SCLogDebug("processing pseudo packet / stream end done"); } - /* Process stream smsgs we may have in queue */ - if (StreamTcpReassembleProcessAppLayer(stt->ra_ctx, ssn) < 0) { - goto error; - } - /* recalc the csum on the packet if it was modified */ if (p->flags & PKT_STREAM_MODIFIED) { ReCalculateChecksum(p); @@ -5501,10 +5496,8 @@ static int StreamTcpTest02 (void) { uint8_t payload[4]; TCPHdr tcph; TcpReassemblyThreadCtx ra_ctx; - StreamMsgQueue stream_q; PacketQueue pq; memset(&pq,0,sizeof(PacketQueue)); - memset(&stream_q, 0, sizeof(StreamMsgQueue)); memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx)); memset(p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -5517,7 +5510,6 @@ static int StreamTcpTest02 (void) { p->tcph = &tcph; p->flowflags = FLOW_PKT_TOSERVER; int ret = 0; - ra_ctx.stream_q = &stream_q; stt.ra_ctx = &ra_ctx; StreamTcpInitConfig(TRUE); @@ -9312,10 +9304,8 @@ static int StreamTcpTest38 (void) { uint8_t payload[4]; TCPHdr tcph; TcpReassemblyThreadCtx ra_ctx; - StreamMsgQueue stream_q; PacketQueue pq; - memset(&stream_q, 0, sizeof(StreamMsgQueue)); memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); @@ -9333,7 +9323,6 @@ static int StreamTcpTest38 (void) { tcph.th_flags = TH_SYN; p->tcph = &tcph; p->flowflags = FLOW_PKT_TOSERVER; - ra_ctx.stream_q = &stream_q; stt.ra_ctx = &ra_ctx; StreamTcpInitConfig(TRUE); @@ -9430,10 +9419,8 @@ static int StreamTcpTest39 (void) { uint8_t payload[4]; TCPHdr tcph; TcpReassemblyThreadCtx ra_ctx; - StreamMsgQueue stream_q; PacketQueue pq; - memset(&stream_q, 0, sizeof(StreamMsgQueue)); memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); @@ -9452,7 +9439,6 @@ static int StreamTcpTest39 (void) { p->tcph = &tcph; p->flowflags = FLOW_PKT_TOSERVER; int ret = 0; - ra_ctx.stream_q = &stream_q; stt.ra_ctx = &ra_ctx; StreamTcpInitConfig(TRUE);