From 5ecd187b6f06ad2a6307d7911860a785f9c13270 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Mon, 21 Sep 2009 14:40:58 +0200 Subject: [PATCH] Tie app layer parsing to the stream engine. --- src/app-layer-detect-proto.c | 105 ++++++++++++++-------------- src/app-layer-detect-proto.h | 3 + src/app-layer-parser.c | 8 +-- src/eidps.c | 2 +- src/stream-tcp-reassemble.c | 128 ++++++++++++++++++++++------------- src/stream-tcp-reassemble.h | 11 ++- src/stream-tcp.c | 76 ++++++++++++--------- src/stream.c | 16 ++++- src/stream.h | 3 +- 9 files changed, 213 insertions(+), 139 deletions(-) diff --git a/src/app-layer-detect-proto.c b/src/app-layer-detect-proto.c index 11074e3496..c8241c60cd 100644 --- a/src/app-layer-detect-proto.c +++ b/src/app-layer-detect-proto.c @@ -292,11 +292,65 @@ end: return proto; } +int AppLayerHandleMsg(StreamMsg *smsg) { + uint16_t alproto = ALPROTO_UNKNOWN; + + mutex_lock(&smsg->flow->m); + TcpSession *ssn = smsg->flow->protoctx; + if (ssn != NULL) { + alproto = ssn->alproto; + } + mutex_unlock(&smsg->flow->m); + + if (ssn != NULL) { + if (smsg->flags & STREAM_START) { + //printf("L7AppDetectThread: stream initializer (len %" PRIu32 " (%" PRIu32 "))\n", smsg->data.data_len, MSG_DATA_SIZE); + + //printf("=> Init Stream Data -- start\n"); + //PrintRawDataFp(stdout, smsg->init.data, smsg->init.data_len); + //printf("=> Init Stream Data -- end\n"); + + alproto = AppLayerDetectGetProto(&alp_proto_ctx, &alp_proto_tctx, smsg->data.data, smsg->data.data_len, smsg->flags); + if (alproto != ALPROTO_UNKNOWN) { + /* store the proto and setup the L7 data array */ + mutex_lock(&smsg->flow->m); + StreamL7DataPtrInit(ssn,StreamL7GetStorageSize()); + ssn->alproto = alproto; + mutex_unlock(&smsg->flow->m); + + AppLayerParse(smsg->flow, alproto, smsg->flags, smsg->data.data, smsg->data.data_len); + } + } else { + //printf("AppLayerDetectThread: stream data (len %" PRIu32 " (%" PRIu32 ")), alproto %"PRIu16"\n", smsg->data.data_len, MSG_DATA_SIZE, alproto); + + //printf("=> Stream Data -- start\n"); + //PrintRawDataFp(stdout, smsg->data.data, smsg->data.data_len); + //printf("=> Stream Data -- end\n"); + + /* if we don't have a data object here we are not getting it + * a start msg should have gotten us one */ + if (alproto != ALPROTO_UNKNOWN) { + AppLayerParse(smsg->flow, alproto, smsg->flags, smsg->data.data, smsg->data.data_len); + } else { + //printf("AppLayerDetectThread: smsg not start, but no l7 data? Weird\n"); + } + } + } + + mutex_lock(&smsg->flow->m); + smsg->flow->use_cnt--; + mutex_unlock(&smsg->flow->m); + + /* return the used message to the queue */ + StreamMsgReturnToPool(smsg); + + return 0; +} + void *AppLayerDetectProtoThread(void *td) { ThreadVars *tv = (ThreadVars *)td; char run = TRUE; - uint16_t alproto = ALPROTO_UNKNOWN; /* get the stream msg queue for this thread */ StreamMsgQueue *stream_q = StreamMsgQueueGetByPort(0); @@ -313,54 +367,7 @@ void *AppLayerDetectProtoThread(void *td) /* grab a msg, can return NULL on signals */ StreamMsg *smsg = StreamMsgGetFromQueue(stream_q); if (smsg != NULL) { - mutex_lock(&smsg->flow->m); - TcpSession *ssn = smsg->flow->protoctx; - if (ssn != NULL) { - alproto = ssn->alproto; - } - mutex_unlock(&smsg->flow->m); - - if (ssn != NULL) { - if (smsg->flags & STREAM_START) { - //printf("L7AppDetectThread: stream initializer (len %" PRIu32 " (%" PRIu32 "))\n", smsg->data.data_len, MSG_DATA_SIZE); - - //printf("=> Init Stream Data -- start\n"); - //PrintRawDataFp(stdout, smsg->init.data, smsg->init.data_len); - //printf("=> Init Stream Data -- end\n"); - - alproto = AppLayerDetectGetProto(&alp_proto_ctx, &alp_proto_tctx, smsg->data.data, smsg->data.data_len, smsg->flags); - if (alproto != ALPROTO_UNKNOWN) { - /* store the proto and setup the L7 data array */ - mutex_lock(&smsg->flow->m); - StreamL7DataPtrInit(ssn,StreamL7GetStorageSize()); - ssn->alproto = alproto; - mutex_unlock(&smsg->flow->m); - - AppLayerParse(smsg->flow, alproto, smsg->flags, smsg->data.data, smsg->data.data_len); - } - } else { - //printf("AppLayerDetectThread: stream data (len %" PRIu32 " (%" PRIu32 ")), alproto %"PRIu16"\n", smsg->data.data_len, MSG_DATA_SIZE, alproto); - - //printf("=> Stream Data -- start\n"); - //PrintRawDataFp(stdout, smsg->data.data, smsg->data.data_len); - //printf("=> Stream Data -- end\n"); - - /* if we don't have a data object here we are not getting it - * a start msg should have gotten us one */ - if (alproto != ALPROTO_UNKNOWN) { - AppLayerParse(smsg->flow, alproto, smsg->flags, smsg->data.data, smsg->data.data_len); - } else { - //printf("AppLayerDetectThread: smsg not start, but no l7 data? Weird\n"); - } - } - } - - mutex_lock(&smsg->flow->m); - smsg->flow->use_cnt--; - mutex_unlock(&smsg->flow->m); - - /* return the used message to the queue */ - StreamMsgReturnToPool(smsg); + AppLayerHandleMsg(smsg); } if (TmThreadsCheckFlag(tv, THV_KILL)) { diff --git a/src/app-layer-detect-proto.h b/src/app-layer-detect-proto.h index 041e5fba6d..a9a3266ad4 100644 --- a/src/app-layer-detect-proto.h +++ b/src/app-layer-detect-proto.h @@ -1,6 +1,9 @@ #ifndef __APP_LAYER_DETECT_PROTO_H__ #define __APP_LAYER_DETECT_PROTO_H__ +#include "stream.h" + +int AppLayerHandleMsg(StreamMsg *smsg); void *AppLayerDetectProtoThread(void *td); void AppLayerDetectProtoThreadInit(void); diff --git a/src/app-layer-parser.c b/src/app-layer-parser.c index 08a5a8d582..123d918fd6 100644 --- a/src/app-layer-parser.c +++ b/src/app-layer-parser.c @@ -517,9 +517,9 @@ int AppLayerParse(Flow *f, uint8_t proto, uint8_t flags, uint8_t *input, uint32_ if (parser_state_store == NULL) return -1; - mutex_lock(&f->m); + //mutex_lock(&f->m); ssn->aldata[app_layer_sid] = (void *)parser_state_store; - mutex_unlock(&f->m); + //mutex_unlock(&f->m); } AppLayerParserState *parser_state = NULL; @@ -563,9 +563,9 @@ int AppLayerParse(Flow *f, uint8_t proto, uint8_t flags, uint8_t *input, uint32_ if (app_layer_state == NULL) return -1; - mutex_lock(&f->m); + //mutex_lock(&f->m); ssn->aldata[p->storage_id] = app_layer_state; - mutex_unlock(&f->m); + //mutex_unlock(&f->m); } /* invoke the recursive parser */ diff --git a/src/eidps.c b/src/eidps.c index 8fcad3fd98..a90bd1af81 100644 --- a/src/eidps.c +++ b/src/eidps.c @@ -461,7 +461,7 @@ int main(int argc, char **argv) StreamTcpInitConfig(STREAM_VERBOSE); /* Spawn the L7 App Detect thread */ - AppLayerDetectProtoThreadSpawn(); + //AppLayerDetectProtoThreadSpawn(); /* Spawn the perf counter threads. Let these be the last one spawned */ PerfSpawnThreads(); diff --git a/src/stream-tcp-reassemble.c b/src/stream-tcp-reassemble.c index f0049e53f5..c6889d0bad 100644 --- a/src/stream-tcp-reassemble.c +++ b/src/stream-tcp-reassemble.c @@ -31,6 +31,8 @@ #include "stream.h" +#include "app-layer-detect-proto.h" + //#define DEBUG #ifdef DEBUG @@ -154,6 +156,19 @@ void StreamTcpReassembleFree(char quiet) { StreamMsgQueuesDeinit(quiet); } +TcpReassemblyThreadCtx *StreamTcpReassembleInitThreadCtx(void) { + TcpReassemblyThreadCtx *ra_ctx = malloc(sizeof(TcpReassemblyThreadCtx)); + if (ra_ctx == NULL) { + return NULL; + } + + memset(ra_ctx, 0x00, sizeof(TcpReassemblyThreadCtx)); + + ra_ctx->stream_q = StreamMsgQueueGetNew(); + + return ra_ctx; +} + void PrintList2(TcpSegment *seg) { TcpSegment *prev_seg = NULL; @@ -979,7 +994,7 @@ static int StreamTcpReassembleCheckLimit(TcpSession *ssn, TcpStream *stream, Pac return 1; } -int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *stream, Packet *p) { +int StreamTcpReassembleHandleSegmentUpdateACK (TcpReassemblyThreadCtx *ra_ctx, TcpSession *ssn, TcpStream *stream, Packet *p) { if (stream->seg_list == NULL) return 0; @@ -1050,7 +1065,7 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea /* pass on pre existing smsg (if any) */ if (smsg != NULL && smsg->data.data_len > 0) { - StreamMsgPutInQueue(smsg); + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); smsg = NULL; } if (smsg == NULL) { @@ -1071,7 +1086,7 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea smsg->flags |= STREAM_GAP; smsg->gap.gap_size = gap_len; - StreamMsgPutInQueue(smsg); + StreamMsgPutInQueue(ra_ctx->stream_q,smsg); smsg = NULL; smsg_offset = 0; } @@ -1130,7 +1145,7 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea /* queue the smsg if it's full */ if (smsg->data.data_len == sizeof (smsg->data.data)) { - StreamMsgPutInQueue(smsg); + StreamMsgPutInQueue(ra_ctx->stream_q, smsg); smsg = NULL; } @@ -1190,7 +1205,7 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea payload_offset, smsg_offset, copy_size); #endif if (smsg->data.data_len == sizeof (smsg->data.data)) { - StreamMsgPutInQueue(smsg); + StreamMsgPutInQueue(ra_ctx->stream_q,smsg); smsg = NULL; } @@ -1229,18 +1244,18 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea /* put the partly filled smsg in the queue to the l7 handler */ if (smsg != NULL) { - StreamMsgPutInQueue(smsg); + StreamMsgPutInQueue(ra_ctx->stream_q,smsg); smsg = NULL; } return 0; } -int StreamTcpReassembleHandleSegment(TcpSession *ssn, TcpStream *stream, Packet *p) { +int StreamTcpReassembleHandleSegment(TcpReassemblyThreadCtx *ra_ctx, TcpSession *ssn, TcpStream *stream, Packet *p) { //printf("StreamTcpReassembleHandleSegment: ssn %p, stream %p, p %p, p->payload_len %"PRIu16"\n", ssn, stream, p, p->payload_len); /* handle ack received */ - if (StreamTcpReassembleHandleSegmentUpdateACK(ssn, stream, p) != 0) { + if (StreamTcpReassembleHandleSegmentUpdateACK(ra_ctx, ssn, stream, p) != 0) { #ifdef DEBUG printf("StreamTcpReassembleHandleSegment: StreamTcpReassembleHandleSegmentUpdateACK error\n"); #endif @@ -1256,6 +1271,20 @@ int StreamTcpReassembleHandleSegment(TcpSession *ssn, TcpStream *stream, Packet } } + /* Handle smsgs */ + if (ra_ctx != NULL && ra_ctx->stream_q) { + printf("StreamTcpReassembleHandleSegment: ra_ctx %p, %u\n", ra_ctx, ra_ctx->stream_q->len); + + StreamMsg *smsg = NULL; + while (ra_ctx->stream_q->len > 0) { + smsg = StreamMsgGetFromQueue(ra_ctx->stream_q); + if (smsg != NULL) { + printf("ra_ctx %p, smsg %p, %u\n", ra_ctx, smsg, ra_ctx->stream_q->len); + AppLayerHandleMsg(smsg); + } + } + } + return 0; } @@ -1430,6 +1459,7 @@ static int StreamTcpReassembleStreamTest(TcpStream *stream) { Flow f; uint8_t payload[4]; TCPHdr tcph; + TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); /* prevent L7 from kicking in */ StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOSERVER, 4096); @@ -1455,126 +1485,126 @@ static int StreamTcpReassembleStreamTest(TcpStream *stream) { p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 3; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x42, 2); /*BB*/ p.tcph->th_seq = htonl(16); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 2; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x43, 3); /*CCC*/ p.tcph->th_seq = htonl(18); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 3; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x44, 1); /*D*/ p.tcph->th_seq = htonl(22); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 1; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x45, 2); /*EE*/ p.tcph->th_seq = htonl(25); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 2; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x46, 3); /*FFF*/ p.tcph->th_seq = htonl(27); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 3; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x47, 2); /*GG*/ p.tcph->th_seq = htonl(30); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 2; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x48, 2); /*HH*/ p.tcph->th_seq = htonl(32); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 2; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x49, 1); /*I*/ p.tcph->th_seq = htonl(34); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 1; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4a, 4); /*JJJJ*/ p.tcph->th_seq = htonl(13); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 4; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4b, 3); /*KKK*/ p.tcph->th_seq = htonl(18); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 3; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4c, 3); /*LLL*/ p.tcph->th_seq = htonl(21); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 3; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4d, 3); /*MMM*/ p.tcph->th_seq = htonl(24); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 3; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4e, 1); /*N*/ p.tcph->th_seq = htonl(28); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 1; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4f, 1); /*O*/ p.tcph->th_seq = htonl(31); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 1; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x50, 1); /*P*/ p.tcph->th_seq = htonl(32); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 1; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x51, 2); /*QQ*/ p.tcph->th_seq = htonl(34); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 2; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x30, 1); /*0*/ p.tcph->th_seq = htonl(11); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 1; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; return 1; @@ -1705,6 +1735,7 @@ static int StreamTcpTestStartsBeforeListSegment(TcpStream *stream) { Flow f; uint8_t payload[4]; TCPHdr tcph; + TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); /* prevent L7 from kicking in */ StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOSERVER, 4096); @@ -1731,49 +1762,49 @@ static int StreamTcpTestStartsBeforeListSegment(TcpStream *stream) { p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 1; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x44, 1); /*D*/ p.tcph->th_seq = htonl(22); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 1; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x45, 2); /*EE*/ p.tcph->th_seq = htonl(25); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 2; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x41, 2); /*AA*/ p.tcph->th_seq = htonl(15); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 2; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4a, 4); /*JJJJ*/ p.tcph->th_seq = htonl(14); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 4; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4c, 3); /*LLL*/ p.tcph->th_seq = htonl(21); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 3; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4d, 3); /*MMM*/ p.tcph->th_seq = htonl(24); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 3; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; return 1; @@ -1792,6 +1823,7 @@ static int StreamTcpTestStartsAtSameListSegment(TcpStream *stream) { Flow f; uint8_t payload[4]; TCPHdr tcph; + TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); /* prevent L7 from kicking in */ StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOSERVER, 4096); @@ -1818,49 +1850,49 @@ static int StreamTcpTestStartsAtSameListSegment(TcpStream *stream) { p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 3; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x48, 2); /*HH*/ p.tcph->th_seq = htonl(32); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 2; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x49, 1); /*I*/ p.tcph->th_seq = htonl(34); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 1; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4b, 3); /*KKK*/ p.tcph->th_seq = htonl(18); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 3; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4c, 4); /*LLLL*/ p.tcph->th_seq = htonl(18); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 4; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x50, 1); /*P*/ p.tcph->th_seq = htonl(32); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 1; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x51, 2); /*QQ*/ p.tcph->th_seq = htonl(34); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 2; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; return 1; @@ -1880,6 +1912,7 @@ static int StreamTcpTestStartsAfterListSegment(TcpStream *stream) { Flow f; uint8_t payload[4]; TCPHdr tcph; + TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); /* prevent L7 from kicking in */ StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOSERVER, 4096); @@ -1906,42 +1939,42 @@ static int StreamTcpTestStartsAfterListSegment(TcpStream *stream) { p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 2; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x46, 3); /*FFF*/ p.tcph->th_seq = htonl(27); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 3; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x47, 2); /*GG*/ p.tcph->th_seq = htonl(30); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 2; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4a, 2); /*JJ*/ p.tcph->th_seq = htonl(13); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 2; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4f, 1); /*O*/ p.tcph->th_seq = htonl(31); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 1; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; StreamTcpCreateTestPacket(payload, 0x4e, 1); /*N*/ p.tcph->th_seq = htonl(28); p.tcph->th_ack = htonl(31); p.payload = payload; p.payload_len = 1; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return 0; return 1; @@ -2568,6 +2601,7 @@ static int StreamTcpTestMissedPacket (TcpStream *stream, uint32_t seq, uint32_t Address src; Address dst; struct in_addr in; + TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(); memset(&ssn, 0, sizeof (TcpSession)); memset(&p, 0, sizeof (Packet)); @@ -2603,7 +2637,7 @@ static int StreamTcpTestMissedPacket (TcpStream *stream, uint32_t seq, uint32_t p.payload_len = len; ssn.state = state; - if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1) + if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1) return -1; return 0; diff --git a/src/stream-tcp-reassemble.h b/src/stream-tcp-reassemble.h index f1fd66558b..514cd83a02 100644 --- a/src/stream-tcp-reassemble.h +++ b/src/stream-tcp-reassemble.h @@ -8,6 +8,8 @@ #ifndef __STREAM_TCP_REASSEMBLE_H__ #define __STREAM_TCP_REASSEMBLE_H__ +#include "stream.h" + /** Supported OS list and default OS policy is BSD */ enum { @@ -27,12 +29,19 @@ enum OS_POLICY_LAST }; +typedef struct TcpReassemblyThreadCtx_ { + StreamMsgQueue *stream_q; +} TcpReassemblyThreadCtx; + #define OS_POLICY_DEFAULT OS_POLICY_BSD -int StreamTcpReassembleHandleSegment(TcpSession *, TcpStream *, Packet *); +int StreamTcpReassembleHandleSegment(TcpReassemblyThreadCtx *, TcpSession *, TcpStream *, Packet *); int StreamTcpReassembleInit(char); void StreamTcpReassembleFree(char); void StreamTcpReassembleRegisterTests(void); + +TcpReassemblyThreadCtx *StreamTcpReassembleInitThreadCtx(void); + void StreamTcpCreateTestPacket(u_int8_t *, u_int8_t, u_int8_t); void StreamL7DataPtrInit(TcpSession *ssn, uint8_t cnt); diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 9e56eeb071..24105b67bc 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -25,12 +25,20 @@ //#define DEBUG +typedef struct StreamTcpThread_ { + uint64_t pkts; + + uint16_t counter_tcp_sessions; + + TcpReassemblyThreadCtx *ra_ctx; +} StreamTcpThread; + int StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *); int StreamTcpThreadInit(ThreadVars *, void *, void **); int StreamTcpThreadDeinit(ThreadVars *, void *); void StreamTcpExitPrintStats(ThreadVars *, void *); static int ValidReset(TcpSession * , Packet *); -static int StreamTcpHandleFin(TcpSession *, Packet *); +static int StreamTcpHandleFin(StreamTcpThread *, TcpSession *, Packet *); void StreamTcpRegisterTests (void); void StreamTcpReturnStreamSegments (TcpStream *); void StreamTcpInitConfig(char); @@ -57,12 +65,6 @@ static uint64_t ssn_pool_cnt; static pthread_mutex_t ssn_pool_cnt_mutex; #endif -typedef struct StreamTcpThread_ { - uint64_t pkts; - - uint16_t counter_tcp_sessions; -} StreamTcpThread; - void TmModuleStreamTcpRegister (void) { tmm_modules[TMM_STREAMTCP].name = "StreamTcp"; tmm_modules[TMM_STREAMTCP].ThreadInit = StreamTcpThreadInit; @@ -452,7 +454,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p, StreamTcpThread * ssn->client.last_ts = 0; } - StreamTcpReassembleHandleSegment(ssn, &ssn->client, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p); break; case TH_RST: case TH_RST|TH_ACK: @@ -674,7 +676,7 @@ static int StreamTcpPacketStateSynRecv(ThreadVars *tv, Packet *p, StreamTcpThrea return -1; } - if((StreamTcpHandleFin(ssn, p)) == -1) + if((StreamTcpHandleFin(stt, ssn, p)) == -1) return -1; break; default: @@ -759,7 +761,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, StreamTcpT #endif } - StreamTcpReassembleHandleSegment(ssn, &ssn->client, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p); } else { //#define DEBUG #ifdef DEBUG @@ -826,7 +828,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, StreamTcpT #endif } - StreamTcpReassembleHandleSegment(ssn, &ssn->server, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p); } else { #ifdef DEBUG printf("StreamTcpPacketStateEstablished (%p): client => SEQ out of window, packet SEQ %" PRIu32 ", payload size %" PRIu32 " (%" PRIu32 "), ssn->server.last_ack %" PRIu32 ", ssn->server.next_win %" PRIu32 "(%"PRIu32") (ssn->server.ra_base_seq %"PRIu32")\n", ssn, TCP_GET_SEQ(p), p->payload_len, TCP_GET_SEQ(p) + p->payload_len, ssn->server.last_ack, ssn->server.next_win, TCP_GET_SEQ(p) + p->payload_len - ssn->server.next_win, ssn->server.ra_base_seq); @@ -856,7 +858,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, StreamTcpT printf("StreamTcpPacketStateEstablished (%p): FIN received SEQ %" PRIu32 ", last ACK %" PRIu32 ", next win %" PRIu32 ", win %" PRIu32 "\n", ssn, ssn->server.next_seq, ssn->client.last_ack, ssn->server.next_win, ssn->server.window); #endif - if((StreamTcpHandleFin(ssn, p)) == -1) + if((StreamTcpHandleFin(stt, ssn, p)) == -1) return -1; break; case TH_RST: @@ -878,7 +880,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, StreamTcpT if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack)) ssn->server.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->client, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p); #ifdef DEBUG printf("StreamTcpPacketStateEstablished (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->client.next_seq, ssn->server.last_ack); @@ -899,7 +901,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, StreamTcpT if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack)) ssn->client.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->server, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p); #ifdef DEBUG printf("StreamTcpPacketStateEstablished (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->server.next_seq, ssn->client.last_ack); @@ -926,7 +928,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, StreamTcpT * \param stt Strean Thread module registered to handle the stream handling */ -static int StreamTcpHandleFin(TcpSession *ssn, Packet *p) { +static int StreamTcpHandleFin(StreamTcpThread *stt, TcpSession *ssn, Packet *p) { if (PKT_IS_TOSERVER(p)) { #ifdef DEBUG @@ -956,7 +958,7 @@ static int StreamTcpHandleFin(TcpSession *ssn, Packet *p) { if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack)) ssn->server.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->client, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p); #ifdef DEBUG printf("StreamTcpHandleFin (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", @@ -989,7 +991,7 @@ static int StreamTcpHandleFin(TcpSession *ssn, Packet *p) { if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack)) ssn->client.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->server, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p); #ifdef DEBUG printf("StreamTcpHandleFin (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", @@ -1034,7 +1036,7 @@ static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p, StreamTcpThre if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack)) ssn->server.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->client, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p); #ifdef DEBUG printf("StreamTcpPacketStateFinWait1 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->client.next_seq, ssn->server.last_ack); @@ -1051,7 +1053,7 @@ static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p, StreamTcpThre if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack)) ssn->client.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->server, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p); #ifdef DEBUG printf("StreamTcpPacketStateFinWait1 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->server.next_seq, ssn->client.last_ack); @@ -1089,7 +1091,7 @@ static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p, StreamTcpThre if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack)) ssn->server.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->client, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p); #ifdef DEBUG printf("StreamTcpPacketStateFinWait1 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->client.next_seq, ssn->server.last_ack); @@ -1116,7 +1118,7 @@ static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p, StreamTcpThre if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack)) ssn->client.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->server, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p); #ifdef DEBUG printf("StreamTcpPacketStateFinWait1 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->server.next_seq, ssn->client.last_ack); @@ -1188,7 +1190,7 @@ static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p, StreamTcpThre if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack)) ssn->server.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->client, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p); #ifdef DEBUG printf("StreamTcpPacketStateFinWait2 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->client.next_seq, ssn->server.last_ack); @@ -1214,7 +1216,7 @@ static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p, StreamTcpThre if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack)) ssn->client.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->server, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p); #ifdef DEBUG printf("StreamTcpPacketStateFinWait2 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->server.next_seq, ssn->client.last_ack); @@ -1260,7 +1262,7 @@ static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p, StreamTcpThre if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack)) ssn->server.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->client, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p); #ifdef DEBUG printf("StreamTcpPacketStateFinWait2 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", @@ -1288,7 +1290,7 @@ static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p, StreamTcpThre if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack)) ssn->client.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->server, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p); #ifdef DEBUG printf("StreamTcpPacketStateFinWait2 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->server.next_seq, ssn->client.last_ack); @@ -1350,7 +1352,7 @@ static int StreamTcpPacketStateClosing(ThreadVars *tv, Packet *p, StreamTcpThrea if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack)) ssn->server.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->client, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p); #ifdef DEBUG printf("StreamTcpPacketStateClosing (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->client.next_seq, ssn->server.last_ack); @@ -1378,7 +1380,7 @@ static int StreamTcpPacketStateClosing(ThreadVars *tv, Packet *p, StreamTcpThrea if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack)) ssn->client.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->server, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p); #ifdef DEBUG printf("StreamTcpPacketStateClosing (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->server.next_seq, ssn->client.last_ack); @@ -1439,7 +1441,7 @@ static int StreamTcpPacketStateCloseWait(ThreadVars *tv, Packet *p, StreamTcpThr if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack)) ssn->client.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->server, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p); #ifdef DEBUG printf("StreamTcpPacketStateCloseWait (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->server.next_seq, ssn->client.last_ack); @@ -1498,7 +1500,7 @@ static int StreamTcpPakcetStateLastAck(ThreadVars *tv, Packet *p, StreamTcpThrea if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack)) ssn->server.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->client, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p); #ifdef DEBUG printf("StreamTcpPacketStateLastAck (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->client.next_seq, ssn->server.last_ack); @@ -1558,7 +1560,7 @@ static int StreamTcpPacketStateTimeWait(ThreadVars *tv, Packet *p, StreamTcpThre if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack)) ssn->server.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->client, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p); #ifdef DEBUG printf("StreamTcpPacketStateTimeWait (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->client.next_seq, ssn->server.last_ack); @@ -1584,7 +1586,7 @@ static int StreamTcpPacketStateTimeWait(ThreadVars *tv, Packet *p, StreamTcpThre if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack)) ssn->client.last_ack = TCP_GET_ACK(p); - StreamTcpReassembleHandleSegment(ssn, &ssn->server, p); + StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p); #ifdef DEBUG printf("StreamTcpPacketStateTimeWait (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n", ssn, ssn->server.next_seq, ssn->client.last_ack); @@ -1696,14 +1698,19 @@ int StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data) } memset(stt, 0, sizeof(StreamTcpThread)); - /* XXX */ - *data = (void *)stt; stt->counter_tcp_sessions = PerfTVRegisterCounter("tcp.sessions", tv, TYPE_UINT64, "NULL"); tv->pca = PerfGetAllCountersArray(&tv->pctx); PerfAddToClubbedTMTable(tv->name, &tv->pctx); + /* init reassembly ctx */ + stt->ra_ctx = StreamTcpReassembleInitThreadCtx(); + if (stt->ra_ctx == NULL) + return -1; +#ifdef DEBUG + printf("StreamTcp thread specific ctx online at %p, reassembly ctx %p\n", stt, stt->ra_ctx); +#endif return 0; } @@ -1716,6 +1723,9 @@ int StreamTcpThreadDeinit(ThreadVars *tv, void *data) /* XXX */ + /* free reassembly ctx */ + + /* clear memory */ memset(stt, 0, sizeof(StreamTcpThread)); diff --git a/src/stream.c b/src/stream.c index 371e757250..db61e823bb 100644 --- a/src/stream.c +++ b/src/stream.c @@ -130,10 +130,8 @@ StreamMsg *StreamMsgGetFromQueue(StreamMsgQueue *q) } /* Used by stream reassembler to fill the queue for l7inspect reading */ -void StreamMsgPutInQueue(StreamMsg *s) +void StreamMsgPutInQueue(StreamMsgQueue *q, StreamMsg *s) { - StreamMsgQueue *q = &stream_q; - mutex_lock(&q->mutex_q); StreamMsgEnqueue(q, s); #ifdef DEBUG @@ -160,6 +158,18 @@ void StreamMsgQueuesDeinit(char quiet) { printf("StreamMsgQueuesDeinit: stream_pool_memuse %"PRIu64", stream_pool_memcnt %"PRIu64"\n", stream_pool_memuse, stream_pool_memcnt); } +/** \brief alloc a stream msg queue + * \retval smq ptr to the queue or NULL */ +StreamMsgQueue *StreamMsgQueueGetNew(void) { + StreamMsgQueue *smq = malloc(sizeof(StreamMsgQueue)); + if (smq == NULL) { + return NULL; + } + + memset(smq, 0x00, sizeof(StreamMsgQueue)); + return smq; +} + StreamMsgQueue *StreamMsgQueueGetByPort(uint16_t port) { /* XXX implement this */ return &stream_q; diff --git a/src/stream.h b/src/stream.h index 4baa764e72..16e0f140d2 100644 --- a/src/stream.h +++ b/src/stream.h @@ -54,8 +54,9 @@ void StreamMsgQueuesDeinit(char); StreamMsg *StreamMsgGetFromPool(void); void StreamMsgReturnToPool(StreamMsg *); StreamMsg *StreamMsgGetFromQueue(StreamMsgQueue *); -void StreamMsgPutInQueue(StreamMsg *); +void StreamMsgPutInQueue(StreamMsgQueue *, StreamMsg *); +StreamMsgQueue *StreamMsgQueueGetNew(void); StreamMsgQueue *StreamMsgQueueGetByPort(uint16_t); void StreamMsgQueueSetMinInitChunkLen(uint8_t, uint16_t);