output-streaming: StreamIterator

StreamIterator implementation for iterating over ACKed segments.

Flag each segment as logged when the log function has been called for it.

Set a 'OPEN' flag for the first segment in both directions.

Set a 'CLOSE' flag when the stream ends. If the last segment was already
logged, a empty CLOSE call is performed with NULL data.
pull/1109/head
Victor Julien 11 years ago
parent 9d9ef983dd
commit ab6fac884d

@ -122,6 +122,56 @@ int Streamer(void *cbdata, Flow *f, uint8_t *data, uint32_t data_len, uint8_t fl
return 0;
}
int StreamIterator(Flow *f, TcpStream *stream, int close, void *cbdata, uint8_t iflags)
{
int logged = 0;
/* optimization: don't iterate list if we've logged all,
* so check the last segment's flags */
if (stream->seg_list_tail != NULL &&
(!(stream->seg_list_tail->flags & SEGMENTTCP_FLAG_LOGAPI_PROCESSED)))
{
TcpSegment *seg = stream->seg_list;
while (seg) {
uint8_t flags = iflags;
if (seg->flags & SEGMENTTCP_FLAG_LOGAPI_PROCESSED) {
seg = seg->next;
continue;
}
if (SEQ_GT(seg->seq + seg->payload_len, stream->last_ack)) {
SCLogDebug("seg not (fully) acked yet");
break;
}
if (seg->seq == stream->isn + 1)
flags |= OUTPUT_STREAMING_FLAG_OPEN;
/* if we need to close and we're at the last segment in the list
* we add the 'close' flag so the logger can close up. */
if (close && seg->next == NULL)
flags |= OUTPUT_STREAMING_FLAG_CLOSE;
Streamer(cbdata, f, seg->payload, (uint32_t)seg->payload_len, flags);
seg->flags |= SEGMENTTCP_FLAG_LOGAPI_PROCESSED;
seg = seg->next;
logged = 1;
}
}
/* if we need to close we need to invoke the Streamer for sure. If we
* logged no segments, we call the Streamer with NULL data so it can
* close up. */
if (logged == 0 && close) {
Streamer(cbdata, f, NULL, 0, OUTPUT_STREAMING_FLAG_CLOSE);
}
return 0;
}
static TmEcode OutputStreamingLog(ThreadVars *tv, Packet *p, void *thread_data, PacketQueue *pq, PacketQueue *postpq)
{
BUG_ON(thread_data == NULL);
@ -131,7 +181,7 @@ static TmEcode OutputStreamingLog(ThreadVars *tv, Packet *p, void *thread_data,
OutputStreamingLogger *logger = list;
OutputLoggerThreadStore *store = op_thread_data->store;
// StreamerCallbackData streamer_cbdata = { logger, store, tv, p };
StreamerCallbackData streamer_cbdata = { logger, store, tv, p };
BUG_ON(logger == NULL && store != NULL);
BUG_ON(logger != NULL && store == NULL);
@ -150,30 +200,18 @@ static TmEcode OutputStreamingLog(ThreadVars *tv, Packet *p, void *thread_data,
else
flags |= OUTPUT_STREAMING_FLAG_TOSERVER;
// int file_close = (p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0;
// int file_trunc = 0;
FLOWLOCK_WRLOCK(f);
TcpSession *ssn = f->protoctx;
if (ssn) {
int close = (ssn->state >= TCP_CLOSED);
close |= ((p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0);
SCLogDebug("close ? %s", close ? "yes" : "no");
logger = list;
store = op_thread_data->store;
while (logger && store) {
BUG_ON(logger->LogFunc == NULL);
SCLogDebug("logger %p", logger);
PACKET_PROFILING_TMM_START(p, logger->module_id);
//logger->LogFunc(tv, store->thread_data, (const Packet *)p, (const File *)ff,
// (const FileData *)write_ffd, flags);
PACKET_PROFILING_TMM_END(p, logger->module_id);
logger = logger->next;
store = store->next;
BUG_ON(logger == NULL && store != NULL);
BUG_ON(logger != NULL && store == NULL);
}
TcpStream *stream = flags & OUTPUT_STREAMING_FLAG_TOSERVER ? &ssn->client : &ssn->server;
StreamIterator(p->flow, stream, close, (void *)&streamer_cbdata, flags);
}
FLOWLOCK_UNLOCK(f);
return TM_ECODE_OK;
}

@ -181,6 +181,9 @@ enum
#define SEGMENTTCP_FLAG_RAW_PROCESSED 0x01
/** App Layer reassembly code is done with this segment */
#define SEGMENTTCP_FLAG_APPLAYER_PROCESSED 0x02
/** Log API (streaming) has processed this segment */
#define SEGMENTTCP_FLAG_LOGAPI_PROCESSED 0x04
#define PAWS_24DAYS 2073600 /**< 24 days in seconds */

Loading…
Cancel
Save