app-layer: improve/fix updates logic

In 23323a961f ("app-layer: reduce app cleanup and output-tx calls"), flag
was set per packet updating the app-layer state. However this was missing a
common pattern: in IDS mode most updates are done in the opposite direction
of the traffic due to updates getting triggered by ACK's. This meant that
file store processing might not happen for a long time, or at all. Also,
app layer cleanup might not be called, which includes file pruning.

This patch sets per flow set of flags to indicate app layer is (potentially)
updated. It sets this per direction, based on how the parsers were invoked.
If an ACK triggers an app update, the flow is tagged for the opposite
direction and the next packet in that direction triggers output and cleanup.

Fixes: 23323a961f ("app-layer: reduce app cleanup and output-tx calls")

Bug: #6120.
pull/9009/head
Victor Julien 3 years ago
parent 22d7323eee
commit c90f67ac55

@ -338,10 +338,9 @@ extern enum ExceptionPolicy g_applayerparser_error_policy;
* \retval int -1 error
* \retval int 0 ok
*/
static int TCPProtoDetect(ThreadVars *tv,
TcpReassemblyThreadCtx *ra_ctx, AppLayerThreadCtx *app_tctx,
Packet *p, Flow *f, TcpSession *ssn, TcpStream **stream,
uint8_t *data, uint32_t data_len, uint8_t flags)
static int TCPProtoDetect(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
AppLayerThreadCtx *app_tctx, Packet *p, Flow *f, TcpSession *ssn, TcpStream **stream,
uint8_t *data, uint32_t data_len, uint8_t flags, enum StreamUpdateDir dir)
{
AppProto *alproto;
AppProto *alproto_otherdir;
@ -507,7 +506,7 @@ static int TCPProtoDetect(ThreadVars *tv,
int r = AppLayerParserParse(tv, app_tctx->alp_tctx, f, f->alproto,
flags, data, data_len);
PACKET_PROFILING_APP_END(app_tctx, f->alproto);
p->flags |= PKT_APPLAYER_UPDATE;
p->app_update_direction = (uint8_t)dir;
if (r != 1) {
StreamTcpUpdateAppLayerProgress(ssn, direction, data_len);
}
@ -581,7 +580,7 @@ static int TCPProtoDetect(ThreadVars *tv,
f->alproto, flags,
data, data_len);
PACKET_PROFILING_APP_END(app_tctx, f->alproto);
p->flags |= PKT_APPLAYER_UPDATE;
p->app_update_direction = (uint8_t)dir;
if (r != 1) {
StreamTcpUpdateAppLayerProgress(ssn, direction, data_len);
}
@ -641,11 +640,9 @@ detect_error:
* \param stream ptr-to-ptr to stream object. Might change if flow dir is
* reversed.
*/
int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
Packet *p, Flow *f,
TcpSession *ssn, TcpStream **stream,
uint8_t *data, uint32_t data_len,
uint8_t flags)
int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, Packet *p, Flow *f,
TcpSession *ssn, TcpStream **stream, uint8_t *data, uint32_t data_len, uint8_t flags,
enum StreamUpdateDir dir)
{
SCEnter();
@ -691,7 +688,7 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
r = AppLayerParserParse(tv, app_tctx->alp_tctx, f, f->alproto,
flags, data, data_len);
PACKET_PROFILING_APP_END(app_tctx, f->alproto);
p->flags |= PKT_APPLAYER_UPDATE;
p->app_update_direction = (uint8_t)dir;
/* ignore parser result for gap */
StreamTcpUpdateAppLayerProgress(ssn, direction, data_len);
if (r < 0) {
@ -709,8 +706,8 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
if (alproto == ALPROTO_UNKNOWN && (flags & STREAM_START)) {
DEBUG_VALIDATE_BUG_ON(FlowChangeProto(f));
/* run protocol detection */
if (TCPProtoDetect(tv, ra_ctx, app_tctx, p, f, ssn, stream,
data, data_len, flags) != 0) {
if (TCPProtoDetect(tv, ra_ctx, app_tctx, p, f, ssn, stream, data, data_len, flags, dir) !=
0) {
goto failure;
}
} else if (alproto != ALPROTO_UNKNOWN && FlowChangeProto(f)) {
@ -722,7 +719,8 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
StreamTcpResetStreamFlagAppProtoDetectionCompleted(&ssn->client);
StreamTcpResetStreamFlagAppProtoDetectionCompleted(&ssn->server);
/* rerun protocol detection */
int rd = TCPProtoDetect(tv, ra_ctx, app_tctx, p, f, ssn, stream, data, data_len, flags);
int rd =
TCPProtoDetect(tv, ra_ctx, app_tctx, p, f, ssn, stream, data, data_len, flags, dir);
if (f->alproto == ALPROTO_UNKNOWN) {
DEBUG_VALIDATE_BUG_ON(alstate_orig != f->alstate);
// not enough data, revert AppLayerProtoDetectReset to rerun detection
@ -775,7 +773,7 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
r = AppLayerParserParse(tv, app_tctx->alp_tctx, f, f->alproto,
flags, data, data_len);
PACKET_PROFILING_APP_END(app_tctx, f->alproto);
p->flags |= PKT_APPLAYER_UPDATE;
p->app_update_direction = (uint8_t)dir;
if (r != 1) {
StreamTcpUpdateAppLayerProgress(ssn, direction, data_len);
if (r < 0) {
@ -900,7 +898,7 @@ int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow *
r = AppLayerParserParse(tv, tctx->alp_tctx, f, f->alproto,
flags, p->payload, p->payload_len);
PACKET_PROFILING_APP_END(tctx, f->alproto);
p->flags |= PKT_APPLAYER_UPDATE;
p->app_update_direction = (uint8_t)UPDATE_DIR_PACKET;
}
PACKET_PROFILING_APP_STORE(tctx, p);
/* we do only inspection in one direction, so flag both
@ -917,7 +915,7 @@ int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow *
flags, p->payload, p->payload_len);
PACKET_PROFILING_APP_END(tctx, f->alproto);
PACKET_PROFILING_APP_STORE(tctx, p);
p->flags |= PKT_APPLAYER_UPDATE;
p->app_update_direction = (uint8_t)UPDATE_DIR_PACKET;
}
if (r < 0) {
ExceptionPolicyApply(p, g_applayerparser_error_policy, PKT_DROP_REASON_APPLAYER_ERROR);

@ -41,11 +41,9 @@
/**
* \brief Handles reassembled tcp stream.
*/
int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
Packet *p, Flow *f,
TcpSession *ssn, TcpStream **stream,
uint8_t *data, uint32_t data_len,
uint8_t flags);
int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, Packet *p, Flow *f,
TcpSession *ssn, TcpStream **stream, uint8_t *data, uint32_t data_len, uint8_t flags,
enum StreamUpdateDir dir);
/**
* \brief Handles an udp chunk.

@ -459,6 +459,8 @@ typedef struct Packet_
uint8_t flowflags;
/* coccinelle: Packet:flowflags:FLOW_PKT_ */
uint8_t app_update_direction; // enum StreamUpdateDir
/* Pkt Flags */
uint32_t flags;
@ -1056,9 +1058,6 @@ void DecodeUnregisterCounters(void);
#define PKT_FIRST_ALERTS BIT_U32(29)
#define PKT_FIRST_TAG BIT_U32(30)
/** Packet updated the app-layer. */
#define PKT_APPLAYER_UPDATE BIT_U32(31)
/** \brief return 1 if the packet is a pseudo packet */
#define PKT_IS_PSEUDOPKT(p) \
((p)->flags & (PKT_PSEUDO_STREAM_END|PKT_PSEUDO_DETECTLOG_FLUSH))

@ -512,6 +512,44 @@ static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadD
FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
}
/** \internal
* \brief apply Packet::app_update_direction to the flow flags
*/
static void PacketAppUpdate2FlowFlags(Packet *p)
{
switch ((enum StreamUpdateDir)p->app_update_direction) {
case UPDATE_DIR_NONE: // NONE implies pseudo packet
break;
case UPDATE_DIR_PACKET:
if (PKT_IS_TOSERVER(p)) {
p->flow->flags |= FLOW_TS_APP_UPDATED;
SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt);
} else {
p->flow->flags |= FLOW_TC_APP_UPDATED;
SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt);
}
break;
case UPDATE_DIR_BOTH:
if (PKT_IS_TOSERVER(p)) {
p->flow->flags |= FLOW_TS_APP_UPDATED;
SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt);
} else {
p->flow->flags |= FLOW_TC_APP_UPDATED;
SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt);
}
/* fall through */
case UPDATE_DIR_OPPOSING:
if (PKT_IS_TOSERVER(p)) {
p->flow->flags |= FLOW_TC_APP_UPDATED;
SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt);
} else {
p->flow->flags |= FLOW_TS_APP_UPDATED;
SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt);
}
break;
}
}
static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
{
FlowWorkerThreadData *fw = data;
@ -567,12 +605,14 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
}
FlowWorkerStreamTCPUpdate(tv, fw, p, detect_thread, false);
PacketAppUpdate2FlowFlags(p);
/* handle the app layer part of the UDP packet payload */
} else if (p->proto == IPPROTO_UDP && !PacketCheckAction(p, ACTION_DROP)) {
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_APPLAYERUDP);
AppLayerHandleUdp(tv, fw->stream_thread->ra_ctx->app_tctx, p, p->flow);
FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_APPLAYERUDP);
PacketAppUpdate2FlowFlags(p);
}
}
@ -609,13 +649,29 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
FramesPrune(p->flow, p);
}
if ((PKT_IS_PSEUDOPKT(p)) || ((p->flags & PKT_APPLAYER_UPDATE) != 0)) {
SCLogDebug("pseudo or app update: run cleanup");
/* run tx cleanup last */
AppLayerParserTransactionsCleanup(p->flow, STREAM_FLAGS_FOR_PACKET(p));
if ((PKT_IS_PSEUDOPKT(p)) ||
(p->flow->flags & (FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED))) {
if (PKT_IS_TOSERVER(p)) {
if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TS_APP_UPDATED))) {
AppLayerParserTransactionsCleanup(p->flow, STREAM_TOSERVER);
p->flow->flags &= ~FLOW_TS_APP_UPDATED;
}
} else {
if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TC_APP_UPDATED))) {
AppLayerParserTransactionsCleanup(p->flow, STREAM_TOCLIENT);
p->flow->flags &= ~FLOW_TC_APP_UPDATED;
}
}
} else {
SCLogDebug("not pseudo, no app update: skip");
}
if (p->flow->flags & FLOW_ACTION_DROP) {
SCLogDebug("flow drop in place: remove app update flags");
p->flow->flags &= ~(FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED);
}
Flow *f = p->flow;
FlowDeReference(&p->flow);
FLOWLOCK_UNLOCK(f);

