Improve RawInline reassembly: remove unnecessary segments from the stream in an earlier stage. Test this properly.

remotes/origin/master-1.1.x
Victor Julien 14 years ago
parent d00c6172c9
commit 4c82c0e750

@ -1785,6 +1785,16 @@ static void StreamTcpRemoveSegmentFromStream(TcpStream *stream, TcpSegment *seg)
stream->seg_list_tail = seg->prev;
}
/**
* \brief see if app layer is done with a segment
*
* \retval 1 app layer is done with this segment
* \retval 0 not done yet
*/
#define StreamTcpAppLayerSegmentProcessed(stream, segment) \
(((stream)->flags & STREAMTCP_STREAM_FLAG_GAP || \
(segment)->flags & SEGMENTTCP_FLAG_APPLAYER_PROCESSED) ? 1 :0)
/**
* \brief Update the stream reassembly upon receiving a data segment
*
@ -1903,7 +1913,7 @@ static int StreamTcpReassembleInlineAppLayer (TcpReassemblyThreadCtx *ra_ctx,
which has been previously processed and reassembled */
if ((ssn->flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED) &&
(seg->flags & SEGMENTTCP_FLAG_RAW_PROCESSED) &&
(seg->flags & SEGMENTTCP_FLAG_APPLAYER_PROCESSED))
StreamTcpAppLayerSegmentProcessed(stream, seg))
{
SCLogDebug("segment(%p) of length %"PRIu16" has been processed,"
" so return it to pool", seg, seg->payload_len);
@ -1931,7 +1941,7 @@ static int StreamTcpReassembleInlineAppLayer (TcpReassemblyThreadCtx *ra_ctx,
if ((SEQ_LEQ((seg->seq + seg->payload_len), (ra_base_seq+1)) ||
SEQ_LEQ(stream->last_ack, (ra_base_seq + (ra_base_seq - seg->seq)))) &&
(seg->flags & SEGMENTTCP_FLAG_RAW_PROCESSED) &&
(seg->flags & SEGMENTTCP_FLAG_APPLAYER_PROCESSED))
StreamTcpAppLayerSegmentProcessed(stream, seg))
{
SCLogDebug("removing pre ra_base_seq %"PRIu32" seg %p seq %"PRIu32
" len %"PRIu16"", ra_base_seq, seg, seg->seq, seg->payload_len);
@ -2027,7 +2037,7 @@ static int StreamTcpReassembleInlineAppLayer (TcpReassemblyThreadCtx *ra_ctx,
}
/* if the segment ends beyond ra_base_seq we need to consider it */
if (SEQ_GT((seg->seq + seg->payload_len), ra_base_seq)) {
if (SEQ_GT((seg->seq + seg->payload_len), (ra_base_seq + 1))) {
SCLogDebug("seg->seq %" PRIu32 ", seg->payload_len %" PRIu32 ", "
"ra_base_seq %" PRIu32 "", seg->seq,
seg->payload_len, ra_base_seq);
@ -2279,6 +2289,7 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx,
uint16_t payload_len = 0;
TcpSegment *seg = stream->seg_list;
uint32_t next_seq = ra_base_seq + 1;
int gap = 0;
/* determine the left edge and right edge */
uint32_t right_edge = TCP_GET_SEQ(p) + p->payload_len;
@ -2312,9 +2323,7 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx,
seg->payload_len);
/* only remove if app layer reassembly is ready too */
if (seg->flags & SEGMENTTCP_FLAG_APPLAYER_PROCESSED ||
stream->flags & STREAMTCP_STREAM_FLAG_GAP)
{
if (StreamTcpAppLayerSegmentProcessed(stream, seg)) {
TcpSegment *next_seg = seg->next;
StreamTcpRemoveSegmentFromStream(stream, seg);
StreamTcpSegmentReturntoPool(seg);
@ -2333,8 +2342,7 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx,
* If the stream is in GAP state the app layer flag won't be set */
if ((ssn->flags & STREAMTCP_FLAG_APPPROTO_DETECTION_COMPLETED) &&
(seg->flags & SEGMENTTCP_FLAG_RAW_PROCESSED) &&
(seg->flags & SEGMENTTCP_FLAG_APPLAYER_PROCESSED ||
stream->flags & STREAMTCP_STREAM_FLAG_GAP))
StreamTcpAppLayerSegmentProcessed(stream, seg))
{
SCLogDebug("segment(%p) of length %"PRIu16" has been processed,"
" so return it to pool", seg, seg->payload_len);
@ -2354,6 +2362,8 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx,
stream->ra_raw_base_seq = ra_base_seq;
smsg = NULL;
}
gap = 1;
}
/* if the segment ends beyond left_edge we need to consider it */
@ -2419,7 +2429,13 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx,
memcpy(smsg->data.data + smsg_offset, seg->payload + payload_offset,
copy_size);
smsg_offset += copy_size;
ra_base_seq += copy_size;
SCLogDebug("seg total %u, seq %u off %u copy %u, ra_base_seq %u",
(seg->seq + payload_offset + copy_size), seg->seq,
payload_offset, copy_size, ra_base_seq);
if (gap == 0 && SEQ_GT((seg->seq + payload_offset + copy_size),ra_base_seq+1)) {
ra_base_seq += copy_size;
}
SCLogDebug("ra_base_seq %"PRIu32, ra_base_seq);
smsg->data.data_len += copy_size;
@ -2478,7 +2494,9 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx,
memcpy(smsg->data.data + smsg_offset, seg->payload +
payload_offset, copy_size);
smsg_offset += copy_size;
ra_base_seq += copy_size;
if (gap == 0 && SEQ_GT((seg->seq + payload_offset + copy_size),ra_base_seq+1)) {
ra_base_seq += copy_size;
}
SCLogDebug("ra_base_seq %"PRIu32, ra_base_seq);
smsg->data.data_len += copy_size;
SCLogDebug("copied payload_offset %" PRIu32 ", "
@ -2531,6 +2549,24 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx,
stream->ra_raw_base_seq = ra_base_seq;
}
/* see if we can clean up some segments */
left_edge = (ra_base_seq + 1) - stream_config.reassembly_inline_window;
SCLogDebug("left_edge %"PRIu32", ra_base_seq %"PRIu32, left_edge, ra_base_seq);
/* loop through the segments to remove unneeded segments */
for (seg = stream->seg_list; seg != NULL && SEQ_LEQ((seg->seq + p->payload_len), left_edge); ) {
SCLogDebug("seg %p seq %"PRIu32", len %"PRIu16", sum %"PRIu32, seg, seg->seq, seg->payload_len, seg->seq+seg->payload_len);
/* only remove if app layer reassembly is ready too */
if (StreamTcpAppLayerSegmentProcessed(stream, seg)) {
TcpSegment *next_seg = seg->next;
StreamTcpRemoveSegmentFromStream(stream, seg);
StreamTcpSegmentReturntoPool(seg);
seg = next_seg;
} else {
break;
}
}
SCReturnInt(0);
}
@ -8470,6 +8506,135 @@ end:
return ret;
}
/** \test 3 in order segments, then reassemble, add one more and reassemble again.
* test the sliding window reassembly with a small window size so that we
* cutting off at the start (left edge). Test if the first segment is
* removed from the list.
*/
static int StreamTcpReassembleInlineTest08(void) {
int ret = 0;
TcpReassemblyThreadCtx *ra_ctx = NULL;
ThreadVars tv;
TcpSession ssn;
Flow f;
memset(&tv, 0x00, sizeof(tv));
StreamTcpUTInit(&ra_ctx);
StreamTcpUTSetupSession(&ssn);
StreamTcpUTSetupStream(&ssn.client, 1);
FLOW_INITIALIZE(&f);
stream_config.reassembly_inline_window = 15;
ssn.client.flags |= STREAMTCP_STREAM_FLAG_GAP;
uint8_t stream_payload1[] = "AAAAABBBBBCCCCC";
uint8_t stream_payload2[] = "BBBBBCCCCCDDDDD";
uint8_t payload[] = { 'C', 'C', 'C', 'C', 'C' };
Packet *p = UTHBuildPacketReal(payload, 5, IPPROTO_TCP, "1.1.1.1", "2.2.2.2", 1024, 80);
if (p == NULL) {
printf("couldn't get a packet: ");
goto end;
}
p->tcph->th_seq = htonl(12);
p->flow = &f;
if (StreamTcpUTAddSegmentWithByte(&tv, ra_ctx, &ssn.client, 2, 'A', 5) == -1) {
printf("failed to add segment 1: ");
goto end;
}
if (StreamTcpUTAddSegmentWithByte(&tv, ra_ctx, &ssn.client, 7, 'B', 5) == -1) {
printf("failed to add segment 2: ");
goto end;
}
if (StreamTcpUTAddSegmentWithByte(&tv, ra_ctx, &ssn.client, 12, 'C', 5) == -1) {
printf("failed to add segment 3: ");
goto end;
}
ssn.client.next_seq = 17;
int r = StreamTcpReassembleInlineRaw(ra_ctx, &ssn, &ssn.client, p);
if (r < 0) {
printf("StreamTcpReassembleInlineRaw failed: ");
goto end;
}
if (ra_ctx->stream_q->len != 1) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
goto end;
}
StreamMsg *smsg = ra_ctx->stream_q->top;
if (smsg->data.data_len != 15) {
printf("expected data length to be 15, got %u: ", smsg->data.data_len);
goto end;
}
if (!(memcmp(stream_payload1, smsg->data.data, 15) == 0)) {
printf("data is not what we expected:\nExpected:\n");
PrintRawDataFp(stdout, stream_payload1, 15);
printf("Got:\n");
PrintRawDataFp(stdout, smsg->data.data, smsg->data.data_len);
goto end;
}
if (ssn.client.ra_raw_base_seq != 16) {
printf("ra_raw_base_seq %"PRIu32", expected 16: ", ssn.client.ra_raw_base_seq);
goto end;
}
if (StreamTcpUTAddSegmentWithByte(&tv, ra_ctx, &ssn.client, 17, 'D', 5) == -1) {
printf("failed to add segment 4: ");
goto end;
}
ssn.client.next_seq = 22;
p->tcph->th_seq = htonl(17);
r = StreamTcpReassembleInlineRaw(ra_ctx, &ssn, &ssn.client, p);
if (r < 0) {
printf("StreamTcpReassembleInlineRaw failed 2: ");
goto end;
}
if (ra_ctx->stream_q->len != 2) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
goto end;
}
smsg = ra_ctx->stream_q->top;
if (smsg->data.data_len != 15) {
printf("expected data length to be 15, got %u: ", smsg->data.data_len);
goto end;
}
if (!(memcmp(stream_payload2, smsg->data.data, 15) == 0)) {
printf("data is not what we expected:\nExpected:\n");
PrintRawDataFp(stdout, stream_payload2, 15);
printf("Got:\n");
PrintRawDataFp(stdout, smsg->data.data, smsg->data.data_len);
goto end;
}
if (ssn.client.ra_raw_base_seq != 21) {
printf("ra_raw_base_seq %"PRIu32", expected 21: ", ssn.client.ra_raw_base_seq);
goto end;
}
if (ssn.client.seg_list->seq != 7) {
printf("expected segment 2 (seq 7) to be first in the list, got seq %"PRIu32": ", ssn.client.seg_list->seq);
goto end;
}
ret = 1;
end:
FLOW_DESTROY(&f);
UTHFreePacket(p);
StreamTcpUTClearSession(&ssn);
StreamTcpUTDeinit(ra_ctx);
return ret;
}
#endif /* UNITTESTS */
/** \brief The Function Register the Unit tests to test the reassembly engine
@ -8526,13 +8691,14 @@ void StreamTcpReassembleRegisterTests(void) {
UtRegisterTest("StreamTcpReassembleTest46 -- Depth Test", StreamTcpReassembleTest46, 1);
UtRegisterTest("StreamTcpReassembleTest47 -- TCP Sequence Wraparound Test", StreamTcpReassembleTest47, 1);
UtRegisterTest("StreamTcpReassembleInlineTest01 -- inline RAW reassembly", StreamTcpReassembleInlineTest01, 1);
UtRegisterTest("StreamTcpReassembleInlineTest02 -- inline RAW reassembly 2", StreamTcpReassembleInlineTest02, 1);
UtRegisterTest("StreamTcpReassembleInlineTest03 -- inline RAW reassembly 3", StreamTcpReassembleInlineTest03, 1);
UtRegisterTest("StreamTcpReassembleInlineTest04 -- inline RAW reassembly 4", StreamTcpReassembleInlineTest04, 1);
UtRegisterTest("StreamTcpReassembleInlineTest05 -- inline RAW reassembly 5 GAP", StreamTcpReassembleInlineTest05, 1);
UtRegisterTest("StreamTcpReassembleInlineTest06 -- inline RAW reassembly 6 GAP", StreamTcpReassembleInlineTest06, 1);
UtRegisterTest("StreamTcpReassembleInlineTest07 -- inline RAW reassembly 7 GAP", StreamTcpReassembleInlineTest07, 1);
UtRegisterTest("StreamTcpReassembleInlineTest01 -- inline RAW ra", StreamTcpReassembleInlineTest01, 1);
UtRegisterTest("StreamTcpReassembleInlineTest02 -- inline RAW ra 2", StreamTcpReassembleInlineTest02, 1);
UtRegisterTest("StreamTcpReassembleInlineTest03 -- inline RAW ra 3", StreamTcpReassembleInlineTest03, 1);
UtRegisterTest("StreamTcpReassembleInlineTest04 -- inline RAW ra 4", StreamTcpReassembleInlineTest04, 1);
UtRegisterTest("StreamTcpReassembleInlineTest05 -- inline RAW ra 5 GAP", StreamTcpReassembleInlineTest05, 1);
UtRegisterTest("StreamTcpReassembleInlineTest06 -- inline RAW ra 6 GAP", StreamTcpReassembleInlineTest06, 1);
UtRegisterTest("StreamTcpReassembleInlineTest07 -- inline RAW ra 7 GAP", StreamTcpReassembleInlineTest07, 1);
UtRegisterTest("StreamTcpReassembleInlineTest08 -- inline RAW ra 8 cleanup", StreamTcpReassembleInlineTest08, 1);
StreamTcpInlineRegisterTests();
StreamTcpUtilRegisterTests();

@ -81,6 +81,11 @@ int StreamTcpUTAddSegmentWithByte(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx
memset(s->payload, byte, len);
Packet *p = UTHBuildPacketReal(s->payload, 5, IPPROTO_TCP, "1.1.1.1", "2.2.2.2", 1024, 80);
if (p == NULL) {
return -1;
}
p->tcph->th_seq = htonl(seq);
if (StreamTcpReassembleInsertSegment(tv, ra_ctx, stream, s, p) < 0)
return -1;
UTHFreePacket(p);

Loading…
Cancel
Save