streaming/buffer: account sbb data size

When tracking data track the size of the blocks so that in case
of gaps we can still know how much data we hold.

(cherry picked from commit be1baa8cab)
pull/6443/head
Victor Julien 5 years ago
parent 3c8587393d
commit 1c6e36abb5

@ -194,6 +194,7 @@ static void SBBInit(StreamingBuffer *sb,
sbb2->len = data_len;
sb->head = sbb;
sb->sbb_size = sbb->len + sbb2->len;
SBB_RB_INSERT(&sb->sbb_tree, sbb);
SBB_RB_INSERT(&sb->sbb_tree, sbb2);
@ -221,6 +222,7 @@ static void SBBInitLeadingGap(StreamingBuffer *sb,
sbb->len = data_len;
sb->head = sbb;
sb->sbb_size = sbb->len;
SBB_RB_INSERT(&sb->sbb_tree, sbb);
SCLogDebug("sbb %"PRIu64", len %u",
@ -255,6 +257,7 @@ static inline void ConsolidateFwd(StreamingBuffer *sb,
tr: [ ]
*/
if (sa->offset >= tr->offset && sa_re <= tr_re) {
sb->sbb_size -= sa->len;
sa->len = tr->len;
sa->offset = tr->offset;
sa_re = sa->offset + sa->len;
@ -272,6 +275,7 @@ static inline void ConsolidateFwd(StreamingBuffer *sb,
} else if (sa->offset <= tr->offset && sa_re >= tr_re) {
SCLogDebug("-> (fwd) tr %p %"PRIu64"/%u REMOVED ECLIPSED", tr, tr->offset, tr->len);
SBB_RB_REMOVE(tree, tr);
sb->sbb_size -= tr->len;
FREE(sb->cfg, tr, sizeof(StreamingBufferBlock));
/*
sa: [ ]
@ -282,11 +286,13 @@ static inline void ConsolidateFwd(StreamingBuffer *sb,
} else if (sa->offset < tr->offset && // starts before
sa_re >= tr->offset && sa_re < tr_re) // ends inside
{
// merge
// merge. sb->sbb_size includes both so we need to adjust that too.
uint32_t combined_len = sa->len + tr->len;
sa->len = tr_re - sa->offset;
sa_re = sa->offset + sa->len;
SCLogDebug("-> (fwd) tr %p %"PRIu64"/%u REMOVED MERGED", tr, tr->offset, tr->len);
SBB_RB_REMOVE(tree, tr);
sb->sbb_size -= (combined_len - sa->len); // remove what we added twice
FREE(sb->cfg, tr, sizeof(StreamingBufferBlock));
}
}
@ -307,6 +313,7 @@ static inline void ConsolidateBackward(StreamingBuffer *sb,
break; // entirely after
if (sa->offset >= tr->offset && sa_re <= tr_re) {
sb->sbb_size -= sa->len; // sa entirely eclipsed so remove double accounting
sa->len = tr->len;
sa->offset = tr->offset;
sa_re = sa->offset + sa->len;
@ -328,6 +335,7 @@ static inline void ConsolidateBackward(StreamingBuffer *sb,
if (sb->head == tr)
sb->head = sa;
SBB_RB_REMOVE(tree, tr);
sb->sbb_size -= tr->len; // tr entirely eclipsed so remove double accounting
FREE(sb->cfg, tr, sizeof(StreamingBufferBlock));
/*
sa: [ ]
@ -336,7 +344,8 @@ static inline void ConsolidateBackward(StreamingBuffer *sb,
tr: [ ]
*/
} else if (sa->offset > tr->offset && sa_re > tr_re && sa->offset <= tr_re) {
// merge
// merge. sb->sbb_size includes both so we need to adjust that too.
uint32_t combined_len = sa->len + tr->len;
sa->len = sa_re - tr->offset;
sa->offset = tr->offset;
sa_re = sa->offset + sa->len;
@ -344,6 +353,7 @@ static inline void ConsolidateBackward(StreamingBuffer *sb,
if (sb->head == tr)
sb->head = sa;
SBB_RB_REMOVE(tree, tr);
sb->sbb_size -= (combined_len - sa->len); // remove what we added twice
FREE(sb->cfg, tr, sizeof(StreamingBufferBlock));
}
}
@ -366,6 +376,7 @@ static int Insert(StreamingBuffer *sb, struct SBB *tree,
FREE(sb->cfg, sbb, sizeof(StreamingBufferBlock));
return 0;
}
sb->sbb_size += len; // may adjust based on consolidation below
if (SBB_RB_PREV(sbb) == NULL) {
sb->head = sbb;
} else {
@ -389,6 +400,7 @@ static void SBBFree(StreamingBuffer *sb)
StreamingBufferBlock *sbb = NULL, *safe = NULL;
RB_FOREACH_SAFE(sbb, SBB, &sb->sbb_tree, safe) {
SBB_RB_REMOVE(&sb->sbb_tree, sbb);
sb->sbb_size -= sbb->len;
FREE(sb->cfg, sbb, sizeof(StreamingBufferBlock));
}
sb->head = NULL;
@ -413,6 +425,7 @@ static void SBBPrune(StreamingBuffer *sb)
if (sbb->len >= shrink_by) {
sbb->len -= shrink_by;
sbb->offset += shrink_by;
sb->sbb_size -= shrink_by;
DEBUG_VALIDATE_BUG_ON(sbb->offset != sb->stream_offset);
}
sb->head = sbb;
@ -422,6 +435,7 @@ static void SBBPrune(StreamingBuffer *sb)
SBB_RB_REMOVE(&sb->sbb_tree, sbb);
/* either we set it again for the next sbb, or there isn't any */
sb->head = NULL;
sb->sbb_size -= sbb->len;
SCLogDebug("sb %p removed %p %"PRIu64", %u", sb, sbb, sbb->offset, sbb->len);
FREE(sb->cfg, sbb, sizeof(StreamingBufferBlock));
}
@ -941,6 +955,7 @@ static int StreamingBufferTest01(void)
FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg1,(const uint8_t *)"ABCDEFGH", 8));
FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg2,(const uint8_t *)"01234567", 8));
Dump(sb);
FAIL_IF_NOT_NULL(sb->head);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment *seg3 = StreamingBufferAppendRaw(sb, (const uint8_t *)"QWERTY", 6);
@ -952,6 +967,7 @@ static int StreamingBufferTest01(void)
FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg3));
FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg3,(const uint8_t *)"QWERTY", 6));
Dump(sb);
FAIL_IF_NOT_NULL(sb->head);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment *seg4 = StreamingBufferAppendRaw(sb, (const uint8_t *)"KLM", 3);
@ -964,6 +980,7 @@ static int StreamingBufferTest01(void)
FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg4));
FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg4,(const uint8_t *)"KLM", 3));
Dump(sb);
FAIL_IF_NOT_NULL(sb->head);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment *seg5 = StreamingBufferAppendRaw(sb, (const uint8_t *)"!@#$%^&*()_+<>?/,.;:'[]{}-=", 27);
@ -977,6 +994,7 @@ static int StreamingBufferTest01(void)
FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg5));
FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg5,(const uint8_t *)"!@#$%^&*()_+<>?/,.;:'[]{}-=", 27));
Dump(sb);
FAIL_IF_NOT_NULL(sb->head);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment *seg6 = StreamingBufferAppendRaw(sb, (const uint8_t *)"UVWXYZ", 6);
@ -991,6 +1009,7 @@ static int StreamingBufferTest01(void)
FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg6));
FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg6,(const uint8_t *)"UVWXYZ", 6));
Dump(sb);
FAIL_IF_NOT_NULL(sb->head);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
SCFree(seg1);
@ -1022,9 +1041,11 @@ static int StreamingBufferTest02(void)
Dump(sb);
DumpSegment(sb, &seg1);
DumpSegment(sb, &seg2);
FAIL_IF_NOT_NULL(sb->head);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSlide(sb, 6);
FAIL_IF_NOT_NULL(sb->head);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg3;
@ -1039,6 +1060,7 @@ static int StreamingBufferTest02(void)
DumpSegment(sb, &seg1);
DumpSegment(sb, &seg2);
DumpSegment(sb, &seg3);
FAIL_IF_NOT_NULL(sb->head);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSlide(sb, 6);
@ -1049,6 +1071,7 @@ static int StreamingBufferTest02(void)
DumpSegment(sb, &seg1);
DumpSegment(sb, &seg2);
DumpSegment(sb, &seg3);
FAIL_IF_NOT_NULL(sb->head);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferFree(sb);
@ -1074,6 +1097,8 @@ static int StreamingBufferTest03(void)
Dump(sb);
DumpSegment(sb, &seg1);
DumpSegment(sb, &seg2);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 16);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg3;
@ -1088,6 +1113,8 @@ static int StreamingBufferTest03(void)
DumpSegment(sb, &seg1);
DumpSegment(sb, &seg2);
DumpSegment(sb, &seg3);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 22);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSlide(sb, 10);
@ -1098,6 +1125,8 @@ static int StreamingBufferTest03(void)
DumpSegment(sb, &seg1);
DumpSegment(sb, &seg2);
DumpSegment(sb, &seg3);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 12);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferFree(sb);
@ -1155,6 +1184,8 @@ static int StreamingBufferTest04(void)
DumpSegment(sb, &seg1);
DumpSegment(sb, &seg2);
DumpSegment(sb, &seg3);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 22);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
/* far ahead of curve: */
@ -1179,6 +1210,8 @@ static int StreamingBufferTest04(void)
DumpSegment(sb, &seg2);
DumpSegment(sb, &seg3);
DumpSegment(sb, &seg4);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 25);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,&seg1,(const uint8_t *)"ABCDEFGH", 8));
@ -1241,16 +1274,22 @@ static int StreamingBufferTest06(void)
StreamingBufferSegment seg2;
FAIL_IF(StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"C", 1, 2) != 0);
Dump(sb);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 2);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg3;
FAIL_IF(StreamingBufferInsertAt(sb, &seg3, (const uint8_t *)"F", 1, 5) != 0);
Dump(sb);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 3);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg4;
FAIL_IF(StreamingBufferInsertAt(sb, &seg4, (const uint8_t *)"H", 1, 7) != 0);
Dump(sb);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 4);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg5;
@ -1261,6 +1300,8 @@ static int StreamingBufferTest06(void)
FAIL_IF(sbb1->offset != 0);
FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 10);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg6;
@ -1271,6 +1312,8 @@ static int StreamingBufferTest06(void)
FAIL_IF(sbb1->offset != 0);
FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 10);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferFree(sb);
@ -1289,16 +1332,22 @@ static int StreamingBufferTest07(void)
StreamingBufferSegment seg2;
FAIL_IF(StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"D", 1, 3) != 0);
Dump(sb);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 2);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg3;
FAIL_IF(StreamingBufferInsertAt(sb, &seg3, (const uint8_t *)"F", 1, 5) != 0);
Dump(sb);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 3);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg4;
FAIL_IF(StreamingBufferInsertAt(sb, &seg4, (const uint8_t *)"H", 1, 7) != 0);
Dump(sb);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 4);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg5;
@ -1309,6 +1358,8 @@ static int StreamingBufferTest07(void)
FAIL_IF(sbb1->offset != 0);
FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 10);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg6;
@ -1319,6 +1370,8 @@ static int StreamingBufferTest07(void)
FAIL_IF(sbb1->offset != 0);
FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 10);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferFree(sb);
@ -1337,16 +1390,22 @@ static int StreamingBufferTest08(void)
StreamingBufferSegment seg2;
FAIL_IF(StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"D", 1, 3) != 0);
Dump(sb);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 2);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg3;
FAIL_IF(StreamingBufferInsertAt(sb, &seg3, (const uint8_t *)"F", 1, 5) != 0);
Dump(sb);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 3);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg4;
FAIL_IF(StreamingBufferInsertAt(sb, &seg4, (const uint8_t *)"H", 1, 7) != 0);
Dump(sb);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 4);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg5;
@ -1357,6 +1416,8 @@ static int StreamingBufferTest08(void)
FAIL_IF(sbb1->offset != 0);
FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 10);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg6;
@ -1367,6 +1428,8 @@ static int StreamingBufferTest08(void)
FAIL_IF(sbb1->offset != 0);
FAIL_IF(sbb1->len != 20);
FAIL_IF(SBB_RB_NEXT(sbb1));
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 20);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferFree(sb);
@ -1385,16 +1448,22 @@ static int StreamingBufferTest09(void)
StreamingBufferSegment seg2;
FAIL_IF(StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"D", 1, 3) != 0);
Dump(sb);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 2);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg3;
FAIL_IF(StreamingBufferInsertAt(sb, &seg3, (const uint8_t *)"H", 1, 7) != 0);
Dump(sb);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 3);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg4;
FAIL_IF(StreamingBufferInsertAt(sb, &seg4, (const uint8_t *)"F", 1, 5) != 0);
Dump(sb);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 4);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg5;
@ -1405,6 +1474,8 @@ static int StreamingBufferTest09(void)
FAIL_IF(sbb1->offset != 0);
FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 10);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferSegment seg6;
@ -1415,6 +1486,8 @@ static int StreamingBufferTest09(void)
FAIL_IF(sbb1->offset != 0);
FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 10);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
StreamingBufferFree(sb);
@ -1437,6 +1510,8 @@ static int StreamingBufferTest10(void)
StreamingBufferSegment seg3;
FAIL_IF(StreamingBufferInsertAt(sb, &seg3, (const uint8_t *)"H", 1, 7) != 0);
Dump(sb);
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 3);
StreamingBufferSegment seg4;
FAIL_IF(StreamingBufferInsertAt(sb, &seg4, (const uint8_t *)"B", 1, 1) != 0);
@ -1448,6 +1523,8 @@ static int StreamingBufferTest10(void)
FAIL_IF(StreamingBufferInsertAt(sb, &seg6, (const uint8_t *)"G", 1, 6) != 0);
Dump(sb);
FAIL_IF_NOT(sb->head == RB_MIN(SBB, &sb->sbb_tree));
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 6);
StreamingBufferSegment seg7;
FAIL_IF(StreamingBufferInsertAt(sb, &seg7, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0);
@ -1457,6 +1534,8 @@ static int StreamingBufferTest10(void)
FAIL_IF(sbb1->offset != 0);
FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 10);
StreamingBufferSegment seg8;
FAIL_IF(StreamingBufferInsertAt(sb, &seg8, (const uint8_t *)"abcdefghij", 10, 0) != 0);
@ -1467,6 +1546,8 @@ static int StreamingBufferTest10(void)
FAIL_IF(sbb1->offset != 0);
FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
FAIL_IF_NULL(sb->head);
FAIL_IF_NOT(sb->sbb_size == 10);
StreamingBufferFree(sb);
PASS;

@ -102,15 +102,26 @@ typedef struct StreamingBuffer_ {
struct SBB sbb_tree; /**< red black tree of Stream Buffer Blocks */
StreamingBufferBlock *head; /**< head, should always be the same as RB_MIN */
uint32_t sbb_size; /**< data size covered by sbbs */
#ifdef DEBUG
uint32_t buf_size_max;
#endif
} StreamingBuffer;
#ifndef DEBUG
#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, { NULL }, NULL, };
#define STREAMING_BUFFER_INITIALIZER(cfg) \
{ \
(cfg), \
0, \
NULL, \
0, \
0, \
{ NULL }, \
NULL, \
0, \
};
#else
#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, { NULL }, NULL, 0 };
#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, { NULL }, NULL, 0, 0 };
#endif
typedef struct StreamingBufferSegment_ {

Loading…
Cancel
Save