streaming: use rbtree for stream blocks

Switch StreamBufferBlocks implementation to use RBTREE instead of
a list. This makes inserts/removals and lookups a lot cheaper if
the number of data gaps is large.

Use separate compare functions for inserts and regular lookups.
Inserts care about the offset, while lookups care about the blocks
right edge as well.
pull/3479/head
Victor Julien 7 years ago
parent 9bda558c59
commit 450500e667

@ -888,7 +888,7 @@ static void GetAppBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data
const uint8_t *mydata; const uint8_t *mydata;
uint32_t mydata_len; uint32_t mydata_len;
if (stream->sb.block_list == NULL) { if (RB_EMPTY(&stream->sb.sbb_tree)) {
SCLogDebug("getting one blob"); SCLogDebug("getting one blob");
StreamingBufferGetDataAtOffset(&stream->sb, &mydata, &mydata_len, offset); StreamingBufferGetDataAtOffset(&stream->sb, &mydata, &mydata_len, offset);
@ -896,7 +896,7 @@ static void GetAppBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data
*data = mydata; *data = mydata;
*data_len = mydata_len; *data_len = mydata_len;
} else { } else {
StreamingBufferBlock *blk = stream->sb.block_list; StreamingBufferBlock *blk = RB_MIN(SBB, &stream->sb.sbb_tree);
if (blk->offset > offset) { if (blk->offset > offset) {
SCLogDebug("gap, want data at offset %"PRIu64", " SCLogDebug("gap, want data at offset %"PRIu64", "
@ -908,7 +908,7 @@ static void GetAppBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data
} else if (offset >= (blk->offset + blk->len)) { } else if (offset >= (blk->offset + blk->len)) {
*data = NULL; *data = NULL;
StreamingBufferBlock *nblk = blk->next; StreamingBufferBlock *nblk = SBB_RB_NEXT(blk);
*data_len = nblk ? nblk->offset - offset : 0; *data_len = nblk ? nblk->offset - offset : 0;
if (nblk) { if (nblk) {
SCLogDebug("gap, want data at offset %"PRIu64", " SCLogDebug("gap, want data at offset %"PRIu64", "
@ -957,14 +957,14 @@ static inline bool CheckGap(TcpSession *ssn, TcpStream *stream, Packet *p)
* is beyond next_seq, we only consider it a gap now if we do * is beyond next_seq, we only consider it a gap now if we do
* already have data beyond the gap. */ * already have data beyond the gap. */
if (SEQ_GT(stream->last_ack, stream->next_seq)) { if (SEQ_GT(stream->last_ack, stream->next_seq)) {
if (stream->sb.block_list == NULL) { if (RB_EMPTY(&stream->sb.sbb_tree)) {
SCLogDebug("packet %"PRIu64": no GAP. " SCLogDebug("packet %"PRIu64": no GAP. "
"next_seq %u < last_ack %u, but no data in list", "next_seq %u < last_ack %u, but no data in list",
p->pcap_cnt, stream->next_seq, stream->last_ack); p->pcap_cnt, stream->next_seq, stream->last_ack);
return false; return false;
} else { } else {
const uint64_t next_seq_abs = STREAM_BASE_OFFSET(stream) + (stream->next_seq - stream->base_seq); const uint64_t next_seq_abs = STREAM_BASE_OFFSET(stream) + (stream->next_seq - stream->base_seq);
StreamingBufferBlock *blk = stream->sb.block_list; StreamingBufferBlock *blk = RB_MIN(SBB, &stream->sb.sbb_tree);
if (blk->offset > next_seq_abs && blk->offset < last_ack_abs) { if (blk->offset > next_seq_abs && blk->offset < last_ack_abs) {
/* ack'd data after the gap */ /* ack'd data after the gap */
SCLogDebug("packet %"PRIu64": GAP. " SCLogDebug("packet %"PRIu64": GAP. "
@ -1146,8 +1146,8 @@ static int GetRawBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data_
{ {
const uint8_t *mydata; const uint8_t *mydata;
uint32_t mydata_len; uint32_t mydata_len;
if (stream->sb.block_list == NULL) { if (RB_EMPTY(&stream->sb.sbb_tree)) {
SCLogDebug("getting one blob"); SCLogDebug("getting one blob for offset %"PRIu64, offset);
uint64_t roffset = offset; uint64_t roffset = offset;
if (offset) if (offset)
@ -1160,29 +1160,24 @@ static int GetRawBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data_
*data_len = mydata_len; *data_len = mydata_len;
*data_offset = roffset; *data_offset = roffset;
} else { } else {
if (*iter == NULL) SCLogDebug("multiblob %s. Want offset %"PRIu64,
*iter = stream->sb.block_list; *iter == NULL ? "starting" : "continuing", offset);
if (*iter == NULL) {
StreamingBufferBlock key = { .offset = offset, .len = 0 };
*iter = SBB_RB_FIND_INCLUSIVE(&stream->sb.sbb_tree, &key);
SCLogDebug("*iter %p", *iter);
}
if (*iter == NULL) { if (*iter == NULL) {
SCLogDebug("no data");
*data = NULL; *data = NULL;
*data_len = 0; *data_len = 0;
*data_offset = 0; *data_offset = 0;
return 0; return 0;
} }
SCLogDebug("getting multiple blobs. Iter %p, %"PRIu64"/%u", *iter, (*iter)->offset, (*iter)->len);
if (offset) {
while (*iter && ((*iter)->offset + (*iter)->len < offset))
*iter = (*iter)->next;
if (*iter == NULL) {
*data = NULL;
*data_len = 0;
*data_offset = 0;
return 0;
}
}
SCLogDebug("getting multiple blobs. Iter %p, %"PRIu64"/%u (next? %s)", *iter, (*iter)->offset, (*iter)->len, (*iter)->next ? "yes":"no");
StreamingBufferSBBGetData(&stream->sb, (*iter), &mydata, &mydata_len); StreamingBufferSBBGetData(&stream->sb, (*iter), &mydata, &mydata_len);
SCLogDebug("mydata %p", mydata);
if ((*iter)->offset < offset) { if ((*iter)->offset < offset) {
uint64_t delta = offset - (*iter)->offset; uint64_t delta = offset - (*iter)->offset;
@ -1191,6 +1186,7 @@ static int GetRawBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data_
*data_len = mydata_len - delta; *data_len = mydata_len - delta;
*data_offset = offset; *data_offset = offset;
} else { } else {
SCLogDebug("no data (yet)");
*data = NULL; *data = NULL;
*data_len = 0; *data_len = 0;
*data_offset = 0; *data_offset = 0;
@ -1202,7 +1198,8 @@ static int GetRawBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data_
*data_offset = (*iter)->offset; *data_offset = (*iter)->offset;
} }
*iter = (*iter)->next; *iter = SBB_RB_NEXT(*iter);
SCLogDebug("*iter %p", *iter);
} }
return 0; return 0;
} }
@ -1385,25 +1382,20 @@ static int StreamReassembleRawInline(TcpSession *ssn, const Packet *p,
/* simply return progress from the block we inspected. */ /* simply return progress from the block we inspected. */
bool return_progress = false; bool return_progress = false;
if (stream->sb.block_list == NULL) { if (RB_EMPTY(&stream->sb.sbb_tree)) {
/* continues block */ /* continues block */
StreamingBufferGetData(&stream->sb, &mydata, &mydata_len, &mydata_offset); StreamingBufferGetData(&stream->sb, &mydata, &mydata_len, &mydata_offset);
return_progress = true; return_progress = true;
} else { } else {
SCLogDebug("finding our SBB from offset %"PRIu64, packet_leftedge_abs);
/* find our block */ /* find our block */
StreamingBufferBlock *iter = stream->sb.block_list; StreamingBufferBlock key = { .offset = packet_leftedge_abs, .len = p->payload_len };
for ( ; iter != NULL; iter = iter->next) { StreamingBufferBlock *sbb = SBB_RB_FIND_INCLUSIVE(&stream->sb.sbb_tree, &key);
uint64_t iter_re_abs = iter->offset + iter->len; if (sbb) {
DEBUG_VALIDATE_BUG_ON(packet_leftedge_abs < iter->offset && SCLogDebug("found %p offset %"PRIu64" len %u", sbb, sbb->offset, sbb->len);
packet_rightedge_abs > iter->offset && StreamingBufferSBBGetData(&stream->sb, sbb, &mydata, &mydata_len);
packet_rightedge_abs < iter_re_abs); mydata_offset = sbb->offset;
if (iter->offset <= packet_leftedge_abs && iter_re_abs >= packet_rightedge_abs) {
StreamingBufferSBBGetData(&stream->sb, iter, &mydata, &mydata_len);
mydata_offset = iter->offset;
break;
}
} }
} }

@ -19,6 +19,7 @@
#include "util-streaming-buffer.h" #include "util-streaming-buffer.h"
#include "util-unittest.h" #include "util-unittest.h"
#include "util-print.h" #include "util-print.h"
#include "util-validate.h"
/** /**
* \file * \file
@ -41,6 +42,63 @@
static void SBBFree(StreamingBuffer *sb); static void SBBFree(StreamingBuffer *sb);
RB_GENERATE(SBB, StreamingBufferBlock, rb, SBBCompare);
int SBBCompare(struct StreamingBufferBlock *a, struct StreamingBufferBlock *b)
{
SCLogDebug("a %"PRIu64" len %u, b %"PRIu64" len %u",
a->offset, a->len, b->offset, b->len);
if (a->offset > b->offset)
SCReturnInt(1);
else if (a->offset < b->offset)
SCReturnInt(-1);
else {
if (a->len == 0 || b->len == 0 || a->len == b->len)
SCReturnInt(0);
else if (a->len > b->len)
SCReturnInt(1);
else
SCReturnInt(-1);
}
}
/* inclusive compare function that also considers the right edge,
* not just the offset. */
static inline int InclusiveCompare(StreamingBufferBlock *lookup, StreamingBufferBlock *intree) {
const uint64_t lre = lookup->offset + lookup->len;
const uint64_t tre = intree->offset + intree->len;
if (lre < intree->offset) // entirely before
return -1;
else if (lre >= intree->offset && lre <= tre) // (some) overlap
return 0;
else
return 1; // entirely after
}
StreamingBufferBlock *SBB_RB_FIND_INCLUSIVE(struct SBB *head, StreamingBufferBlock *elm)
{
SCLogDebug("looking up %"PRIu64, elm->offset);
struct StreamingBufferBlock *tmp = RB_ROOT(head);
struct StreamingBufferBlock *res = NULL;
while (tmp) {
SCLogDebug("compare with %"PRIu64"/%u", tmp->offset, tmp->len);
const int comp = InclusiveCompare(elm, tmp);
SCLogDebug("compare result: %d", comp);
if (comp < 0) {
res = tmp;
tmp = RB_LEFT(tmp, rb);
} else if (comp > 0) {
tmp = RB_RIGHT(tmp, rb);
} else {
return tmp;
}
}
return res;
}
static inline int InitBuffer(StreamingBuffer *sb) static inline int InitBuffer(StreamingBuffer *sb)
{ {
sb->buf = CALLOC(sb->cfg, 1, sb->cfg->buf_size); sb->buf = CALLOC(sb->cfg, 1, sb->cfg->buf_size);
@ -93,19 +151,18 @@ void StreamingBufferFree(StreamingBuffer *sb)
} }
#ifdef DEBUG #ifdef DEBUG
static void SBBPrintList(const StreamingBuffer *sb) static void SBBPrintList(StreamingBuffer *sb)
{ {
const StreamingBufferBlock *sbb = sb->block_list; StreamingBufferBlock *sbb = NULL;
while (sbb) { RB_FOREACH(sbb, SBB, &sb->sbb_tree) {
SCLogDebug("sbb: offset %"PRIu64", len %u", sbb->offset, sbb->len); SCLogDebug("sbb: offset %"PRIu64", len %u", sbb->offset, sbb->len);
if (sbb->next) { StreamingBufferBlock *next = SBB_RB_NEXT(sbb);
if ((sbb->offset + sbb->len) != sbb->next->offset) { if (next) {
if ((sbb->offset + sbb->len) != next->offset) {
SCLogDebug("gap: offset %"PRIu64", len %"PRIu64, (sbb->offset + sbb->len), SCLogDebug("gap: offset %"PRIu64", len %"PRIu64, (sbb->offset + sbb->len),
sbb->next->offset - (sbb->offset + sbb->len)); next->offset - (sbb->offset + sbb->len));
} }
} }
sbb = sbb->next;
} }
} }
#endif #endif
@ -117,8 +174,8 @@ static void SBBPrintList(const StreamingBuffer *sb)
static void SBBInit(StreamingBuffer *sb, static void SBBInit(StreamingBuffer *sb,
uint32_t rel_offset, uint32_t data_len) uint32_t rel_offset, uint32_t data_len)
{ {
BUG_ON(sb->block_list != NULL); DEBUG_VALIDATE_BUG_ON(!RB_EMPTY(&sb->sbb_tree));
BUG_ON(sb->buf_offset > sb->stream_offset + rel_offset); DEBUG_VALIDATE_BUG_ON(sb->buf_offset > sb->stream_offset + rel_offset);
/* need to set up 2: existing data block and new data block */ /* need to set up 2: existing data block and new data block */
StreamingBufferBlock *sbb = CALLOC(sb->cfg, 1, sizeof(*sbb)); StreamingBufferBlock *sbb = CALLOC(sb->cfg, 1, sizeof(*sbb));
@ -136,9 +193,8 @@ static void SBBInit(StreamingBuffer *sb,
sbb2->offset = sb->stream_offset + rel_offset; sbb2->offset = sb->stream_offset + rel_offset;
sbb2->len = data_len; sbb2->len = data_len;
sb->block_list = sbb; SBB_RB_INSERT(&sb->sbb_tree, sbb);
sbb->next = sbb2; SBB_RB_INSERT(&sb->sbb_tree, sbb2);
sb->block_list_tail = sbb2;
SCLogDebug("sbb1 %"PRIu64", len %u, sbb2 %"PRIu64", len %u", SCLogDebug("sbb1 %"PRIu64", len %u, sbb2 %"PRIu64", len %u",
sbb->offset, sbb->len, sbb2->offset, sbb2->len); sbb->offset, sbb->len, sbb2->offset, sbb2->len);
@ -155,7 +211,7 @@ static void SBBInit(StreamingBuffer *sb,
static void SBBInitLeadingGap(StreamingBuffer *sb, static void SBBInitLeadingGap(StreamingBuffer *sb,
uint64_t offset, uint32_t data_len) uint64_t offset, uint32_t data_len)
{ {
BUG_ON(sb->block_list != NULL); DEBUG_VALIDATE_BUG_ON(!RB_EMPTY(&sb->sbb_tree));
StreamingBufferBlock *sbb = CALLOC(sb->cfg, 1, sizeof(*sbb)); StreamingBufferBlock *sbb = CALLOC(sb->cfg, 1, sizeof(*sbb));
if (sbb == NULL) if (sbb == NULL)
@ -163,8 +219,7 @@ static void SBBInitLeadingGap(StreamingBuffer *sb,
sbb->offset = offset; sbb->offset = offset;
sbb->len = data_len; sbb->len = data_len;
sb->block_list = sbb; SBB_RB_INSERT(&sb->sbb_tree, sbb);
sb->block_list_tail = sbb;
SCLogDebug("sbb %"PRIu64", len %u", SCLogDebug("sbb %"PRIu64", len %u",
sbb->offset, sbb->len); sbb->offset, sbb->len);
@ -173,280 +228,164 @@ static void SBBInitLeadingGap(StreamingBuffer *sb,
#endif #endif
} }
static int IsBefore(StreamingBufferBlock *me, StreamingBufferBlock *you) static inline void ConsolidateFwd(StreamingBuffer *sb,
struct SBB *tree, StreamingBufferBlock *sa)
{ {
if ((me->offset + me->len) < you->offset) { uint64_t sa_re = sa->offset + sa->len;
return 1; StreamingBufferBlock *tr, *s = sa;
RB_FOREACH_FROM(tr, SBB, s) {
if (sa == tr)
continue;
const uint64_t tr_re = tr->offset + tr->len;
SCLogDebug("-> (fwd) tr %p %"PRIu64"/%u re %"PRIu64,
tr, tr->offset, tr->len, tr_re);
if (sa_re < tr->offset)
break; // entirely before
/*
sa: [ ]
tr: [ ]
sa: [ ]
tr: [ ]
sa: [ ]
tr: [ ]
*/
if (sa->offset >= tr->offset && sa_re <= tr_re) {
sa->len = tr->len;
sa->offset = tr->offset;
sa_re = sa->offset + sa->len;
SCLogDebug("-> (fwd) tr %p %"PRIu64"/%u REMOVED ECLIPSED2", tr, tr->offset, tr->len);
SBB_RB_REMOVE(tree, tr);
FREE(sb->cfg, tr, sizeof(StreamingBufferBlock));
/*
sa: [ ]
tr: [ ]
sa: [ ]
tr: [ ]
sa: [ ]
tr: [ ]
*/
} else if (sa->offset <= tr->offset && sa_re >= tr_re) {
SCLogDebug("-> (fwd) tr %p %"PRIu64"/%u REMOVED ECLIPSED", tr, tr->offset, tr->len);
SBB_RB_REMOVE(tree, tr);
FREE(sb->cfg, tr, sizeof(StreamingBufferBlock));
/*
sa: [ ]
tr: [ ]
sa: [ ]
tr: [ ]
*/
} else if (sa->offset < tr->offset && // starts before
sa_re >= tr->offset && sa_re < tr_re) // ends inside
{
// merge
sa->len = tr_re - sa->offset;
sa_re = sa->offset + sa->len;
SCLogDebug("-> (fwd) tr %p %"PRIu64"/%u REMOVED MERGED", tr, tr->offset, tr->len);
SBB_RB_REMOVE(tree, tr);
FREE(sb->cfg, tr, sizeof(StreamingBufferBlock));
}
} }
return 0;
}
static int StartsBefore(StreamingBufferBlock *me, StreamingBufferBlock *you)
{
if (me->offset < you->offset)
return 1;
return 0;
}
static int IsAfter(StreamingBufferBlock *me, StreamingBufferBlock *you)
{
if (you->offset + you->len < me->offset)
return 1;
return 0;
}
static int IsOverlappedBy(StreamingBufferBlock *me, StreamingBufferBlock *you)
{
if (you->offset <= me->offset && (you->offset + you->len) >= (me->offset + me->len))
return 1;
return 0;
}
static int EndsAfter(StreamingBufferBlock *me, StreamingBufferBlock *you)
{
if ((me->offset + me->len) > (you->offset + you->len))
return 1;
return 0;
}
static StreamingBufferBlock *GetNew(StreamingBuffer *sb,
uint64_t offset, uint32_t len,
StreamingBufferBlock *next)
{
StreamingBufferBlock *new_sbb = CALLOC(sb->cfg, 1, sizeof(*new_sbb));
if (new_sbb == NULL)
return NULL;
new_sbb->offset = offset;
new_sbb->len = len;
new_sbb->next = next;
return new_sbb;
} }
/* expand our sbb forward if possible */ static inline void ConsolidateBackward(StreamingBuffer *sb,
static int SBBUpdateLookForward(StreamingBuffer *sb, struct SBB *tree, StreamingBufferBlock *sa)
StreamingBufferBlock *sbb,
StreamingBufferBlock *my_block)
{ {
SCLogDebug("EndsAfter: consider next"); uint64_t sa_re = sa->offset + sa->len;
StreamingBufferBlock *tr, *s = sa;
while (sbb->offset + sbb->len == sbb->next->offset) RB_FOREACH_REVERSE_FROM(tr, SBB, s) {
{ if (sa == tr)
SCLogDebug("EndsAfter: gobble up next: %"PRIu64"/%u", sbb->next->offset, sbb->next->len); continue;
uint64_t right_edge = sbb->next->offset + sbb->next->len; const uint64_t tr_re = tr->offset + tr->len;
uint32_t expand_by = right_edge - (sbb->offset + sbb->len); SCLogDebug("-> (bwd) tr %p %"PRIu64"/%u", tr, tr->offset, tr->len);
sbb->len += expand_by;
SCLogDebug("EndsAfter: expand_by %u (part 2)", expand_by); if (sa->offset > tr_re)
SCLogDebug("EndsAfter: (loop) sbb now %"PRIu64"/%u", sbb->offset, sbb->len); break; // entirely after
/* we can gobble up next */ if (sa->offset >= tr->offset && sa_re <= tr_re) {
StreamingBufferBlock *to_free = sbb->next; sa->len = tr->len;
sbb->next = sbb->next->next; sa->offset = tr->offset;
FREE(sb->cfg, to_free, sizeof(StreamingBufferBlock)); sa_re = sa->offset + sa->len;
if (sbb->next == NULL) SCLogDebug("-> (bwd) tr %p %"PRIu64"/%u REMOVED ECLIPSED2", tr, tr->offset, tr->len);
sb->block_list_tail = sbb; SBB_RB_REMOVE(tree, tr);
FREE(sb->cfg, tr, sizeof(StreamingBufferBlock));
/* update my block */ /*
if (expand_by >= my_block->len) { sa: [ ]
return 1; tr: [ ]
} sa: [ ]
tr: [ ]
my_block->len -= expand_by; sa: [ ]
my_block->offset += expand_by; tr: [ ]
*/
if (sbb->next == NULL) { } else if (sa->offset <= tr->offset && sa_re >= tr_re) {
/* if we have nothing left in the list we're almost done, SCLogDebug("-> (bwd) tr %p %"PRIu64"/%u REMOVED ECLIPSED", tr, tr->offset, tr->len);
* except we need to check if we have some of our block SBB_RB_REMOVE(tree, tr);
* left */ FREE(sb->cfg, tr, sizeof(StreamingBufferBlock));
sbb->len += my_block->len; /*
my_block->offset += my_block->len; sa: [ ]
my_block->len = 0; tr: [ ]
return 1; sa: [ ]
} else { tr: [ ]
/* if next is not directly connected and we have some */
* block len left, expand sbb further */ } else if (sa->offset > tr->offset && sa_re > tr_re && sa->offset <= tr_re) {
uint32_t gap = sbb->next->offset - (sbb->offset + sbb->len); // merge
SCLogDebug("EndsAfter: we now have a gap of %u and a block of %"PRIu64"/%u", gap, my_block->offset, my_block->len); sa->len = sa_re - tr->offset;
sa->offset = tr->offset;
if (my_block->len < gap) { sa_re = sa->offset + sa->len;
sbb->len += my_block->len; SCLogDebug("-> (bwd) tr %p %"PRIu64"/%u REMOVED MERGED", tr, tr->offset, tr->len);
my_block->offset += my_block->len; SBB_RB_REMOVE(tree, tr);
my_block->len = 0; FREE(sb->cfg, tr, sizeof(StreamingBufferBlock));
return 1;
} else {
sbb->len += gap;
my_block->offset += gap;
my_block->len -= gap;
SCLogDebug("EndsAfter: (loop) block at %"PRIu64"/%u, sbb %"PRIu64"/%u", my_block->offset, my_block->len, sbb->offset, sbb->len);
SCLogDebug("EndsAfter: (loop) sbb->next %"PRIu64"/%u", sbb->next->offset, sbb->next->len);
}
} }
} }
return 0;
} }
static void SBBUpdate(StreamingBuffer *sb, static int Insert(StreamingBuffer *sb, struct SBB *tree,
uint32_t rel_offset, uint32_t data_len) uint32_t rel_offset, uint32_t len)
{ {
StreamingBufferBlock my_block = { .offset = sb->stream_offset + rel_offset, SCLogDebug("* inserting: %u/%u\n", rel_offset, len);
.len = data_len,
.next = NULL };
const uint64_t my_block_right_edge = my_block.offset + my_block.len;
StreamingBufferBlock *tail = sb->block_list_tail; StreamingBufferBlock *sbb = CALLOC(sb->cfg, 1, sizeof(*sbb));
if (sbb == NULL)
/* fast path 1: expands tail */ return -1;
if (tail && ((tail->offset + tail->len) == my_block.offset)) sbb->offset = sb->stream_offset + rel_offset;
{ sbb->len = len;
tail->len = my_block_right_edge - tail->offset; StreamingBufferBlock *res = SBB_RB_INSERT(tree, sbb);
goto done; if (res) {
} // exact overlap
/* fast path 2: new isolated block after tail */ SCLogDebug("* insert failed: exact match in tree with %p %"PRIu64"/%u", res, res->offset, res->len);
else if (tail && IsAfter(&my_block, tail)) { FREE(sb->cfg, sbb, sizeof(StreamingBufferBlock));
StreamingBufferBlock *new_sbb = GetNew(sb, my_block.offset, my_block.len, NULL); return 0;
sb->block_list_tail = tail->next = new_sbb;
SCLogDebug("tail: new block at %"PRIu64"/%u", my_block.offset, my_block.len);
goto done;
} }
ConsolidateBackward(sb, tree, sbb);
BUG_ON(sb->block_list == NULL); ConsolidateFwd(sb, tree, sbb);
#ifdef DEBUG #ifdef DEBUG
SBBPrintList(sb); SBBPrintList(sb);
#endif #endif
SCLogDebug("PreInsert: block at %"PRIu64"/%u", my_block.offset, my_block.len); return 0;
StreamingBufferBlock *sbb = sb->block_list, *prev = NULL; }
while (sbb) {
SCLogDebug("sbb %"PRIu64"/%u data %"PRIu64"/%u. Next %s", sbb->offset, sbb->len,
my_block.offset, my_block.len, sbb->next ? "true" : "false");
if (IsBefore(&my_block, sbb)) {
StreamingBufferBlock *new_sbb = GetNew(sb, my_block.offset, my_block.len, sbb);
/* place before, maybe replace list head */
if (prev == NULL) {
sb->block_list = new_sbb;
} else {
prev->next = new_sbb;
}
SCLogDebug("IsBefore: new block at %"PRIu64"/%u", my_block.offset, my_block.len);
break;
} else if (IsOverlappedBy(&my_block, sbb)) {
/* nothing to do */
SCLogDebug("IsOverlappedBy: overlapped block at %"PRIu64"/%u", my_block.offset, my_block.len);
break;
} else if (IsAfter(&my_block, sbb)) {
/* if no next, place after, otherwise, iterate */
if (sbb->next == NULL) {
StreamingBufferBlock *new_sbb = GetNew(sb, my_block.offset, my_block.len, NULL);
sbb->next = new_sbb;
sb->block_list_tail = new_sbb;
SCLogDebug("new block at %"PRIu64"/%u", my_block.offset, my_block.len);
break;
}
SCLogDebug("IsAfter: block at %"PRIu64"/%u, is after sbb", my_block.offset, my_block.len);
} else {
/* those were the simple cases */
if (StartsBefore(&my_block, sbb)) {
/* expand sbb */
uint32_t expand_by = sbb->offset - my_block.offset;
SCLogDebug("StartsBefore: expand_by %u", expand_by);
sbb->offset = my_block.offset;
sbb->len += expand_by;
/* if my_block ends before sbb right edge, we are done */
if (my_block_right_edge <= (sbb->offset + sbb->len))
break;
my_block.offset = sbb->offset + sbb->len;
my_block.len = my_block_right_edge - my_block.offset;
SCLogDebug("StartsBefore: block now %"PRIu64"/%u", my_block.offset, my_block.len);
if (sbb->next == NULL) {
sbb->len += my_block.len;
break;
}
/* expand, but consider next */
uint64_t right_edge = my_block_right_edge;
if (right_edge > sbb->next->offset) {
right_edge = sbb->next->offset;
}
expand_by = right_edge - (sbb->offset + sbb->len);
SCLogDebug("EndsAfter: expand_by %u", expand_by);
sbb->len += expand_by;
SCLogDebug("EndsAfter: sbb now %"PRIu64"/%u", sbb->offset, sbb->len);
my_block.offset = sbb->offset + sbb->len;
my_block.len = my_block_right_edge - my_block.offset;
SCLogDebug("StartsBefore: sbb now %"PRIu64"/%u", sbb->offset, sbb->len);
} else if (EndsAfter(&my_block, sbb)) {
/* expand sbb, but we need to mind "next" */
if (sbb->next == NULL) {
/* last, so just expand sbb */
sbb->len = my_block_right_edge - sbb->offset;
break;
}
/* expand, but consider next */
uint64_t right_edge = my_block_right_edge;
if (right_edge > sbb->next->offset) {
right_edge = sbb->next->offset;
}
uint32_t expand_by = right_edge - (sbb->offset + sbb->len);
SCLogDebug("EndsAfter: expand_by %u", expand_by);
sbb->len += expand_by;
SCLogDebug("EndsAfter: sbb now %"PRIu64"/%u", sbb->offset, sbb->len);
my_block.offset = sbb->offset + sbb->len;
my_block.len = my_block_right_edge - my_block.offset;
}
if (sbb->next != NULL) {
SCLogDebug("EndsAfter: consider next");
if (SBBUpdateLookForward(sb, sbb, &my_block) == 1)
goto done;
}
SCLogDebug("EndsAfter: block at %"PRIu64"/%u, is after sbb", my_block.offset, my_block.len);
if (my_block.len == 0) static void SBBUpdate(StreamingBuffer *sb,
break; uint32_t rel_offset, uint32_t data_len)
} {
prev = sbb; Insert(sb, &sb->sbb_tree, rel_offset, data_len);
sbb = sbb->next;
}
done:
SCLogDebug("PostInsert: block at %"PRIu64"/%u", my_block.offset, my_block.len);
SCLogDebug("PostInsert");
#ifdef DEBUG
SBBPrintList(sb);
#endif
} }
static void SBBFree(StreamingBuffer *sb) static void SBBFree(StreamingBuffer *sb)
{ {
StreamingBufferBlock *sbb = sb->block_list; StreamingBufferBlock *sbb = NULL, *safe = NULL;
while (sbb) { RB_FOREACH_SAFE(sbb, SBB, &sb->sbb_tree, safe) {
StreamingBufferBlock *next = sbb->next; SBB_RB_REMOVE(&sb->sbb_tree, sbb);
FREE(sb->cfg, sbb, sizeof(StreamingBufferBlock)); FREE(sb->cfg, sbb, sizeof(StreamingBufferBlock));
sbb = next;
} }
sb->block_list = NULL;
} }
static void SBBPrune(StreamingBuffer *sb) static void SBBPrune(StreamingBuffer *sb)
{ {
StreamingBufferBlock *sbb = sb->block_list; SCLogDebug("pruning %p to %"PRIu64, sb, sb->stream_offset);
while (sbb) { StreamingBufferBlock *sbb = NULL, *safe = NULL;
RB_FOREACH_SAFE(sbb, SBB, &sb->sbb_tree, safe) {
/* completely beyond window, we're done */ /* completely beyond window, we're done */
if (sbb->offset > sb->stream_offset) if (sbb->offset > sb->stream_offset)
break; break;
@ -455,20 +394,18 @@ static void SBBPrune(StreamingBuffer *sb)
if (sbb->offset < sb->stream_offset && if (sbb->offset < sb->stream_offset &&
sbb->offset + sbb->len > sb->stream_offset) { sbb->offset + sbb->len > sb->stream_offset) {
uint32_t shrink_by = sb->stream_offset - sbb->offset; uint32_t shrink_by = sb->stream_offset - sbb->offset;
BUG_ON(shrink_by > sbb->len); DEBUG_VALIDATE_BUG_ON(shrink_by > sbb->len);
sbb->len -= shrink_by; if (sbb->len >= shrink_by) {
sbb->offset += shrink_by; sbb->len -= shrink_by;
BUG_ON(sbb->offset != sb->stream_offset); sbb->offset += shrink_by;
DEBUG_VALIDATE_BUG_ON(sbb->offset != sb->stream_offset);
}
break; break;
} }
StreamingBufferBlock *next = sbb->next; SBB_RB_REMOVE(&sb->sbb_tree, sbb);
SCLogDebug("sb %p removed %p %"PRIu64", %u", sb, sbb, sbb->offset, sbb->len);
FREE(sb->cfg, sbb, sizeof(StreamingBufferBlock)); FREE(sb->cfg, sbb, sizeof(StreamingBufferBlock));
sbb = next;
sb->block_list = next;
if (sbb && sbb->next == NULL)
sb->block_list_tail = NULL;
} }
} }
@ -610,7 +547,7 @@ StreamingBufferSegment *StreamingBufferAppendRaw(StreamingBuffer *sb, const uint
uint32_t rel_offset = sb->buf_offset; uint32_t rel_offset = sb->buf_offset;
sb->buf_offset += data_len; sb->buf_offset += data_len;
if (sb->block_list) { if (!RB_EMPTY(&sb->sbb_tree)) {
SBBUpdate(sb, rel_offset, data_len); SBBUpdate(sb, rel_offset, data_len);
} }
return seg; return seg;
@ -652,7 +589,7 @@ int StreamingBufferAppend(StreamingBuffer *sb, StreamingBufferSegment *seg,
uint32_t rel_offset = sb->buf_offset; uint32_t rel_offset = sb->buf_offset;
sb->buf_offset += data_len; sb->buf_offset += data_len;
if (sb->block_list) { if (!RB_EMPTY(&sb->sbb_tree)) {
SBBUpdate(sb, rel_offset, data_len); SBBUpdate(sb, rel_offset, data_len);
} }
return 0; return 0;
@ -691,7 +628,7 @@ int StreamingBufferAppendNoTrack(StreamingBuffer *sb,
uint32_t rel_offset = sb->buf_offset; uint32_t rel_offset = sb->buf_offset;
sb->buf_offset += data_len; sb->buf_offset += data_len;
if (sb->block_list) { if (!RB_EMPTY(&sb->sbb_tree)) {
SBBUpdate(sb, rel_offset, data_len); SBBUpdate(sb, rel_offset, data_len);
} }
return 0; return 0;
@ -739,7 +676,7 @@ int StreamingBufferInsertAt(StreamingBuffer *sb, StreamingBufferSegment *seg,
SCLogDebug("rel_offset %u sb->stream_offset %"PRIu64", buf_offset %u", SCLogDebug("rel_offset %u sb->stream_offset %"PRIu64", buf_offset %u",
rel_offset, sb->stream_offset, sb->buf_offset); rel_offset, sb->stream_offset, sb->buf_offset);
if (sb->block_list == NULL) { if (RB_EMPTY(&sb->sbb_tree)) {
SCLogDebug("empty sbb list"); SCLogDebug("empty sbb list");
if (sb->stream_offset == offset) { if (sb->stream_offset == offset) {
@ -1145,7 +1082,7 @@ static int StreamingBufferTest04(void)
StreamingBufferSegment seg1; StreamingBufferSegment seg1;
FAIL_IF(StreamingBufferAppend(sb, &seg1, (const uint8_t *)"ABCDEFGH", 8) != 0); FAIL_IF(StreamingBufferAppend(sb, &seg1, (const uint8_t *)"ABCDEFGH", 8) != 0);
FAIL_IF(sb->block_list != NULL); FAIL_IF(!RB_EMPTY(&sb->sbb_tree));
StreamingBufferSegment seg2; StreamingBufferSegment seg2;
FAIL_IF(StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"01234567", 8, 14) != 0); FAIL_IF(StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"01234567", 8, 14) != 0);
FAIL_IF(sb->stream_offset != 0); FAIL_IF(sb->stream_offset != 0);
@ -1154,12 +1091,15 @@ static int StreamingBufferTest04(void)
FAIL_IF(seg2.stream_offset != 14); FAIL_IF(seg2.stream_offset != 14);
FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1)); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1));
FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2));
FAIL_IF(sb->block_list == NULL); FAIL_IF(RB_EMPTY(&sb->sbb_tree));
FAIL_IF(sb->block_list->offset != 0); StreamingBufferBlock *sbb1 = RB_MIN(SBB, &sb->sbb_tree);
FAIL_IF(sb->block_list->len != 8); FAIL_IF_NULL(sbb1);
FAIL_IF(sb->block_list->next == NULL); FAIL_IF(sbb1->offset != 0);
FAIL_IF(sb->block_list->next->offset != 14); FAIL_IF(sbb1->len != 8);
FAIL_IF(sb->block_list->next->len != 8); StreamingBufferBlock *sbb2 = SBB_RB_NEXT(sbb1);
FAIL_IF_NULL(sbb2);
FAIL_IF(sbb2->offset != 14);
FAIL_IF(sbb2->len != 8);
Dump(sb); Dump(sb);
DumpSegment(sb, &seg1); DumpSegment(sb, &seg1);
DumpSegment(sb, &seg2); DumpSegment(sb, &seg2);
@ -1172,10 +1112,11 @@ static int StreamingBufferTest04(void)
FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1)); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1));
FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2));
FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3)); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3));
FAIL_IF(sb->block_list == NULL); sbb1 = RB_MIN(SBB, &sb->sbb_tree);
FAIL_IF(sb->block_list->offset != 0); FAIL_IF_NULL(sbb1);
FAIL_IF(sb->block_list->len != 22); FAIL_IF(sbb1->offset != 0);
FAIL_IF(sb->block_list->next != NULL); FAIL_IF(sbb1->len != 22);
FAIL_IF(SBB_RB_NEXT(sbb1));
Dump(sb); Dump(sb);
DumpSegment(sb, &seg1); DumpSegment(sb, &seg1);
DumpSegment(sb, &seg2); DumpSegment(sb, &seg2);
@ -1192,10 +1133,11 @@ static int StreamingBufferTest04(void)
FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2));
FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3)); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3));
FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg4)); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg4));
FAIL_IF(sb->block_list == NULL); sbb1 = RB_MIN(SBB, &sb->sbb_tree);
FAIL_IF(sb->block_list->offset != 0); FAIL_IF_NULL(sbb1);
FAIL_IF(sb->block_list->len != 22); FAIL_IF(sbb1->offset != 0);
FAIL_IF(sb->block_list->next == NULL); FAIL_IF(sbb1->len != 22);
FAIL_IF(!SBB_RB_NEXT(sbb1));
Dump(sb); Dump(sb);
DumpSegment(sb, &seg1); DumpSegment(sb, &seg1);
DumpSegment(sb, &seg2); DumpSegment(sb, &seg2);
@ -1272,18 +1214,20 @@ static int StreamingBufferTest06(void)
StreamingBufferSegment seg5; StreamingBufferSegment seg5;
FAIL_IF(StreamingBufferInsertAt(sb, &seg5, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0); FAIL_IF(StreamingBufferInsertAt(sb, &seg5, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0);
Dump(sb); Dump(sb);
FAIL_IF(sb->block_list == NULL); StreamingBufferBlock *sbb1 = RB_MIN(SBB, &sb->sbb_tree);
FAIL_IF(sb->block_list->offset != 0); FAIL_IF_NULL(sbb1);
FAIL_IF(sb->block_list->len != 10); FAIL_IF(sbb1->offset != 0);
FAIL_IF(sb->block_list->next != NULL); FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
StreamingBufferSegment seg6; StreamingBufferSegment seg6;
FAIL_IF(StreamingBufferInsertAt(sb, &seg6, (const uint8_t *)"abcdefghij", 10, 0) != 0); FAIL_IF(StreamingBufferInsertAt(sb, &seg6, (const uint8_t *)"abcdefghij", 10, 0) != 0);
Dump(sb); Dump(sb);
FAIL_IF(sb->block_list == NULL); sbb1 = RB_MIN(SBB, &sb->sbb_tree);
FAIL_IF(sb->block_list->offset != 0); FAIL_IF_NULL(sbb1);
FAIL_IF(sb->block_list->len != 10); FAIL_IF(sbb1->offset != 0);
FAIL_IF(sb->block_list->next != NULL); FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
StreamingBufferFree(sb); StreamingBufferFree(sb);
PASS; PASS;
@ -1313,18 +1257,20 @@ static int StreamingBufferTest07(void)
StreamingBufferSegment seg5; StreamingBufferSegment seg5;
FAIL_IF(StreamingBufferInsertAt(sb, &seg5, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0); FAIL_IF(StreamingBufferInsertAt(sb, &seg5, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0);
Dump(sb); Dump(sb);
FAIL_IF(sb->block_list == NULL); StreamingBufferBlock *sbb1 = RB_MIN(SBB, &sb->sbb_tree);
FAIL_IF(sb->block_list->offset != 0); FAIL_IF_NULL(sbb1);
FAIL_IF(sb->block_list->len != 10); FAIL_IF(sbb1->offset != 0);
FAIL_IF(sb->block_list->next != NULL); FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
StreamingBufferSegment seg6; StreamingBufferSegment seg6;
FAIL_IF(StreamingBufferInsertAt(sb, &seg6, (const uint8_t *)"abcdefghij", 10, 0) != 0); FAIL_IF(StreamingBufferInsertAt(sb, &seg6, (const uint8_t *)"abcdefghij", 10, 0) != 0);
Dump(sb); Dump(sb);
FAIL_IF(sb->block_list == NULL); sbb1 = RB_MIN(SBB, &sb->sbb_tree);
FAIL_IF(sb->block_list->offset != 0); FAIL_IF_NULL(sbb1);
FAIL_IF(sb->block_list->len != 10); FAIL_IF(sbb1->offset != 0);
FAIL_IF(sb->block_list->next != NULL); FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
StreamingBufferFree(sb); StreamingBufferFree(sb);
PASS; PASS;
@ -1354,18 +1300,20 @@ static int StreamingBufferTest08(void)
StreamingBufferSegment seg5; StreamingBufferSegment seg5;
FAIL_IF(StreamingBufferInsertAt(sb, &seg5, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0); FAIL_IF(StreamingBufferInsertAt(sb, &seg5, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0);
Dump(sb); Dump(sb);
FAIL_IF(sb->block_list == NULL); StreamingBufferBlock *sbb1 = RB_MIN(SBB, &sb->sbb_tree);
FAIL_IF(sb->block_list->offset != 0); FAIL_IF_NULL(sbb1);
FAIL_IF(sb->block_list->len != 10); FAIL_IF(sbb1->offset != 0);
FAIL_IF(sb->block_list->next != NULL); FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
StreamingBufferSegment seg6; StreamingBufferSegment seg6;
FAIL_IF(StreamingBufferAppend(sb, &seg6, (const uint8_t *)"abcdefghij", 10) != 0); FAIL_IF(StreamingBufferAppend(sb, &seg6, (const uint8_t *)"abcdefghij", 10) != 0);
Dump(sb); Dump(sb);
FAIL_IF(sb->block_list == NULL); sbb1 = RB_MIN(SBB, &sb->sbb_tree);
FAIL_IF(sb->block_list->offset != 0); FAIL_IF_NULL(sbb1);
FAIL_IF(sb->block_list->len != 20); FAIL_IF(sbb1->offset != 0);
FAIL_IF(sb->block_list->next != NULL); FAIL_IF(sbb1->len != 20);
FAIL_IF(SBB_RB_NEXT(sbb1));
StreamingBufferFree(sb); StreamingBufferFree(sb);
PASS; PASS;
@ -1395,18 +1343,20 @@ static int StreamingBufferTest09(void)
StreamingBufferSegment seg5; StreamingBufferSegment seg5;
FAIL_IF(StreamingBufferInsertAt(sb, &seg5, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0); FAIL_IF(StreamingBufferInsertAt(sb, &seg5, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0);
Dump(sb); Dump(sb);
FAIL_IF(sb->block_list == NULL); StreamingBufferBlock *sbb1 = RB_MIN(SBB, &sb->sbb_tree);
FAIL_IF(sb->block_list->offset != 0); FAIL_IF_NULL(sbb1);
FAIL_IF(sb->block_list->len != 10); FAIL_IF(sbb1->offset != 0);
FAIL_IF(sb->block_list->next != NULL); FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
StreamingBufferSegment seg6; StreamingBufferSegment seg6;
FAIL_IF(StreamingBufferInsertAt(sb, &seg6, (const uint8_t *)"abcdefghij", 10, 0) != 0); FAIL_IF(StreamingBufferInsertAt(sb, &seg6, (const uint8_t *)"abcdefghij", 10, 0) != 0);
Dump(sb); Dump(sb);
FAIL_IF(sb->block_list == NULL); sbb1 = RB_MIN(SBB, &sb->sbb_tree);
FAIL_IF(sb->block_list->offset != 0); FAIL_IF_NULL(sbb1);
FAIL_IF(sb->block_list->len != 10); FAIL_IF(sbb1->offset != 0);
FAIL_IF(sb->block_list->next != NULL); FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
StreamingBufferFree(sb); StreamingBufferFree(sb);
PASS; PASS;
@ -1421,10 +1371,10 @@ static int StreamingBufferTest10(void)
StreamingBufferSegment seg1; StreamingBufferSegment seg1;
FAIL_IF(StreamingBufferInsertAt(sb, &seg1, (const uint8_t *)"A", 1, 0) != 0); FAIL_IF(StreamingBufferInsertAt(sb, &seg1, (const uint8_t *)"A", 1, 0) != 0);
Dump(sb);
StreamingBufferSegment seg2; StreamingBufferSegment seg2;
FAIL_IF(StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"D", 1, 3) != 0); FAIL_IF(StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"D", 1, 3) != 0);
Dump(sb); Dump(sb);
StreamingBufferSegment seg3; StreamingBufferSegment seg3;
FAIL_IF(StreamingBufferInsertAt(sb, &seg3, (const uint8_t *)"H", 1, 7) != 0); FAIL_IF(StreamingBufferInsertAt(sb, &seg3, (const uint8_t *)"H", 1, 7) != 0);
Dump(sb); Dump(sb);
@ -1442,18 +1392,20 @@ static int StreamingBufferTest10(void)
StreamingBufferSegment seg7; StreamingBufferSegment seg7;
FAIL_IF(StreamingBufferInsertAt(sb, &seg7, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0); FAIL_IF(StreamingBufferInsertAt(sb, &seg7, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0);
Dump(sb); Dump(sb);
FAIL_IF(sb->block_list == NULL); StreamingBufferBlock *sbb1 = RB_MIN(SBB, &sb->sbb_tree);
FAIL_IF(sb->block_list->offset != 0); FAIL_IF_NULL(sbb1);
FAIL_IF(sb->block_list->len != 10); FAIL_IF(sbb1->offset != 0);
FAIL_IF(sb->block_list->next != NULL); FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
StreamingBufferSegment seg8; StreamingBufferSegment seg8;
FAIL_IF(StreamingBufferInsertAt(sb, &seg8, (const uint8_t *)"abcdefghij", 10, 0) != 0); FAIL_IF(StreamingBufferInsertAt(sb, &seg8, (const uint8_t *)"abcdefghij", 10, 0) != 0);
Dump(sb); Dump(sb);
FAIL_IF(sb->block_list == NULL); sbb1 = RB_MIN(SBB, &sb->sbb_tree);
FAIL_IF(sb->block_list->offset != 0); FAIL_IF_NULL(sbb1);
FAIL_IF(sb->block_list->len != 10); FAIL_IF(sbb1->offset != 0);
FAIL_IF(sb->block_list->next != NULL); FAIL_IF(sbb1->len != 10);
FAIL_IF(SBB_RB_NEXT(sbb1));
StreamingBufferFree(sb); StreamingBufferFree(sb);
PASS; PASS;

@ -59,6 +59,8 @@
#ifndef __UTIL_STREAMING_BUFFER_H__ #ifndef __UTIL_STREAMING_BUFFER_H__
#define __UTIL_STREAMING_BUFFER_H__ #define __UTIL_STREAMING_BUFFER_H__
#include "tree.h"
#define STREAMING_BUFFER_NOFLAGS 0 #define STREAMING_BUFFER_NOFLAGS 0
#define STREAMING_BUFFER_AUTOSLIDE (1<<0) #define STREAMING_BUFFER_AUTOSLIDE (1<<0)
@ -77,11 +79,18 @@ typedef struct StreamingBufferConfig_ {
/** /**
* \brief block of continues data * \brief block of continues data
*/ */
typedef struct StreamingBufferBlock_ { typedef struct StreamingBufferBlock {
uint64_t offset; uint64_t offset;
RB_ENTRY(StreamingBufferBlock) rb;
uint32_t len; uint32_t len;
struct StreamingBufferBlock_ *next; } __attribute__((__packed__)) StreamingBufferBlock;
} StreamingBufferBlock;
int SBBCompare(struct StreamingBufferBlock *a, struct StreamingBufferBlock *b);
/* red-black tree prototype for SACK records */
RB_HEAD(SBB, StreamingBufferBlock);
RB_PROTOTYPE(SBB, StreamingBufferBlock, rb, SBBCompare);
StreamingBufferBlock *SBB_RB_FIND_INCLUSIVE(struct SBB *head, StreamingBufferBlock *elm);
typedef struct StreamingBuffer_ { typedef struct StreamingBuffer_ {
const StreamingBufferConfig *cfg; const StreamingBufferConfig *cfg;
@ -91,17 +100,16 @@ typedef struct StreamingBuffer_ {
uint32_t buf_size; /**< size of memory block */ uint32_t buf_size; /**< size of memory block */
uint32_t buf_offset; /**< how far we are in buf_size */ uint32_t buf_offset; /**< how far we are in buf_size */
StreamingBufferBlock *block_list; struct SBB sbb_tree; /**< red black tree of Stream Buffer Blocks */
StreamingBufferBlock *block_list_tail;
#ifdef DEBUG #ifdef DEBUG
uint32_t buf_size_max; uint32_t buf_size_max;
#endif #endif
} StreamingBuffer; } StreamingBuffer;
#ifndef DEBUG #ifndef DEBUG
#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, NULL, NULL}; #define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, { NULL }, };
#else #else
#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, NULL, NULL, 0 }; #define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, { NULL }, 0 };
#endif #endif
typedef struct StreamingBufferSegment_ { typedef struct StreamingBufferSegment_ {

Loading…
Cancel
Save