@ -112,6 +112,9 @@ typedef struct AppLayerParserState_ AppLayerParserState;
/** All packets in this flow should be passed */
#define FLOW_ACTION_PASS BIT_U32(28)
#define FLOW_TS_APP_UPDATED BIT_U32(29)
#define FLOW_TC_APP_UPDATED BIT_U32(30)
/* File flags */
#define FLOWFILE_INIT 0

@ -339,7 +339,7 @@ static TmEcode OutputTxLog(ThreadVars *tv, Packet *p, void *thread_data)
DEBUG_VALIDATE_BUG_ON(thread_data == NULL);
if (p->flow == NULL)
return TM_ECODE_OK;
if (!((PKT_IS_PSEUDOPKT(p)) || (p->flags & PKT_APPLAYER_UPDATE) != 0)) {
if (!((PKT_IS_PSEUDOPKT(p)) || p->flow->flags & (FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED))) {
SCLogDebug("not pseudo, no app update: skip");
return TM_ECODE_OK;
}

@ -96,6 +96,7 @@ void PacketReinit(Packet *p)
p->proto = 0;
p->recursion_level = 0;
PACKET_FREE_EXTDATA(p);
p->app_update_direction = 0;
p->flags = 0;
p->flowflags = 0;
p->pkt_src = 0;

@ -767,7 +767,7 @@ int StreamTcpReassembleHandleSegmentHandleData(ThreadVars *tv, TcpReassemblyThre
StreamTcpSetEvent(p, STREAM_REASSEMBLY_DEPTH_REACHED);
/* increment stream depth counter */
StatsIncr(tv, ra_ctx->counter_tcp_stream_depth);
p->flags |= PKT_APPLAYER_UPDATE;
p->app_update_direction = UPDATE_DIR_PACKET;
}
if (size == 0) {
SCLogDebug("ssn %p: depth reached, not reassembling", ssn);
@ -1246,9 +1246,8 @@ static int ReassembleUpdateAppLayer (ThreadVars *tv,
if (mydata == NULL && mydata_len > 0 && CheckGap(ssn, *stream, p)) {
SCLogDebug("sending GAP to app-layer (size: %u)", mydata_len);
int r = AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, stream,
NULL, mydata_len,
StreamGetAppLayerFlags(ssn, *stream, p)|STREAM_GAP);
int r = AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, stream, NULL, mydata_len,
StreamGetAppLayerFlags(ssn, *stream, p) | STREAM_GAP, dir);
AppLayerProfilingStore(ra_ctx->app_tctx, p);
StreamTcpSetEvent(p, STREAM_REASSEMBLY_SEQ_GAP);
@ -1320,8 +1319,8 @@ static int ReassembleUpdateAppLayer (ThreadVars *tv,
SCLogDebug("parser");
/* update the app-layer */
(void)AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, stream,
(uint8_t *)mydata, mydata_len, flags);
(void)AppLayerHandleTCPData(
tv, ra_ctx, p, p->flow, ssn, stream, (uint8_t *)mydata, mydata_len, flags, dir);
AppLayerProfilingStore(ra_ctx->app_tctx, p);
AppLayerFrameDump(p->flow);
uint64_t new_app_progress = STREAM_APP_PROGRESS(*stream);
@ -1374,9 +1373,8 @@ int StreamTcpReassembleAppLayer (ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
if (ssn->state >= TCP_CLOSING || (p->flags & PKT_PSEUDO_STREAM_END)) {
SCLogDebug("sending empty eof message");
/* send EOF to app layer */
AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, &stream,
NULL, 0,
StreamGetAppLayerFlags(ssn, stream, p));
AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, &stream, NULL, 0,
StreamGetAppLayerFlags(ssn, stream, p), dir);
AppLayerProfilingStore(ra_ctx->app_tctx, p);
SCReturnInt(0);

@ -51,6 +51,7 @@ enum
};
enum StreamUpdateDir {
UPDATE_DIR_NONE = 0,
UPDATE_DIR_PACKET,
UPDATE_DIR_OPPOSING,
UPDATE_DIR_BOTH,

Loading…
Cancel
Save