streaming: implement memory regions

In TCP, large gaps in the data could lead to an extremely poor utilization
of the streaming buffer memory. This was caused by the implementation using
a single continues memory allocation from the "stream offset" to the
current data. If a 100 byte segment was inserted for ISN + 20MiB, we would
allocate 20MiB, even if only 100 bytes were actually used.

This patch addresses the issue by implementing a list of memory regions.
The StreamingBuffer structure holds a static "main" region, which can be
extended in the form of a simple list of regions.

    [ main region ] [ gap ] [ aux region ]
    [ sbb ]                 [ sbb ]

On insert, find the correct region and see if the new data fits. If it
doesn't, see if we can expand the current region, or than we need to add
a new region. If expanding the current region means we overlap or get
too close to the next region, we merge them.

On sliding, we free any regions that slide out of window and consolidate
auxilary regions into main where needed.

Bug: #4580.
pull/8406/head
Victor Julien 3 years ago
parent 61e47ad6f5
commit 1dac2467c5

@ -380,7 +380,7 @@ static void EveHttpLogJSONHeaders(JsonBuilder *js, uint32_t direction, htp_tx_t
static void BodyPrintableBuffer(JsonBuilder *js, HtpBody *body, const char *key)
{
if (body->sb != NULL && body->sb->buf != NULL) {
if (body->sb != NULL && body->sb->region.buf != NULL) {
uint32_t offset = 0;
const uint8_t *body_data;
uint32_t body_data_len;
@ -418,7 +418,7 @@ void EveHttpLogJSONBodyPrintable(JsonBuilder *js, Flow *f, uint64_t tx_id)
static void BodyBase64Buffer(JsonBuilder *js, HtpBody *body, const char *key)
{
if (body->sb != NULL && body->sb->buf != NULL) {
if (body->sb != NULL && body->sb->region.buf != NULL) {
const uint8_t *body_data;
uint32_t body_data_len;
uint64_t body_offset;

@ -140,7 +140,7 @@ typedef struct TcpStream_ {
struct TCPSACK sack_tree; /**< red back tree of TCP SACK records. */
} TcpStream;
#define STREAM_BASE_OFFSET(stream) ((stream)->sb.stream_offset)
#define STREAM_BASE_OFFSET(stream) ((stream)->sb.region.stream_offset)
#define STREAM_APP_PROGRESS(stream) (STREAM_BASE_OFFSET((stream)) + (stream)->app_progress_rel)
#define STREAM_RAW_PROGRESS(stream) (STREAM_BASE_OFFSET((stream)) + (stream)->raw_progress_rel)
#define STREAM_LOG_PROGRESS(stream) (STREAM_BASE_OFFSET((stream)) + (stream)->log_progress_rel)

@ -686,10 +686,10 @@ static uint32_t StreamTcpReassembleCheckDepth(TcpSession *ssn, TcpStream *stream
uint32_t StreamDataAvailableForProtoDetect(TcpStream *stream)
{
if (RB_EMPTY(&stream->sb.sbb_tree)) {
if (stream->sb.stream_offset != 0)
if (stream->sb.region.stream_offset != 0)
return 0;
return stream->sb.buf_offset;
return stream->sb.region.buf_offset;
} else {
DEBUG_VALIDATE_BUG_ON(stream->sb.head == NULL);
DEBUG_VALIDATE_BUG_ON(stream->sb.sbb_size == 0);
@ -913,7 +913,6 @@ uint8_t StreamNeedsReassembly(const TcpSession *ssn, uint8_t direction)
}
const uint64_t right_edge = StreamingBufferGetConsecutiveDataRightEdge(&stream->sb);
SCLogDebug("%s: app %"PRIu64" (use: %s), raw %"PRIu64" (use: %s). Stream right edge: %"PRIu64,
dirstr,
STREAM_APP_PROGRESS(stream), use_app ? "yes" : "no",
@ -945,7 +944,7 @@ static uint64_t GetStreamSize(TcpStream *stream)
uint64_t last_ack_abs = GetAbsLastAck(stream);
uint64_t last_re = 0;
SCLogDebug("stream_offset %" PRIu64, stream->sb.stream_offset);
SCLogDebug("stream_offset %" PRIu64, stream->sb.region.stream_offset);
TcpSegment *seg;
RB_FOREACH(seg, TCPSEG, &stream->seg_tree) {
@ -1065,6 +1064,7 @@ static bool GetAppBuffer(const TcpStream *stream, const uint8_t **data, uint32_t
SCLogDebug("blk at offset");
StreamingBufferSBBGetData(&stream->sb, blk, data, data_len);
BUG_ON(blk->len != *data_len);
gap_ahead = check_for_gap && GapAhead(stream, blk);

File diff suppressed because it is too large Load Diff

@ -74,6 +74,19 @@ typedef struct StreamingBufferConfig_ {
0, 0, NULL, NULL, NULL, \
}
#define STREAMING_BUFFER_REGION_INIT \
{ \
NULL, 0, 0, 0ULL, NULL, \
}
typedef struct StreamingBufferRegion_ {
uint8_t *buf; /**< memory block for reassembly */
uint32_t buf_size; /**< size of memory block */
uint32_t buf_offset; /**< how far we are in buf_size */
uint64_t stream_offset; /**< stream offset of this region */
struct StreamingBufferRegion_ *next;
} StreamingBufferRegion;
/**
* \brief block of continues data
*/
@ -92,15 +105,12 @@ StreamingBufferBlock *SBB_RB_FIND_INCLUSIVE(struct SBB *head, StreamingBufferBlo
typedef struct StreamingBuffer_ {
const StreamingBufferConfig *cfg;
uint64_t stream_offset; /**< offset of the start of the memory block */
uint8_t *buf; /**< memory block for reassembly */
uint32_t buf_size; /**< size of memory block */
uint32_t buf_offset; /**< how far we are in buf_size */
StreamingBufferRegion region;
struct SBB sbb_tree; /**< red black tree of Stream Buffer Blocks */
StreamingBufferBlock *head; /**< head, should always be the same as RB_MIN */
uint32_t sbb_size; /**< data size covered by sbbs */
uint16_t regions;
uint16_t max_regions;
#ifdef DEBUG
uint32_t buf_size_max;
#endif
@ -108,33 +118,34 @@ typedef struct StreamingBuffer_ {
static inline bool StreamingBufferHasData(const StreamingBuffer *sb)
{
return (sb->stream_offset || sb->buf_offset || !RB_EMPTY(&sb->sbb_tree));
return (sb->region.stream_offset || sb->region.buf_offset || sb->region.next != NULL ||
!RB_EMPTY(&sb->sbb_tree));
}
static inline uint64_t StreamingBufferGetConsecutiveDataRightEdge(const StreamingBuffer *sb)
{
return sb->stream_offset + sb->buf_offset;
return sb->region.stream_offset + sb->region.buf_offset;
}
static inline uint64_t StreamingBufferGetOffset(const StreamingBuffer *sb)
{
return sb->stream_offset;
return sb->region.stream_offset;
}
#ifndef DEBUG
#define STREAMING_BUFFER_INITIALIZER(cfg) \
{ \
(cfg), \
0, \
NULL, \
0, \
0, \
STREAMING_BUFFER_REGION_INIT, \
{ NULL }, \
NULL, \
0, \
1, \
1, \
};
#else
#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, { NULL }, NULL, 0, 0 };
#define STREAMING_BUFFER_INITIALIZER(cfg) \
{ (cfg), STREAMING_BUFFER_REGION_INIT, { NULL }, NULL, 0, 1, 1, 0 };
#endif
typedef struct StreamingBufferSegment_ {

Loading…
Cancel
Save