stream: fix midstream reverse flow handling

When a TCP session is picked up from the response the flow is
reversed by the protocol detection code.

This would lead to duplicate logging of the response. The reason this
happened was that the per stream app progress tracker was not handled
correctly by the direction reversing code. While the streams were
swapped the stream engine would continue to use a now outdated pointer
to what had become the wrong direction.

This patches fixes this by making the stream a ptr to ptr that can be
updated by the protocol detection as well.

In addition, the progress tracking was cleaned up and the GAP error
handling in this case was improved as well.
pull/4028/head
Victor Julien 6 years ago
parent 2c1b923500
commit 5ddfc42b87

@ -298,7 +298,7 @@ static int TCPProtoDetectTriggerOpposingSide(ThreadVars *tv,
/** \todo data const */
static int TCPProtoDetect(ThreadVars *tv,
TcpReassemblyThreadCtx *ra_ctx, AppLayerThreadCtx *app_tctx,
Packet *p, Flow *f, TcpSession *ssn, TcpStream *stream,
Packet *p, Flow *f, TcpSession *ssn, TcpStream **stream,
uint8_t *data, uint32_t data_len, uint8_t flags)
{
AppProto *alproto;
@ -351,7 +351,7 @@ static int TCPProtoDetect(ThreadVars *tv,
f->alproto = *alproto;
}
StreamTcpSetStreamFlagAppProtoDetectionCompleted(stream);
StreamTcpSetStreamFlagAppProtoDetectionCompleted(*stream);
TcpSessionSetReassemblyDepth(ssn,
AppLayerParserGetStreamDepth(f));
FlagPacketFlow(p, f, flags);
@ -363,6 +363,11 @@ static int TCPProtoDetect(ThreadVars *tv,
PacketSwap(p);
FlowSwap(f);
SWAP_FLAGS(flags, STREAM_TOSERVER, STREAM_TOCLIENT);
if (*stream == &ssn->client) {
*stream = &ssn->server;
} else {
*stream = &ssn->client;
}
}
/* account flow if we have both sides */
@ -384,7 +389,7 @@ static int TCPProtoDetect(ThreadVars *tv,
AppProtoToString(*alproto));
if (TCPProtoDetectTriggerOpposingSide(tv, ra_ctx,
p, ssn, stream) != 0)
p, ssn, *stream) != 0)
{
DisableAppLayer(tv, f, p);
goto failure;
@ -426,7 +431,7 @@ static int TCPProtoDetect(ThreadVars *tv,
* to the app layer. */
if (first_data_dir && !(first_data_dir & flags)) {
FlowCleanupAppLayer(f);
StreamTcpResetStreamFlagAppProtoDetectionCompleted(stream);
StreamTcpResetStreamFlagAppProtoDetectionCompleted(*stream);
FLOW_RESET_PP_DONE(f, flags);
FLOW_RESET_PM_DONE(f, flags);
FLOW_RESET_PE_DONE(f, flags);
@ -444,6 +449,7 @@ static int TCPProtoDetect(ThreadVars *tv,
PACKET_PROFILING_APP_END(app_tctx, f->alproto);
if (r < 0)
goto failure;
(*stream)->app_progress_rel += data_len;
} else {
/* if the ssn is midstream, we may end up with a case where the
@ -513,6 +519,9 @@ static int TCPProtoDetect(ThreadVars *tv,
f->alproto, flags,
data, data_len);
PACKET_PROFILING_APP_END(app_tctx, f->alproto);
if (r >= 0) {
(*stream)->app_progress_rel += data_len;
}
AppLayerDecoderEventsSetEventRaw(&p->app_layer_events,
APPLAYER_DETECT_PROTOCOL_ONLY_ONE_DIRECTION);
@ -526,7 +535,7 @@ static int TCPProtoDetect(ThreadVars *tv,
goto failure;
}
*alproto = ALPROTO_FAILED;
StreamTcpSetStreamFlagAppProtoDetectionCompleted(stream);
StreamTcpSetStreamFlagAppProtoDetectionCompleted(*stream);
AppLayerIncFlowCounter(tv, f);
FlagPacketFlow(p, f, flags);
@ -547,10 +556,13 @@ failure:
*
* First run protocol detection and then when the protocol is known invoke
* the app layer parser.
*
* \param stream ptr-to-ptr to stream object. Might change if flow dir is
* reversed.
*/
int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
Packet *p, Flow *f,
TcpSession *ssn, TcpStream *stream,
TcpSession *ssn, TcpStream **stream,
uint8_t *data, uint32_t data_len,
uint8_t flags)
{
@ -578,17 +590,19 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
* app-layer if known. */
if (flags & STREAM_GAP) {
if (alproto == ALPROTO_UNKNOWN) {
StreamTcpSetStreamFlagAppProtoDetectionCompleted(stream);
StreamTcpSetStreamFlagAppProtoDetectionCompleted(*stream);
SCLogDebug("ALPROTO_UNKNOWN flow %p, due to GAP in stream start", f);
/* if the other side didn't already find the proto, we're done */
if (f->alproto == ALPROTO_UNKNOWN)
goto end;
if (f->alproto == ALPROTO_UNKNOWN) {
goto failure;
}
}
PACKET_PROFILING_APP_START(app_tctx, f->alproto);
r = AppLayerParserParse(tv, app_tctx->alp_tctx, f, f->alproto,
flags, data, data_len);
PACKET_PROFILING_APP_END(app_tctx, f->alproto);
/* ignore parser result for gap */
(*stream)->app_progress_rel += data_len;
goto end;
}
@ -646,6 +660,9 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
r = AppLayerParserParse(tv, app_tctx->alp_tctx, f, f->alproto,
flags, data, data_len);
PACKET_PROFILING_APP_END(app_tctx, f->alproto);
if (r >= 0) {
(*stream)->app_progress_rel += data_len;
}
}
}

@ -43,7 +43,7 @@
*/
int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
Packet *p, Flow *f,
TcpSession *ssn, TcpStream *stream,
TcpSession *ssn, TcpStream **stream,
uint8_t *data, uint32_t data_len,
uint8_t flags);

@ -578,7 +578,8 @@ static int DetectAppLayerEventTest03(void)
StreamTcpUTInit(&ra_ctx);
p->flowflags = FLOW_PKT_TOSERVER;
FAIL_IF(AppLayerHandleTCPData(&tv, ra_ctx, p, f, &ssn, &stream_ts, buf_ts,
TcpStream *stream = &stream_ts;
FAIL_IF(AppLayerHandleTCPData(&tv, ra_ctx, p, f, &ssn, &stream, buf_ts,
sizeof(buf_ts), STREAM_TOSERVER | STREAM_START) < 0);
SigMatchSignatures(&tv, de_ctx, det_ctx, p);
@ -586,7 +587,8 @@ static int DetectAppLayerEventTest03(void)
FAIL_IF (PacketAlertCheck(p, 1));
p->flowflags = FLOW_PKT_TOCLIENT;
FAIL_IF (AppLayerHandleTCPData(&tv, ra_ctx, p, f, &ssn, &stream_tc, buf_tc,
stream = &stream_tc;
FAIL_IF (AppLayerHandleTCPData(&tv, ra_ctx, p, f, &ssn, &stream, buf_tc,
sizeof(buf_tc), STREAM_TOCLIENT | STREAM_START) < 0);
SigMatchSignatures(&tv, de_ctx, det_ctx, p);
@ -665,13 +667,15 @@ static int DetectAppLayerEventTest04(void)
StreamTcpUTInit(&ra_ctx);
p->flowflags = FLOW_PKT_TOSERVER;
FAIL_IF(AppLayerHandleTCPData(&tv, ra_ctx, p, f, &ssn, &stream_ts, buf_ts,
TcpStream *stream = &stream_ts;
FAIL_IF(AppLayerHandleTCPData(&tv, ra_ctx, p, f, &ssn, &stream, buf_ts,
sizeof(buf_ts), STREAM_TOSERVER | STREAM_START) < 0);
SigMatchSignatures(&tv, de_ctx, det_ctx, p);
FAIL_IF (PacketAlertCheck(p, 1));
p->flowflags = FLOW_PKT_TOCLIENT;
FAIL_IF (AppLayerHandleTCPData(&tv, ra_ctx, p, f, &ssn, &stream_tc, buf_tc,
stream = &stream_tc;
FAIL_IF (AppLayerHandleTCPData(&tv, ra_ctx, p, f, &ssn, &stream, buf_tc,
sizeof(buf_tc), STREAM_TOCLIENT | STREAM_START) < 0);
SigMatchSignatures(&tv, de_ctx, det_ctx, p);
FAIL_IF (!PacketAlertCheck(p, 1));
@ -764,13 +768,15 @@ static int DetectAppLayerEventTest05(void)
StreamTcpUTInit(&ra_ctx);
p->flowflags = FLOW_PKT_TOSERVER;
FAIL_IF (AppLayerHandleTCPData(&tv, ra_ctx, p, f, &ssn, &stream_ts, buf_ts,
TcpStream *stream = &stream_ts;
FAIL_IF (AppLayerHandleTCPData(&tv, ra_ctx, p, f, &ssn, &stream, buf_ts,
sizeof(buf_ts), STREAM_TOSERVER | STREAM_START) < 0);
SigMatchSignatures(&tv, de_ctx, det_ctx, p);
FAIL_IF (PacketAlertCheck(p, 1));
p->flowflags = FLOW_PKT_TOCLIENT;
FAIL_IF (AppLayerHandleTCPData(&tv, ra_ctx, p, f, &ssn, &stream_tc, buf_tc,
stream = &stream_tc;
FAIL_IF (AppLayerHandleTCPData(&tv, ra_ctx, p, f, &ssn, &stream, buf_tc,
sizeof(buf_tc), STREAM_TOCLIENT | STREAM_START) < 0);
SigMatchSignatures(&tv, de_ctx, det_ctx, p);
FAIL_IF (!PacketAlertCheck(p, 1));

@ -988,38 +988,40 @@ static inline bool CheckGap(TcpSession *ssn, TcpStream *stream, Packet *p)
/** \internal
* \brief get stream buffer and update the app-layer
* \param stream pointer to pointer as app-layer can switch flow dir
* \retval 0 success
*/
static int ReassembleUpdateAppLayer (ThreadVars *tv,
TcpReassemblyThreadCtx *ra_ctx,
TcpSession *ssn, TcpStream *stream,
TcpSession *ssn, TcpStream **stream,
Packet *p, enum StreamUpdateDir dir)
{
uint64_t app_progress = STREAM_APP_PROGRESS(stream);
uint64_t app_progress = STREAM_APP_PROGRESS(*stream);
SCLogDebug("app progress %"PRIu64, app_progress);
SCLogDebug("last_ack %u, base_seq %u", stream->last_ack, stream->base_seq);
SCLogDebug("last_ack %u, base_seq %u", (*stream)->last_ack, (*stream)->base_seq);
const uint8_t *mydata;
uint32_t mydata_len;
while (1) {
GetAppBuffer(stream, &mydata, &mydata_len, app_progress);
if (mydata == NULL && mydata_len > 0 && CheckGap(ssn, stream, p)) {
GetAppBuffer(*stream, &mydata, &mydata_len, app_progress);
if (mydata == NULL && mydata_len > 0 && CheckGap(ssn, *stream, p)) {
SCLogDebug("sending GAP to app-layer (size: %u)", mydata_len);
int r = AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, stream,
NULL, mydata_len,
StreamGetAppLayerFlags(ssn, stream, p, dir)|STREAM_GAP);
StreamGetAppLayerFlags(ssn, *stream, p, dir)|STREAM_GAP);
AppLayerProfilingStore(ra_ctx->app_tctx, p);
StreamTcpSetEvent(p, STREAM_REASSEMBLY_SEQ_GAP);
StatsIncr(tv, ra_ctx->counter_tcp_reass_gap);
stream->app_progress_rel += mydata_len;
app_progress += mydata_len;
/* AppLayerHandleTCPData has likely updated progress. */
app_progress = STREAM_APP_PROGRESS(*stream);
if (r < 0)
break;
return 0;
continue;
} else if (mydata == NULL || mydata_len == 0) {
@ -1033,7 +1035,7 @@ static int ReassembleUpdateAppLayer (ThreadVars *tv,
//PrintRawDataFp(stdout, mydata, mydata_len);
SCLogDebug("stream %p data in buffer %p of len %u and offset %"PRIu64,
stream, &stream->sb, mydata_len, app_progress);
*stream, &(*stream)->sb, mydata_len, app_progress);
/* get window of data that is acked */
if (StreamTcpInlineMode() == FALSE) {
@ -1041,10 +1043,10 @@ static int ReassembleUpdateAppLayer (ThreadVars *tv,
// fall through, we use all available data
} else {
uint64_t last_ack_abs = app_progress; /* absolute right edge of ack'd data */
if (STREAM_LASTACK_GT_BASESEQ(stream)) {
if (STREAM_LASTACK_GT_BASESEQ(*stream)) {
/* get window of data that is acked */
uint32_t delta = stream->last_ack - stream->base_seq;
DEBUG_VALIDATE_BUG_ON(delta > 10000000ULL && delta > stream->window);
uint32_t delta = (*stream)->last_ack - (*stream)->base_seq;
DEBUG_VALIDATE_BUG_ON(delta > 10000000ULL && delta > (*stream)->window);
/* get max absolute offset */
last_ack_abs += delta;
}
@ -1061,24 +1063,11 @@ static int ReassembleUpdateAppLayer (ThreadVars *tv,
}
/* update the app-layer */
int r = AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, stream,
(void)AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, stream,
(uint8_t *)mydata, mydata_len,
StreamGetAppLayerFlags(ssn, stream, p, dir));
StreamGetAppLayerFlags(ssn, *stream, p, dir));
AppLayerProfilingStore(ra_ctx->app_tctx, p);
/* see if we can update the progress */
if (r == 0 && mydata_len > 0 &&
StreamTcpIsSetStreamFlagAppProtoDetectionCompleted(stream))
{
SCLogDebug("app progress %"PRIu64" increasing with data len %u to %"PRIu64,
app_progress, mydata_len, app_progress + mydata_len);
stream->app_progress_rel += mydata_len;
SCLogDebug("app progress now %"PRIu64, STREAM_APP_PROGRESS(stream));
} else {
SCLogDebug("NOT UPDATED app progress still %"PRIu64, app_progress);
}
SCReturnInt(0);
}
@ -1121,7 +1110,7 @@ int StreamTcpReassembleAppLayer (ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
if (ssn->state >= TCP_CLOSING || (p->flags & PKT_PSEUDO_STREAM_END)) {
SCLogDebug("sending empty eof message");
/* send EOF to app layer */
AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, stream,
AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, &stream,
NULL, 0,
StreamGetAppLayerFlags(ssn, stream, p, dir));
AppLayerProfilingStore(ra_ctx->app_tctx, p);
@ -1131,7 +1120,7 @@ int StreamTcpReassembleAppLayer (ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
}
/* with all that out of the way, lets update the app-layer */
return ReassembleUpdateAppLayer(tv, ra_ctx, ssn, stream, p, dir);
return ReassembleUpdateAppLayer(tv, ra_ctx, ssn, &stream, p, dir);
}
/** \internal

Loading…
Cancel
Save