Tie app layer parsing to the stream engine.

remotes/origin/master-1.0.x
Victor Julien 16 years ago
parent f0d556b9e3
commit 5ecd187b6f

@ -292,11 +292,65 @@ end:
return proto;
}
int AppLayerHandleMsg(StreamMsg *smsg) {
uint16_t alproto = ALPROTO_UNKNOWN;
mutex_lock(&smsg->flow->m);
TcpSession *ssn = smsg->flow->protoctx;
if (ssn != NULL) {
alproto = ssn->alproto;
}
mutex_unlock(&smsg->flow->m);
if (ssn != NULL) {
if (smsg->flags & STREAM_START) {
//printf("L7AppDetectThread: stream initializer (len %" PRIu32 " (%" PRIu32 "))\n", smsg->data.data_len, MSG_DATA_SIZE);
//printf("=> Init Stream Data -- start\n");
//PrintRawDataFp(stdout, smsg->init.data, smsg->init.data_len);
//printf("=> Init Stream Data -- end\n");
alproto = AppLayerDetectGetProto(&alp_proto_ctx, &alp_proto_tctx, smsg->data.data, smsg->data.data_len, smsg->flags);
if (alproto != ALPROTO_UNKNOWN) {
/* store the proto and setup the L7 data array */
mutex_lock(&smsg->flow->m);
StreamL7DataPtrInit(ssn,StreamL7GetStorageSize());
ssn->alproto = alproto;
mutex_unlock(&smsg->flow->m);
AppLayerParse(smsg->flow, alproto, smsg->flags, smsg->data.data, smsg->data.data_len);
}
} else {
//printf("AppLayerDetectThread: stream data (len %" PRIu32 " (%" PRIu32 ")), alproto %"PRIu16"\n", smsg->data.data_len, MSG_DATA_SIZE, alproto);
//printf("=> Stream Data -- start\n");
//PrintRawDataFp(stdout, smsg->data.data, smsg->data.data_len);
//printf("=> Stream Data -- end\n");
/* if we don't have a data object here we are not getting it
* a start msg should have gotten us one */
if (alproto != ALPROTO_UNKNOWN) {
AppLayerParse(smsg->flow, alproto, smsg->flags, smsg->data.data, smsg->data.data_len);
} else {
//printf("AppLayerDetectThread: smsg not start, but no l7 data? Weird\n");
}
}
}
mutex_lock(&smsg->flow->m);
smsg->flow->use_cnt--;
mutex_unlock(&smsg->flow->m);
/* return the used message to the queue */
StreamMsgReturnToPool(smsg);
return 0;
}
void *AppLayerDetectProtoThread(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
char run = TRUE;
uint16_t alproto = ALPROTO_UNKNOWN;
/* get the stream msg queue for this thread */
StreamMsgQueue *stream_q = StreamMsgQueueGetByPort(0);
@ -313,54 +367,7 @@ void *AppLayerDetectProtoThread(void *td)
/* grab a msg, can return NULL on signals */
StreamMsg *smsg = StreamMsgGetFromQueue(stream_q);
if (smsg != NULL) {
mutex_lock(&smsg->flow->m);
TcpSession *ssn = smsg->flow->protoctx;
if (ssn != NULL) {
alproto = ssn->alproto;
}
mutex_unlock(&smsg->flow->m);
if (ssn != NULL) {
if (smsg->flags & STREAM_START) {
//printf("L7AppDetectThread: stream initializer (len %" PRIu32 " (%" PRIu32 "))\n", smsg->data.data_len, MSG_DATA_SIZE);
//printf("=> Init Stream Data -- start\n");
//PrintRawDataFp(stdout, smsg->init.data, smsg->init.data_len);
//printf("=> Init Stream Data -- end\n");
alproto = AppLayerDetectGetProto(&alp_proto_ctx, &alp_proto_tctx, smsg->data.data, smsg->data.data_len, smsg->flags);
if (alproto != ALPROTO_UNKNOWN) {
/* store the proto and setup the L7 data array */
mutex_lock(&smsg->flow->m);
StreamL7DataPtrInit(ssn,StreamL7GetStorageSize());
ssn->alproto = alproto;
mutex_unlock(&smsg->flow->m);
AppLayerParse(smsg->flow, alproto, smsg->flags, smsg->data.data, smsg->data.data_len);
}
} else {
//printf("AppLayerDetectThread: stream data (len %" PRIu32 " (%" PRIu32 ")), alproto %"PRIu16"\n", smsg->data.data_len, MSG_DATA_SIZE, alproto);
//printf("=> Stream Data -- start\n");
//PrintRawDataFp(stdout, smsg->data.data, smsg->data.data_len);
//printf("=> Stream Data -- end\n");
/* if we don't have a data object here we are not getting it
* a start msg should have gotten us one */
if (alproto != ALPROTO_UNKNOWN) {
AppLayerParse(smsg->flow, alproto, smsg->flags, smsg->data.data, smsg->data.data_len);
} else {
//printf("AppLayerDetectThread: smsg not start, but no l7 data? Weird\n");
}
}
}
mutex_lock(&smsg->flow->m);
smsg->flow->use_cnt--;
mutex_unlock(&smsg->flow->m);
/* return the used message to the queue */
StreamMsgReturnToPool(smsg);
AppLayerHandleMsg(smsg);
}
if (TmThreadsCheckFlag(tv, THV_KILL)) {

@ -1,6 +1,9 @@
#ifndef __APP_LAYER_DETECT_PROTO_H__
#define __APP_LAYER_DETECT_PROTO_H__
#include "stream.h"
int AppLayerHandleMsg(StreamMsg *smsg);
void *AppLayerDetectProtoThread(void *td);
void AppLayerDetectProtoThreadInit(void);

@ -517,9 +517,9 @@ int AppLayerParse(Flow *f, uint8_t proto, uint8_t flags, uint8_t *input, uint32_
if (parser_state_store == NULL)
return -1;
mutex_lock(&f->m);
//mutex_lock(&f->m);
ssn->aldata[app_layer_sid] = (void *)parser_state_store;
mutex_unlock(&f->m);
//mutex_unlock(&f->m);
}
AppLayerParserState *parser_state = NULL;
@ -563,9 +563,9 @@ int AppLayerParse(Flow *f, uint8_t proto, uint8_t flags, uint8_t *input, uint32_
if (app_layer_state == NULL)
return -1;
mutex_lock(&f->m);
//mutex_lock(&f->m);
ssn->aldata[p->storage_id] = app_layer_state;
mutex_unlock(&f->m);
//mutex_unlock(&f->m);
}
/* invoke the recursive parser */

@ -461,7 +461,7 @@ int main(int argc, char **argv)
StreamTcpInitConfig(STREAM_VERBOSE);
/* Spawn the L7 App Detect thread */
AppLayerDetectProtoThreadSpawn();
//AppLayerDetectProtoThreadSpawn();
/* Spawn the perf counter threads. Let these be the last one spawned */
PerfSpawnThreads();

@ -31,6 +31,8 @@
#include "stream.h"
#include "app-layer-detect-proto.h"
//#define DEBUG
#ifdef DEBUG
@ -154,6 +156,19 @@ void StreamTcpReassembleFree(char quiet) {
StreamMsgQueuesDeinit(quiet);
}
TcpReassemblyThreadCtx *StreamTcpReassembleInitThreadCtx(void) {
TcpReassemblyThreadCtx *ra_ctx = malloc(sizeof(TcpReassemblyThreadCtx));
if (ra_ctx == NULL) {
return NULL;
}
memset(ra_ctx, 0x00, sizeof(TcpReassemblyThreadCtx));
ra_ctx->stream_q = StreamMsgQueueGetNew();
return ra_ctx;
}
void PrintList2(TcpSegment *seg) {
TcpSegment *prev_seg = NULL;
@ -979,7 +994,7 @@ static int StreamTcpReassembleCheckLimit(TcpSession *ssn, TcpStream *stream, Pac
return 1;
}
int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *stream, Packet *p) {
int StreamTcpReassembleHandleSegmentUpdateACK (TcpReassemblyThreadCtx *ra_ctx, TcpSession *ssn, TcpStream *stream, Packet *p) {
if (stream->seg_list == NULL)
return 0;
@ -1050,7 +1065,7 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea
/* pass on pre existing smsg (if any) */
if (smsg != NULL && smsg->data.data_len > 0) {
StreamMsgPutInQueue(smsg);
StreamMsgPutInQueue(ra_ctx->stream_q, smsg);
smsg = NULL;
}
if (smsg == NULL) {
@ -1071,7 +1086,7 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea
smsg->flags |= STREAM_GAP;
smsg->gap.gap_size = gap_len;
StreamMsgPutInQueue(smsg);
StreamMsgPutInQueue(ra_ctx->stream_q,smsg);
smsg = NULL;
smsg_offset = 0;
}
@ -1130,7 +1145,7 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea
/* queue the smsg if it's full */
if (smsg->data.data_len == sizeof (smsg->data.data)) {
StreamMsgPutInQueue(smsg);
StreamMsgPutInQueue(ra_ctx->stream_q, smsg);
smsg = NULL;
}
@ -1190,7 +1205,7 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea
payload_offset, smsg_offset, copy_size);
#endif
if (smsg->data.data_len == sizeof (smsg->data.data)) {
StreamMsgPutInQueue(smsg);
StreamMsgPutInQueue(ra_ctx->stream_q,smsg);
smsg = NULL;
}
@ -1229,18 +1244,18 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea
/* put the partly filled smsg in the queue to the l7 handler */
if (smsg != NULL) {
StreamMsgPutInQueue(smsg);
StreamMsgPutInQueue(ra_ctx->stream_q,smsg);
smsg = NULL;
}
return 0;
}
int StreamTcpReassembleHandleSegment(TcpSession *ssn, TcpStream *stream, Packet *p) {
int StreamTcpReassembleHandleSegment(TcpReassemblyThreadCtx *ra_ctx, TcpSession *ssn, TcpStream *stream, Packet *p) {
//printf("StreamTcpReassembleHandleSegment: ssn %p, stream %p, p %p, p->payload_len %"PRIu16"\n", ssn, stream, p, p->payload_len);
/* handle ack received */
if (StreamTcpReassembleHandleSegmentUpdateACK(ssn, stream, p) != 0) {
if (StreamTcpReassembleHandleSegmentUpdateACK(ra_ctx, ssn, stream, p) != 0) {
#ifdef DEBUG
printf("StreamTcpReassembleHandleSegment: StreamTcpReassembleHandleSegmentUpdateACK error\n");
#endif
@ -1256,6 +1271,20 @@ int StreamTcpReassembleHandleSegment(TcpSession *ssn, TcpStream *stream, Packet
}
}
/* Handle smsgs */
if (ra_ctx != NULL && ra_ctx->stream_q) {
printf("StreamTcpReassembleHandleSegment: ra_ctx %p, %u\n", ra_ctx, ra_ctx->stream_q->len);
StreamMsg *smsg = NULL;
while (ra_ctx->stream_q->len > 0) {
smsg = StreamMsgGetFromQueue(ra_ctx->stream_q);
if (smsg != NULL) {
printf("ra_ctx %p, smsg %p, %u\n", ra_ctx, smsg, ra_ctx->stream_q->len);
AppLayerHandleMsg(smsg);
}
}
}
return 0;
}
@ -1430,6 +1459,7 @@ static int StreamTcpReassembleStreamTest(TcpStream *stream) {
Flow f;
uint8_t payload[4];
TCPHdr tcph;
TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx();
/* prevent L7 from kicking in */
StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOSERVER, 4096);
@ -1455,126 +1485,126 @@ static int StreamTcpReassembleStreamTest(TcpStream *stream) {
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 3;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x42, 2); /*BB*/
p.tcph->th_seq = htonl(16);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 2;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x43, 3); /*CCC*/
p.tcph->th_seq = htonl(18);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 3;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x44, 1); /*D*/
p.tcph->th_seq = htonl(22);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 1;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x45, 2); /*EE*/
p.tcph->th_seq = htonl(25);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 2;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x46, 3); /*FFF*/
p.tcph->th_seq = htonl(27);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 3;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x47, 2); /*GG*/
p.tcph->th_seq = htonl(30);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 2;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x48, 2); /*HH*/
p.tcph->th_seq = htonl(32);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 2;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x49, 1); /*I*/
p.tcph->th_seq = htonl(34);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 1;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4a, 4); /*JJJJ*/
p.tcph->th_seq = htonl(13);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 4;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4b, 3); /*KKK*/
p.tcph->th_seq = htonl(18);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 3;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4c, 3); /*LLL*/
p.tcph->th_seq = htonl(21);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 3;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4d, 3); /*MMM*/
p.tcph->th_seq = htonl(24);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 3;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4e, 1); /*N*/
p.tcph->th_seq = htonl(28);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 1;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4f, 1); /*O*/
p.tcph->th_seq = htonl(31);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 1;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x50, 1); /*P*/
p.tcph->th_seq = htonl(32);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 1;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x51, 2); /*QQ*/
p.tcph->th_seq = htonl(34);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 2;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x30, 1); /*0*/
p.tcph->th_seq = htonl(11);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 1;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
return 1;
@ -1705,6 +1735,7 @@ static int StreamTcpTestStartsBeforeListSegment(TcpStream *stream) {
Flow f;
uint8_t payload[4];
TCPHdr tcph;
TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx();
/* prevent L7 from kicking in */
StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOSERVER, 4096);
@ -1731,49 +1762,49 @@ static int StreamTcpTestStartsBeforeListSegment(TcpStream *stream) {
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 1;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x44, 1); /*D*/
p.tcph->th_seq = htonl(22);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 1;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x45, 2); /*EE*/
p.tcph->th_seq = htonl(25);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 2;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x41, 2); /*AA*/
p.tcph->th_seq = htonl(15);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 2;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4a, 4); /*JJJJ*/
p.tcph->th_seq = htonl(14);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 4;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4c, 3); /*LLL*/
p.tcph->th_seq = htonl(21);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 3;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4d, 3); /*MMM*/
p.tcph->th_seq = htonl(24);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 3;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
return 1;
@ -1792,6 +1823,7 @@ static int StreamTcpTestStartsAtSameListSegment(TcpStream *stream) {
Flow f;
uint8_t payload[4];
TCPHdr tcph;
TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx();
/* prevent L7 from kicking in */
StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOSERVER, 4096);
@ -1818,49 +1850,49 @@ static int StreamTcpTestStartsAtSameListSegment(TcpStream *stream) {
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 3;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x48, 2); /*HH*/
p.tcph->th_seq = htonl(32);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 2;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x49, 1); /*I*/
p.tcph->th_seq = htonl(34);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 1;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4b, 3); /*KKK*/
p.tcph->th_seq = htonl(18);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 3;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4c, 4); /*LLLL*/
p.tcph->th_seq = htonl(18);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 4;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x50, 1); /*P*/
p.tcph->th_seq = htonl(32);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 1;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x51, 2); /*QQ*/
p.tcph->th_seq = htonl(34);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 2;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
return 1;
@ -1880,6 +1912,7 @@ static int StreamTcpTestStartsAfterListSegment(TcpStream *stream) {
Flow f;
uint8_t payload[4];
TCPHdr tcph;
TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx();
/* prevent L7 from kicking in */
StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOSERVER, 4096);
@ -1906,42 +1939,42 @@ static int StreamTcpTestStartsAfterListSegment(TcpStream *stream) {
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 2;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x46, 3); /*FFF*/
p.tcph->th_seq = htonl(27);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 3;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x47, 2); /*GG*/
p.tcph->th_seq = htonl(30);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 2;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4a, 2); /*JJ*/
p.tcph->th_seq = htonl(13);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 2;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4f, 1); /*O*/
p.tcph->th_seq = htonl(31);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 1;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
StreamTcpCreateTestPacket(payload, 0x4e, 1); /*N*/
p.tcph->th_seq = htonl(28);
p.tcph->th_ack = htonl(31);
p.payload = payload;
p.payload_len = 1;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return 0;
return 1;
@ -2568,6 +2601,7 @@ static int StreamTcpTestMissedPacket (TcpStream *stream, uint32_t seq, uint32_t
Address src;
Address dst;
struct in_addr in;
TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx();
memset(&ssn, 0, sizeof (TcpSession));
memset(&p, 0, sizeof (Packet));
@ -2603,7 +2637,7 @@ static int StreamTcpTestMissedPacket (TcpStream *stream, uint32_t seq, uint32_t
p.payload_len = len;
ssn.state = state;
if (StreamTcpReassembleHandleSegment(&ssn, stream, &p) == -1)
if (StreamTcpReassembleHandleSegment(ra_ctx,&ssn, stream, &p) == -1)
return -1;
return 0;

@ -8,6 +8,8 @@
#ifndef __STREAM_TCP_REASSEMBLE_H__
#define __STREAM_TCP_REASSEMBLE_H__
#include "stream.h"
/** Supported OS list and default OS policy is BSD */
enum
{
@ -27,12 +29,19 @@ enum
OS_POLICY_LAST
};
typedef struct TcpReassemblyThreadCtx_ {
StreamMsgQueue *stream_q;
} TcpReassemblyThreadCtx;
#define OS_POLICY_DEFAULT OS_POLICY_BSD
int StreamTcpReassembleHandleSegment(TcpSession *, TcpStream *, Packet *);
int StreamTcpReassembleHandleSegment(TcpReassemblyThreadCtx *, TcpSession *, TcpStream *, Packet *);
int StreamTcpReassembleInit(char);
void StreamTcpReassembleFree(char);
void StreamTcpReassembleRegisterTests(void);
TcpReassemblyThreadCtx *StreamTcpReassembleInitThreadCtx(void);
void StreamTcpCreateTestPacket(u_int8_t *, u_int8_t, u_int8_t);
void StreamL7DataPtrInit(TcpSession *ssn, uint8_t cnt);

@ -25,12 +25,20 @@
//#define DEBUG
typedef struct StreamTcpThread_ {
uint64_t pkts;
uint16_t counter_tcp_sessions;
TcpReassemblyThreadCtx *ra_ctx;
} StreamTcpThread;
int StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *);
int StreamTcpThreadInit(ThreadVars *, void *, void **);
int StreamTcpThreadDeinit(ThreadVars *, void *);
void StreamTcpExitPrintStats(ThreadVars *, void *);
static int ValidReset(TcpSession * , Packet *);
static int StreamTcpHandleFin(TcpSession *, Packet *);
static int StreamTcpHandleFin(StreamTcpThread *, TcpSession *, Packet *);
void StreamTcpRegisterTests (void);
void StreamTcpReturnStreamSegments (TcpStream *);
void StreamTcpInitConfig(char);
@ -57,12 +65,6 @@ static uint64_t ssn_pool_cnt;
static pthread_mutex_t ssn_pool_cnt_mutex;
#endif
typedef struct StreamTcpThread_ {
uint64_t pkts;
uint16_t counter_tcp_sessions;
} StreamTcpThread;
void TmModuleStreamTcpRegister (void) {
tmm_modules[TMM_STREAMTCP].name = "StreamTcp";
tmm_modules[TMM_STREAMTCP].ThreadInit = StreamTcpThreadInit;
@ -452,7 +454,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p, StreamTcpThread *
ssn->client.last_ts = 0;
}
StreamTcpReassembleHandleSegment(ssn, &ssn->client, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p);
break;
case TH_RST:
case TH_RST|TH_ACK:
@ -674,7 +676,7 @@ static int StreamTcpPacketStateSynRecv(ThreadVars *tv, Packet *p, StreamTcpThrea
return -1;
}
if((StreamTcpHandleFin(ssn, p)) == -1)
if((StreamTcpHandleFin(stt, ssn, p)) == -1)
return -1;
break;
default:
@ -759,7 +761,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, StreamTcpT
#endif
}
StreamTcpReassembleHandleSegment(ssn, &ssn->client, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p);
} else {
//#define DEBUG
#ifdef DEBUG
@ -826,7 +828,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, StreamTcpT
#endif
}
StreamTcpReassembleHandleSegment(ssn, &ssn->server, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p);
} else {
#ifdef DEBUG
printf("StreamTcpPacketStateEstablished (%p): client => SEQ out of window, packet SEQ %" PRIu32 ", payload size %" PRIu32 " (%" PRIu32 "), ssn->server.last_ack %" PRIu32 ", ssn->server.next_win %" PRIu32 "(%"PRIu32") (ssn->server.ra_base_seq %"PRIu32")\n", ssn, TCP_GET_SEQ(p), p->payload_len, TCP_GET_SEQ(p) + p->payload_len, ssn->server.last_ack, ssn->server.next_win, TCP_GET_SEQ(p) + p->payload_len - ssn->server.next_win, ssn->server.ra_base_seq);
@ -856,7 +858,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, StreamTcpT
printf("StreamTcpPacketStateEstablished (%p): FIN received SEQ %" PRIu32 ", last ACK %" PRIu32 ", next win %" PRIu32 ", win %" PRIu32 "\n",
ssn, ssn->server.next_seq, ssn->client.last_ack, ssn->server.next_win, ssn->server.window);
#endif
if((StreamTcpHandleFin(ssn, p)) == -1)
if((StreamTcpHandleFin(stt, ssn, p)) == -1)
return -1;
break;
case TH_RST:
@ -878,7 +880,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, StreamTcpT
if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack))
ssn->server.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->client, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p);
#ifdef DEBUG
printf("StreamTcpPacketStateEstablished (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->client.next_seq, ssn->server.last_ack);
@ -899,7 +901,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, StreamTcpT
if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack))
ssn->client.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->server, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p);
#ifdef DEBUG
printf("StreamTcpPacketStateEstablished (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->server.next_seq, ssn->client.last_ack);
@ -926,7 +928,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, StreamTcpT
* \param stt Strean Thread module registered to handle the stream handling
*/
static int StreamTcpHandleFin(TcpSession *ssn, Packet *p) {
static int StreamTcpHandleFin(StreamTcpThread *stt, TcpSession *ssn, Packet *p) {
if (PKT_IS_TOSERVER(p)) {
#ifdef DEBUG
@ -956,7 +958,7 @@ static int StreamTcpHandleFin(TcpSession *ssn, Packet *p) {
if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack))
ssn->server.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->client, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p);
#ifdef DEBUG
printf("StreamTcpHandleFin (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
@ -989,7 +991,7 @@ static int StreamTcpHandleFin(TcpSession *ssn, Packet *p) {
if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack))
ssn->client.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->server, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p);
#ifdef DEBUG
printf("StreamTcpHandleFin (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
@ -1034,7 +1036,7 @@ static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p, StreamTcpThre
if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack))
ssn->server.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->client, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p);
#ifdef DEBUG
printf("StreamTcpPacketStateFinWait1 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->client.next_seq, ssn->server.last_ack);
@ -1051,7 +1053,7 @@ static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p, StreamTcpThre
if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack))
ssn->client.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->server, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p);
#ifdef DEBUG
printf("StreamTcpPacketStateFinWait1 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->server.next_seq, ssn->client.last_ack);
@ -1089,7 +1091,7 @@ static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p, StreamTcpThre
if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack))
ssn->server.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->client, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p);
#ifdef DEBUG
printf("StreamTcpPacketStateFinWait1 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->client.next_seq, ssn->server.last_ack);
@ -1116,7 +1118,7 @@ static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p, StreamTcpThre
if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack))
ssn->client.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->server, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p);
#ifdef DEBUG
printf("StreamTcpPacketStateFinWait1 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->server.next_seq, ssn->client.last_ack);
@ -1188,7 +1190,7 @@ static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p, StreamTcpThre
if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack))
ssn->server.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->client, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p);
#ifdef DEBUG
printf("StreamTcpPacketStateFinWait2 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->client.next_seq, ssn->server.last_ack);
@ -1214,7 +1216,7 @@ static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p, StreamTcpThre
if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack))
ssn->client.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->server, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p);
#ifdef DEBUG
printf("StreamTcpPacketStateFinWait2 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->server.next_seq, ssn->client.last_ack);
@ -1260,7 +1262,7 @@ static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p, StreamTcpThre
if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack))
ssn->server.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->client, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p);
#ifdef DEBUG
printf("StreamTcpPacketStateFinWait2 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
@ -1288,7 +1290,7 @@ static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p, StreamTcpThre
if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack))
ssn->client.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->server, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p);
#ifdef DEBUG
printf("StreamTcpPacketStateFinWait2 (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->server.next_seq, ssn->client.last_ack);
@ -1350,7 +1352,7 @@ static int StreamTcpPacketStateClosing(ThreadVars *tv, Packet *p, StreamTcpThrea
if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack))
ssn->server.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->client, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p);
#ifdef DEBUG
printf("StreamTcpPacketStateClosing (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->client.next_seq, ssn->server.last_ack);
@ -1378,7 +1380,7 @@ static int StreamTcpPacketStateClosing(ThreadVars *tv, Packet *p, StreamTcpThrea
if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack))
ssn->client.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->server, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p);
#ifdef DEBUG
printf("StreamTcpPacketStateClosing (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->server.next_seq, ssn->client.last_ack);
@ -1439,7 +1441,7 @@ static int StreamTcpPacketStateCloseWait(ThreadVars *tv, Packet *p, StreamTcpThr
if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack))
ssn->client.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->server, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p);
#ifdef DEBUG
printf("StreamTcpPacketStateCloseWait (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->server.next_seq, ssn->client.last_ack);
@ -1498,7 +1500,7 @@ static int StreamTcpPakcetStateLastAck(ThreadVars *tv, Packet *p, StreamTcpThrea
if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack))
ssn->server.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->client, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p);
#ifdef DEBUG
printf("StreamTcpPacketStateLastAck (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->client.next_seq, ssn->server.last_ack);
@ -1558,7 +1560,7 @@ static int StreamTcpPacketStateTimeWait(ThreadVars *tv, Packet *p, StreamTcpThre
if (SEQ_GT(TCP_GET_ACK(p),ssn->server.last_ack))
ssn->server.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->client, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->client, p);
#ifdef DEBUG
printf("StreamTcpPacketStateTimeWait (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->client.next_seq, ssn->server.last_ack);
@ -1584,7 +1586,7 @@ static int StreamTcpPacketStateTimeWait(ThreadVars *tv, Packet *p, StreamTcpThre
if (SEQ_GT(TCP_GET_ACK(p),ssn->client.last_ack))
ssn->client.last_ack = TCP_GET_ACK(p);
StreamTcpReassembleHandleSegment(ssn, &ssn->server, p);
StreamTcpReassembleHandleSegment(stt->ra_ctx, ssn, &ssn->server, p);
#ifdef DEBUG
printf("StreamTcpPacketStateTimeWait (%p): =+ next SEQ %" PRIu32 ", last ACK %" PRIu32 "\n",
ssn, ssn->server.next_seq, ssn->client.last_ack);
@ -1696,14 +1698,19 @@ int StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
}
memset(stt, 0, sizeof(StreamTcpThread));
/* XXX */
*data = (void *)stt;
stt->counter_tcp_sessions = PerfTVRegisterCounter("tcp.sessions", tv, TYPE_UINT64, "NULL");
tv->pca = PerfGetAllCountersArray(&tv->pctx);
PerfAddToClubbedTMTable(tv->name, &tv->pctx);
/* init reassembly ctx */
stt->ra_ctx = StreamTcpReassembleInitThreadCtx();
if (stt->ra_ctx == NULL)
return -1;
#ifdef DEBUG
printf("StreamTcp thread specific ctx online at %p, reassembly ctx %p\n", stt, stt->ra_ctx);
#endif
return 0;
}
@ -1716,6 +1723,9 @@ int StreamTcpThreadDeinit(ThreadVars *tv, void *data)
/* XXX */
/* free reassembly ctx */
/* clear memory */
memset(stt, 0, sizeof(StreamTcpThread));

@ -130,10 +130,8 @@ StreamMsg *StreamMsgGetFromQueue(StreamMsgQueue *q)
}
/* Used by stream reassembler to fill the queue for l7inspect reading */
void StreamMsgPutInQueue(StreamMsg *s)
void StreamMsgPutInQueue(StreamMsgQueue *q, StreamMsg *s)
{
StreamMsgQueue *q = &stream_q;
mutex_lock(&q->mutex_q);
StreamMsgEnqueue(q, s);
#ifdef DEBUG
@ -160,6 +158,18 @@ void StreamMsgQueuesDeinit(char quiet) {
printf("StreamMsgQueuesDeinit: stream_pool_memuse %"PRIu64", stream_pool_memcnt %"PRIu64"\n", stream_pool_memuse, stream_pool_memcnt);
}
/** \brief alloc a stream msg queue
* \retval smq ptr to the queue or NULL */
StreamMsgQueue *StreamMsgQueueGetNew(void) {
StreamMsgQueue *smq = malloc(sizeof(StreamMsgQueue));
if (smq == NULL) {
return NULL;
}
memset(smq, 0x00, sizeof(StreamMsgQueue));
return smq;
}
StreamMsgQueue *StreamMsgQueueGetByPort(uint16_t port) {
/* XXX implement this */
return &stream_q;

@ -54,8 +54,9 @@ void StreamMsgQueuesDeinit(char);
StreamMsg *StreamMsgGetFromPool(void);
void StreamMsgReturnToPool(StreamMsg *);
StreamMsg *StreamMsgGetFromQueue(StreamMsgQueue *);
void StreamMsgPutInQueue(StreamMsg *);
void StreamMsgPutInQueue(StreamMsgQueue *, StreamMsg *);
StreamMsgQueue *StreamMsgQueueGetNew(void);
StreamMsgQueue *StreamMsgQueueGetByPort(uint16_t);
void StreamMsgQueueSetMinInitChunkLen(uint8_t, uint16_t);

Loading…
Cancel
Save