diff --git a/src/stream-tcp-reassemble.c b/src/stream-tcp-reassemble.c index 2ed65348eb..08a0e25961 100644 --- a/src/stream-tcp-reassemble.c +++ b/src/stream-tcp-reassemble.c @@ -888,7 +888,7 @@ static void GetAppBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data const uint8_t *mydata; uint32_t mydata_len; - if (stream->sb.block_list == NULL) { + if (RB_EMPTY(&stream->sb.sbb_tree)) { SCLogDebug("getting one blob"); StreamingBufferGetDataAtOffset(&stream->sb, &mydata, &mydata_len, offset); @@ -896,7 +896,7 @@ static void GetAppBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data *data = mydata; *data_len = mydata_len; } else { - StreamingBufferBlock *blk = stream->sb.block_list; + StreamingBufferBlock *blk = RB_MIN(SBB, &stream->sb.sbb_tree); if (blk->offset > offset) { SCLogDebug("gap, want data at offset %"PRIu64", " @@ -908,7 +908,7 @@ static void GetAppBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data } else if (offset >= (blk->offset + blk->len)) { *data = NULL; - StreamingBufferBlock *nblk = blk->next; + StreamingBufferBlock *nblk = SBB_RB_NEXT(blk); *data_len = nblk ? nblk->offset - offset : 0; if (nblk) { SCLogDebug("gap, want data at offset %"PRIu64", " @@ -957,14 +957,14 @@ static inline bool CheckGap(TcpSession *ssn, TcpStream *stream, Packet *p) * is beyond next_seq, we only consider it a gap now if we do * already have data beyond the gap. */ if (SEQ_GT(stream->last_ack, stream->next_seq)) { - if (stream->sb.block_list == NULL) { + if (RB_EMPTY(&stream->sb.sbb_tree)) { SCLogDebug("packet %"PRIu64": no GAP. " "next_seq %u < last_ack %u, but no data in list", p->pcap_cnt, stream->next_seq, stream->last_ack); return false; } else { const uint64_t next_seq_abs = STREAM_BASE_OFFSET(stream) + (stream->next_seq - stream->base_seq); - StreamingBufferBlock *blk = stream->sb.block_list; + StreamingBufferBlock *blk = RB_MIN(SBB, &stream->sb.sbb_tree); if (blk->offset > next_seq_abs && blk->offset < last_ack_abs) { /* ack'd data after the gap */ SCLogDebug("packet %"PRIu64": GAP. " @@ -1146,8 +1146,8 @@ static int GetRawBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data_ { const uint8_t *mydata; uint32_t mydata_len; - if (stream->sb.block_list == NULL) { - SCLogDebug("getting one blob"); + if (RB_EMPTY(&stream->sb.sbb_tree)) { + SCLogDebug("getting one blob for offset %"PRIu64, offset); uint64_t roffset = offset; if (offset) @@ -1160,29 +1160,24 @@ static int GetRawBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data_ *data_len = mydata_len; *data_offset = roffset; } else { - if (*iter == NULL) - *iter = stream->sb.block_list; + SCLogDebug("multiblob %s. Want offset %"PRIu64, + *iter == NULL ? "starting" : "continuing", offset); + if (*iter == NULL) { + StreamingBufferBlock key = { .offset = offset, .len = 0 }; + *iter = SBB_RB_FIND_INCLUSIVE(&stream->sb.sbb_tree, &key); + SCLogDebug("*iter %p", *iter); + } if (*iter == NULL) { + SCLogDebug("no data"); *data = NULL; *data_len = 0; *data_offset = 0; return 0; } - - if (offset) { - while (*iter && ((*iter)->offset + (*iter)->len < offset)) - *iter = (*iter)->next; - if (*iter == NULL) { - *data = NULL; - *data_len = 0; - *data_offset = 0; - return 0; - } - } - - SCLogDebug("getting multiple blobs. Iter %p, %"PRIu64"/%u (next? %s)", *iter, (*iter)->offset, (*iter)->len, (*iter)->next ? "yes":"no"); + SCLogDebug("getting multiple blobs. Iter %p, %"PRIu64"/%u", *iter, (*iter)->offset, (*iter)->len); StreamingBufferSBBGetData(&stream->sb, (*iter), &mydata, &mydata_len); + SCLogDebug("mydata %p", mydata); if ((*iter)->offset < offset) { uint64_t delta = offset - (*iter)->offset; @@ -1191,6 +1186,7 @@ static int GetRawBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data_ *data_len = mydata_len - delta; *data_offset = offset; } else { + SCLogDebug("no data (yet)"); *data = NULL; *data_len = 0; *data_offset = 0; @@ -1202,7 +1198,8 @@ static int GetRawBuffer(TcpStream *stream, const uint8_t **data, uint32_t *data_ *data_offset = (*iter)->offset; } - *iter = (*iter)->next; + *iter = SBB_RB_NEXT(*iter); + SCLogDebug("*iter %p", *iter); } return 0; } @@ -1385,25 +1382,20 @@ static int StreamReassembleRawInline(TcpSession *ssn, const Packet *p, /* simply return progress from the block we inspected. */ bool return_progress = false; - if (stream->sb.block_list == NULL) { + if (RB_EMPTY(&stream->sb.sbb_tree)) { /* continues block */ StreamingBufferGetData(&stream->sb, &mydata, &mydata_len, &mydata_offset); return_progress = true; } else { + SCLogDebug("finding our SBB from offset %"PRIu64, packet_leftedge_abs); /* find our block */ - StreamingBufferBlock *iter = stream->sb.block_list; - for ( ; iter != NULL; iter = iter->next) { - uint64_t iter_re_abs = iter->offset + iter->len; - DEBUG_VALIDATE_BUG_ON(packet_leftedge_abs < iter->offset && - packet_rightedge_abs > iter->offset && - packet_rightedge_abs < iter_re_abs); - - if (iter->offset <= packet_leftedge_abs && iter_re_abs >= packet_rightedge_abs) { - StreamingBufferSBBGetData(&stream->sb, iter, &mydata, &mydata_len); - mydata_offset = iter->offset; - break; - } + StreamingBufferBlock key = { .offset = packet_leftedge_abs, .len = p->payload_len }; + StreamingBufferBlock *sbb = SBB_RB_FIND_INCLUSIVE(&stream->sb.sbb_tree, &key); + if (sbb) { + SCLogDebug("found %p offset %"PRIu64" len %u", sbb, sbb->offset, sbb->len); + StreamingBufferSBBGetData(&stream->sb, sbb, &mydata, &mydata_len); + mydata_offset = sbb->offset; } } diff --git a/src/util-streaming-buffer.c b/src/util-streaming-buffer.c index e1f27c06e9..dc5d08e741 100644 --- a/src/util-streaming-buffer.c +++ b/src/util-streaming-buffer.c @@ -19,6 +19,7 @@ #include "util-streaming-buffer.h" #include "util-unittest.h" #include "util-print.h" +#include "util-validate.h" /** * \file @@ -41,6 +42,63 @@ static void SBBFree(StreamingBuffer *sb); +RB_GENERATE(SBB, StreamingBufferBlock, rb, SBBCompare); + +int SBBCompare(struct StreamingBufferBlock *a, struct StreamingBufferBlock *b) +{ + SCLogDebug("a %"PRIu64" len %u, b %"PRIu64" len %u", + a->offset, a->len, b->offset, b->len); + + if (a->offset > b->offset) + SCReturnInt(1); + else if (a->offset < b->offset) + SCReturnInt(-1); + else { + if (a->len == 0 || b->len == 0 || a->len == b->len) + SCReturnInt(0); + else if (a->len > b->len) + SCReturnInt(1); + else + SCReturnInt(-1); + } +} + +/* inclusive compare function that also considers the right edge, + * not just the offset. */ +static inline int InclusiveCompare(StreamingBufferBlock *lookup, StreamingBufferBlock *intree) { + const uint64_t lre = lookup->offset + lookup->len; + const uint64_t tre = intree->offset + intree->len; + if (lre < intree->offset) // entirely before + return -1; + else if (lre >= intree->offset && lre <= tre) // (some) overlap + return 0; + else + return 1; // entirely after +} + +StreamingBufferBlock *SBB_RB_FIND_INCLUSIVE(struct SBB *head, StreamingBufferBlock *elm) +{ + SCLogDebug("looking up %"PRIu64, elm->offset); + + struct StreamingBufferBlock *tmp = RB_ROOT(head); + struct StreamingBufferBlock *res = NULL; + while (tmp) { + SCLogDebug("compare with %"PRIu64"/%u", tmp->offset, tmp->len); + const int comp = InclusiveCompare(elm, tmp); + SCLogDebug("compare result: %d", comp); + if (comp < 0) { + res = tmp; + tmp = RB_LEFT(tmp, rb); + } else if (comp > 0) { + tmp = RB_RIGHT(tmp, rb); + } else { + return tmp; + } + } + return res; +} + + static inline int InitBuffer(StreamingBuffer *sb) { sb->buf = CALLOC(sb->cfg, 1, sb->cfg->buf_size); @@ -93,19 +151,18 @@ void StreamingBufferFree(StreamingBuffer *sb) } #ifdef DEBUG -static void SBBPrintList(const StreamingBuffer *sb) +static void SBBPrintList(StreamingBuffer *sb) { - const StreamingBufferBlock *sbb = sb->block_list; - while (sbb) { + StreamingBufferBlock *sbb = NULL; + RB_FOREACH(sbb, SBB, &sb->sbb_tree) { SCLogDebug("sbb: offset %"PRIu64", len %u", sbb->offset, sbb->len); - if (sbb->next) { - if ((sbb->offset + sbb->len) != sbb->next->offset) { + StreamingBufferBlock *next = SBB_RB_NEXT(sbb); + if (next) { + if ((sbb->offset + sbb->len) != next->offset) { SCLogDebug("gap: offset %"PRIu64", len %"PRIu64, (sbb->offset + sbb->len), - sbb->next->offset - (sbb->offset + sbb->len)); + next->offset - (sbb->offset + sbb->len)); } } - - sbb = sbb->next; } } #endif @@ -117,8 +174,8 @@ static void SBBPrintList(const StreamingBuffer *sb) static void SBBInit(StreamingBuffer *sb, uint32_t rel_offset, uint32_t data_len) { - BUG_ON(sb->block_list != NULL); - BUG_ON(sb->buf_offset > sb->stream_offset + rel_offset); + DEBUG_VALIDATE_BUG_ON(!RB_EMPTY(&sb->sbb_tree)); + DEBUG_VALIDATE_BUG_ON(sb->buf_offset > sb->stream_offset + rel_offset); /* need to set up 2: existing data block and new data block */ StreamingBufferBlock *sbb = CALLOC(sb->cfg, 1, sizeof(*sbb)); @@ -136,9 +193,8 @@ static void SBBInit(StreamingBuffer *sb, sbb2->offset = sb->stream_offset + rel_offset; sbb2->len = data_len; - sb->block_list = sbb; - sbb->next = sbb2; - sb->block_list_tail = sbb2; + SBB_RB_INSERT(&sb->sbb_tree, sbb); + SBB_RB_INSERT(&sb->sbb_tree, sbb2); SCLogDebug("sbb1 %"PRIu64", len %u, sbb2 %"PRIu64", len %u", sbb->offset, sbb->len, sbb2->offset, sbb2->len); @@ -155,7 +211,7 @@ static void SBBInit(StreamingBuffer *sb, static void SBBInitLeadingGap(StreamingBuffer *sb, uint64_t offset, uint32_t data_len) { - BUG_ON(sb->block_list != NULL); + DEBUG_VALIDATE_BUG_ON(!RB_EMPTY(&sb->sbb_tree)); StreamingBufferBlock *sbb = CALLOC(sb->cfg, 1, sizeof(*sbb)); if (sbb == NULL) @@ -163,8 +219,7 @@ static void SBBInitLeadingGap(StreamingBuffer *sb, sbb->offset = offset; sbb->len = data_len; - sb->block_list = sbb; - sb->block_list_tail = sbb; + SBB_RB_INSERT(&sb->sbb_tree, sbb); SCLogDebug("sbb %"PRIu64", len %u", sbb->offset, sbb->len); @@ -173,280 +228,164 @@ static void SBBInitLeadingGap(StreamingBuffer *sb, #endif } -static int IsBefore(StreamingBufferBlock *me, StreamingBufferBlock *you) +static inline void ConsolidateFwd(StreamingBuffer *sb, + struct SBB *tree, StreamingBufferBlock *sa) { - if ((me->offset + me->len) < you->offset) { - return 1; + uint64_t sa_re = sa->offset + sa->len; + StreamingBufferBlock *tr, *s = sa; + RB_FOREACH_FROM(tr, SBB, s) { + if (sa == tr) + continue; + + const uint64_t tr_re = tr->offset + tr->len; + SCLogDebug("-> (fwd) tr %p %"PRIu64"/%u re %"PRIu64, + tr, tr->offset, tr->len, tr_re); + + if (sa_re < tr->offset) + break; // entirely before + + /* + sa: [ ] + tr: [ ] + sa: [ ] + tr: [ ] + sa: [ ] + tr: [ ] + */ + if (sa->offset >= tr->offset && sa_re <= tr_re) { + sa->len = tr->len; + sa->offset = tr->offset; + sa_re = sa->offset + sa->len; + SCLogDebug("-> (fwd) tr %p %"PRIu64"/%u REMOVED ECLIPSED2", tr, tr->offset, tr->len); + SBB_RB_REMOVE(tree, tr); + FREE(sb->cfg, tr, sizeof(StreamingBufferBlock)); + /* + sa: [ ] + tr: [ ] + sa: [ ] + tr: [ ] + sa: [ ] + tr: [ ] + */ + } else if (sa->offset <= tr->offset && sa_re >= tr_re) { + SCLogDebug("-> (fwd) tr %p %"PRIu64"/%u REMOVED ECLIPSED", tr, tr->offset, tr->len); + SBB_RB_REMOVE(tree, tr); + FREE(sb->cfg, tr, sizeof(StreamingBufferBlock)); + /* + sa: [ ] + tr: [ ] + sa: [ ] + tr: [ ] + */ + } else if (sa->offset < tr->offset && // starts before + sa_re >= tr->offset && sa_re < tr_re) // ends inside + { + // merge + sa->len = tr_re - sa->offset; + sa_re = sa->offset + sa->len; + SCLogDebug("-> (fwd) tr %p %"PRIu64"/%u REMOVED MERGED", tr, tr->offset, tr->len); + SBB_RB_REMOVE(tree, tr); + FREE(sb->cfg, tr, sizeof(StreamingBufferBlock)); + } } - return 0; -} - -static int StartsBefore(StreamingBufferBlock *me, StreamingBufferBlock *you) -{ - if (me->offset < you->offset) - return 1; - return 0; -} - -static int IsAfter(StreamingBufferBlock *me, StreamingBufferBlock *you) -{ - if (you->offset + you->len < me->offset) - return 1; - return 0; -} - -static int IsOverlappedBy(StreamingBufferBlock *me, StreamingBufferBlock *you) -{ - if (you->offset <= me->offset && (you->offset + you->len) >= (me->offset + me->len)) - return 1; - return 0; -} - -static int EndsAfter(StreamingBufferBlock *me, StreamingBufferBlock *you) -{ - if ((me->offset + me->len) > (you->offset + you->len)) - return 1; - return 0; -} - -static StreamingBufferBlock *GetNew(StreamingBuffer *sb, - uint64_t offset, uint32_t len, - StreamingBufferBlock *next) -{ - StreamingBufferBlock *new_sbb = CALLOC(sb->cfg, 1, sizeof(*new_sbb)); - if (new_sbb == NULL) - return NULL; - new_sbb->offset = offset; - new_sbb->len = len; - new_sbb->next = next; - return new_sbb; } -/* expand our sbb forward if possible */ -static int SBBUpdateLookForward(StreamingBuffer *sb, - StreamingBufferBlock *sbb, - StreamingBufferBlock *my_block) +static inline void ConsolidateBackward(StreamingBuffer *sb, + struct SBB *tree, StreamingBufferBlock *sa) { - SCLogDebug("EndsAfter: consider next"); - - while (sbb->offset + sbb->len == sbb->next->offset) - { - SCLogDebug("EndsAfter: gobble up next: %"PRIu64"/%u", sbb->next->offset, sbb->next->len); - uint64_t right_edge = sbb->next->offset + sbb->next->len; - uint32_t expand_by = right_edge - (sbb->offset + sbb->len); - sbb->len += expand_by; - SCLogDebug("EndsAfter: expand_by %u (part 2)", expand_by); - SCLogDebug("EndsAfter: (loop) sbb now %"PRIu64"/%u", sbb->offset, sbb->len); - - /* we can gobble up next */ - StreamingBufferBlock *to_free = sbb->next; - sbb->next = sbb->next->next; - FREE(sb->cfg, to_free, sizeof(StreamingBufferBlock)); - if (sbb->next == NULL) - sb->block_list_tail = sbb; - - /* update my block */ - if (expand_by >= my_block->len) { - return 1; - } - - my_block->len -= expand_by; - my_block->offset += expand_by; - - if (sbb->next == NULL) { - /* if we have nothing left in the list we're almost done, - * except we need to check if we have some of our block - * left */ - sbb->len += my_block->len; - my_block->offset += my_block->len; - my_block->len = 0; - return 1; - } else { - /* if next is not directly connected and we have some - * block len left, expand sbb further */ - uint32_t gap = sbb->next->offset - (sbb->offset + sbb->len); - SCLogDebug("EndsAfter: we now have a gap of %u and a block of %"PRIu64"/%u", gap, my_block->offset, my_block->len); - - if (my_block->len < gap) { - sbb->len += my_block->len; - my_block->offset += my_block->len; - my_block->len = 0; - return 1; - } else { - sbb->len += gap; - my_block->offset += gap; - my_block->len -= gap; - SCLogDebug("EndsAfter: (loop) block at %"PRIu64"/%u, sbb %"PRIu64"/%u", my_block->offset, my_block->len, sbb->offset, sbb->len); - SCLogDebug("EndsAfter: (loop) sbb->next %"PRIu64"/%u", sbb->next->offset, sbb->next->len); - } + uint64_t sa_re = sa->offset + sa->len; + StreamingBufferBlock *tr, *s = sa; + RB_FOREACH_REVERSE_FROM(tr, SBB, s) { + if (sa == tr) + continue; + const uint64_t tr_re = tr->offset + tr->len; + SCLogDebug("-> (bwd) tr %p %"PRIu64"/%u", tr, tr->offset, tr->len); + + if (sa->offset > tr_re) + break; // entirely after + + if (sa->offset >= tr->offset && sa_re <= tr_re) { + sa->len = tr->len; + sa->offset = tr->offset; + sa_re = sa->offset + sa->len; + SCLogDebug("-> (bwd) tr %p %"PRIu64"/%u REMOVED ECLIPSED2", tr, tr->offset, tr->len); + SBB_RB_REMOVE(tree, tr); + FREE(sb->cfg, tr, sizeof(StreamingBufferBlock)); + /* + sa: [ ] + tr: [ ] + sa: [ ] + tr: [ ] + sa: [ ] + tr: [ ] + */ + } else if (sa->offset <= tr->offset && sa_re >= tr_re) { + SCLogDebug("-> (bwd) tr %p %"PRIu64"/%u REMOVED ECLIPSED", tr, tr->offset, tr->len); + SBB_RB_REMOVE(tree, tr); + FREE(sb->cfg, tr, sizeof(StreamingBufferBlock)); + /* + sa: [ ] + tr: [ ] + sa: [ ] + tr: [ ] + */ + } else if (sa->offset > tr->offset && sa_re > tr_re && sa->offset <= tr_re) { + // merge + sa->len = sa_re - tr->offset; + sa->offset = tr->offset; + sa_re = sa->offset + sa->len; + SCLogDebug("-> (bwd) tr %p %"PRIu64"/%u REMOVED MERGED", tr, tr->offset, tr->len); + SBB_RB_REMOVE(tree, tr); + FREE(sb->cfg, tr, sizeof(StreamingBufferBlock)); } } - return 0; } -static void SBBUpdate(StreamingBuffer *sb, - uint32_t rel_offset, uint32_t data_len) +static int Insert(StreamingBuffer *sb, struct SBB *tree, + uint32_t rel_offset, uint32_t len) { - StreamingBufferBlock my_block = { .offset = sb->stream_offset + rel_offset, - .len = data_len, - .next = NULL }; - const uint64_t my_block_right_edge = my_block.offset + my_block.len; + SCLogDebug("* inserting: %u/%u\n", rel_offset, len); - StreamingBufferBlock *tail = sb->block_list_tail; - - /* fast path 1: expands tail */ - if (tail && ((tail->offset + tail->len) == my_block.offset)) - { - tail->len = my_block_right_edge - tail->offset; - goto done; - } - /* fast path 2: new isolated block after tail */ - else if (tail && IsAfter(&my_block, tail)) { - StreamingBufferBlock *new_sbb = GetNew(sb, my_block.offset, my_block.len, NULL); - sb->block_list_tail = tail->next = new_sbb; - SCLogDebug("tail: new block at %"PRIu64"/%u", my_block.offset, my_block.len); - goto done; + StreamingBufferBlock *sbb = CALLOC(sb->cfg, 1, sizeof(*sbb)); + if (sbb == NULL) + return -1; + sbb->offset = sb->stream_offset + rel_offset; + sbb->len = len; + StreamingBufferBlock *res = SBB_RB_INSERT(tree, sbb); + if (res) { + // exact overlap + SCLogDebug("* insert failed: exact match in tree with %p %"PRIu64"/%u", res, res->offset, res->len); + FREE(sb->cfg, sbb, sizeof(StreamingBufferBlock)); + return 0; } - - BUG_ON(sb->block_list == NULL); + ConsolidateBackward(sb, tree, sbb); + ConsolidateFwd(sb, tree, sbb); #ifdef DEBUG SBBPrintList(sb); #endif - SCLogDebug("PreInsert: block at %"PRIu64"/%u", my_block.offset, my_block.len); - StreamingBufferBlock *sbb = sb->block_list, *prev = NULL; - while (sbb) { - SCLogDebug("sbb %"PRIu64"/%u data %"PRIu64"/%u. Next %s", sbb->offset, sbb->len, - my_block.offset, my_block.len, sbb->next ? "true" : "false"); - - if (IsBefore(&my_block, sbb)) { - StreamingBufferBlock *new_sbb = GetNew(sb, my_block.offset, my_block.len, sbb); - - /* place before, maybe replace list head */ - if (prev == NULL) { - sb->block_list = new_sbb; - } else { - prev->next = new_sbb; - } - SCLogDebug("IsBefore: new block at %"PRIu64"/%u", my_block.offset, my_block.len); - break; - - } else if (IsOverlappedBy(&my_block, sbb)) { - /* nothing to do */ - SCLogDebug("IsOverlappedBy: overlapped block at %"PRIu64"/%u", my_block.offset, my_block.len); - break; - - } else if (IsAfter(&my_block, sbb)) { - - /* if no next, place after, otherwise, iterate */ - if (sbb->next == NULL) { - StreamingBufferBlock *new_sbb = GetNew(sb, my_block.offset, my_block.len, NULL); - sbb->next = new_sbb; - sb->block_list_tail = new_sbb; - SCLogDebug("new block at %"PRIu64"/%u", my_block.offset, my_block.len); - break; - } - SCLogDebug("IsAfter: block at %"PRIu64"/%u, is after sbb", my_block.offset, my_block.len); - - } else { - - /* those were the simple cases */ - - if (StartsBefore(&my_block, sbb)) { - /* expand sbb */ - uint32_t expand_by = sbb->offset - my_block.offset; - SCLogDebug("StartsBefore: expand_by %u", expand_by); - sbb->offset = my_block.offset; - sbb->len += expand_by; - - /* if my_block ends before sbb right edge, we are done */ - if (my_block_right_edge <= (sbb->offset + sbb->len)) - break; - - my_block.offset = sbb->offset + sbb->len; - my_block.len = my_block_right_edge - my_block.offset; - SCLogDebug("StartsBefore: block now %"PRIu64"/%u", my_block.offset, my_block.len); - - if (sbb->next == NULL) { - sbb->len += my_block.len; - break; - } - /* expand, but consider next */ - uint64_t right_edge = my_block_right_edge; - if (right_edge > sbb->next->offset) { - right_edge = sbb->next->offset; - } - - expand_by = right_edge - (sbb->offset + sbb->len); - SCLogDebug("EndsAfter: expand_by %u", expand_by); - sbb->len += expand_by; - SCLogDebug("EndsAfter: sbb now %"PRIu64"/%u", sbb->offset, sbb->len); - - my_block.offset = sbb->offset + sbb->len; - my_block.len = my_block_right_edge - my_block.offset; - SCLogDebug("StartsBefore: sbb now %"PRIu64"/%u", sbb->offset, sbb->len); - - } else if (EndsAfter(&my_block, sbb)) { - /* expand sbb, but we need to mind "next" */ - - if (sbb->next == NULL) { - /* last, so just expand sbb */ - sbb->len = my_block_right_edge - sbb->offset; - break; - } - - /* expand, but consider next */ - uint64_t right_edge = my_block_right_edge; - if (right_edge > sbb->next->offset) { - right_edge = sbb->next->offset; - } - - uint32_t expand_by = right_edge - (sbb->offset + sbb->len); - SCLogDebug("EndsAfter: expand_by %u", expand_by); - sbb->len += expand_by; - SCLogDebug("EndsAfter: sbb now %"PRIu64"/%u", sbb->offset, sbb->len); - - my_block.offset = sbb->offset + sbb->len; - my_block.len = my_block_right_edge - my_block.offset; - } - - if (sbb->next != NULL) { - SCLogDebug("EndsAfter: consider next"); - - if (SBBUpdateLookForward(sb, sbb, &my_block) == 1) - goto done; - } - - SCLogDebug("EndsAfter: block at %"PRIu64"/%u, is after sbb", my_block.offset, my_block.len); + return 0; +} - if (my_block.len == 0) - break; - } - prev = sbb; - sbb = sbb->next; - } -done: - SCLogDebug("PostInsert: block at %"PRIu64"/%u", my_block.offset, my_block.len); - SCLogDebug("PostInsert"); -#ifdef DEBUG - SBBPrintList(sb); -#endif +static void SBBUpdate(StreamingBuffer *sb, + uint32_t rel_offset, uint32_t data_len) +{ + Insert(sb, &sb->sbb_tree, rel_offset, data_len); } static void SBBFree(StreamingBuffer *sb) { - StreamingBufferBlock *sbb = sb->block_list; - while (sbb) { - StreamingBufferBlock *next = sbb->next; + StreamingBufferBlock *sbb = NULL, *safe = NULL; + RB_FOREACH_SAFE(sbb, SBB, &sb->sbb_tree, safe) { + SBB_RB_REMOVE(&sb->sbb_tree, sbb); FREE(sb->cfg, sbb, sizeof(StreamingBufferBlock)); - sbb = next; } - sb->block_list = NULL; } static void SBBPrune(StreamingBuffer *sb) { - StreamingBufferBlock *sbb = sb->block_list; - while (sbb) { + SCLogDebug("pruning %p to %"PRIu64, sb, sb->stream_offset); + StreamingBufferBlock *sbb = NULL, *safe = NULL; + RB_FOREACH_SAFE(sbb, SBB, &sb->sbb_tree, safe) { /* completely beyond window, we're done */ if (sbb->offset > sb->stream_offset) break; @@ -455,20 +394,18 @@ static void SBBPrune(StreamingBuffer *sb) if (sbb->offset < sb->stream_offset && sbb->offset + sbb->len > sb->stream_offset) { uint32_t shrink_by = sb->stream_offset - sbb->offset; - BUG_ON(shrink_by > sbb->len); - sbb->len -= shrink_by; - sbb->offset += shrink_by; - BUG_ON(sbb->offset != sb->stream_offset); + DEBUG_VALIDATE_BUG_ON(shrink_by > sbb->len); + if (sbb->len >= shrink_by) { + sbb->len -= shrink_by; + sbb->offset += shrink_by; + DEBUG_VALIDATE_BUG_ON(sbb->offset != sb->stream_offset); + } break; } - StreamingBufferBlock *next = sbb->next; + SBB_RB_REMOVE(&sb->sbb_tree, sbb); + SCLogDebug("sb %p removed %p %"PRIu64", %u", sb, sbb, sbb->offset, sbb->len); FREE(sb->cfg, sbb, sizeof(StreamingBufferBlock)); - - sbb = next; - sb->block_list = next; - if (sbb && sbb->next == NULL) - sb->block_list_tail = NULL; } } @@ -610,7 +547,7 @@ StreamingBufferSegment *StreamingBufferAppendRaw(StreamingBuffer *sb, const uint uint32_t rel_offset = sb->buf_offset; sb->buf_offset += data_len; - if (sb->block_list) { + if (!RB_EMPTY(&sb->sbb_tree)) { SBBUpdate(sb, rel_offset, data_len); } return seg; @@ -652,7 +589,7 @@ int StreamingBufferAppend(StreamingBuffer *sb, StreamingBufferSegment *seg, uint32_t rel_offset = sb->buf_offset; sb->buf_offset += data_len; - if (sb->block_list) { + if (!RB_EMPTY(&sb->sbb_tree)) { SBBUpdate(sb, rel_offset, data_len); } return 0; @@ -691,7 +628,7 @@ int StreamingBufferAppendNoTrack(StreamingBuffer *sb, uint32_t rel_offset = sb->buf_offset; sb->buf_offset += data_len; - if (sb->block_list) { + if (!RB_EMPTY(&sb->sbb_tree)) { SBBUpdate(sb, rel_offset, data_len); } return 0; @@ -739,7 +676,7 @@ int StreamingBufferInsertAt(StreamingBuffer *sb, StreamingBufferSegment *seg, SCLogDebug("rel_offset %u sb->stream_offset %"PRIu64", buf_offset %u", rel_offset, sb->stream_offset, sb->buf_offset); - if (sb->block_list == NULL) { + if (RB_EMPTY(&sb->sbb_tree)) { SCLogDebug("empty sbb list"); if (sb->stream_offset == offset) { @@ -1145,7 +1082,7 @@ static int StreamingBufferTest04(void) StreamingBufferSegment seg1; FAIL_IF(StreamingBufferAppend(sb, &seg1, (const uint8_t *)"ABCDEFGH", 8) != 0); - FAIL_IF(sb->block_list != NULL); + FAIL_IF(!RB_EMPTY(&sb->sbb_tree)); StreamingBufferSegment seg2; FAIL_IF(StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"01234567", 8, 14) != 0); FAIL_IF(sb->stream_offset != 0); @@ -1154,12 +1091,15 @@ static int StreamingBufferTest04(void) FAIL_IF(seg2.stream_offset != 14); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1)); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); - FAIL_IF(sb->block_list == NULL); - FAIL_IF(sb->block_list->offset != 0); - FAIL_IF(sb->block_list->len != 8); - FAIL_IF(sb->block_list->next == NULL); - FAIL_IF(sb->block_list->next->offset != 14); - FAIL_IF(sb->block_list->next->len != 8); + FAIL_IF(RB_EMPTY(&sb->sbb_tree)); + StreamingBufferBlock *sbb1 = RB_MIN(SBB, &sb->sbb_tree); + FAIL_IF_NULL(sbb1); + FAIL_IF(sbb1->offset != 0); + FAIL_IF(sbb1->len != 8); + StreamingBufferBlock *sbb2 = SBB_RB_NEXT(sbb1); + FAIL_IF_NULL(sbb2); + FAIL_IF(sbb2->offset != 14); + FAIL_IF(sbb2->len != 8); Dump(sb); DumpSegment(sb, &seg1); DumpSegment(sb, &seg2); @@ -1172,10 +1112,11 @@ static int StreamingBufferTest04(void) FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1)); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3)); - FAIL_IF(sb->block_list == NULL); - FAIL_IF(sb->block_list->offset != 0); - FAIL_IF(sb->block_list->len != 22); - FAIL_IF(sb->block_list->next != NULL); + sbb1 = RB_MIN(SBB, &sb->sbb_tree); + FAIL_IF_NULL(sbb1); + FAIL_IF(sbb1->offset != 0); + FAIL_IF(sbb1->len != 22); + FAIL_IF(SBB_RB_NEXT(sbb1)); Dump(sb); DumpSegment(sb, &seg1); DumpSegment(sb, &seg2); @@ -1192,10 +1133,11 @@ static int StreamingBufferTest04(void) FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3)); FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg4)); - FAIL_IF(sb->block_list == NULL); - FAIL_IF(sb->block_list->offset != 0); - FAIL_IF(sb->block_list->len != 22); - FAIL_IF(sb->block_list->next == NULL); + sbb1 = RB_MIN(SBB, &sb->sbb_tree); + FAIL_IF_NULL(sbb1); + FAIL_IF(sbb1->offset != 0); + FAIL_IF(sbb1->len != 22); + FAIL_IF(!SBB_RB_NEXT(sbb1)); Dump(sb); DumpSegment(sb, &seg1); DumpSegment(sb, &seg2); @@ -1272,18 +1214,20 @@ static int StreamingBufferTest06(void) StreamingBufferSegment seg5; FAIL_IF(StreamingBufferInsertAt(sb, &seg5, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0); Dump(sb); - FAIL_IF(sb->block_list == NULL); - FAIL_IF(sb->block_list->offset != 0); - FAIL_IF(sb->block_list->len != 10); - FAIL_IF(sb->block_list->next != NULL); + StreamingBufferBlock *sbb1 = RB_MIN(SBB, &sb->sbb_tree); + FAIL_IF_NULL(sbb1); + FAIL_IF(sbb1->offset != 0); + FAIL_IF(sbb1->len != 10); + FAIL_IF(SBB_RB_NEXT(sbb1)); StreamingBufferSegment seg6; FAIL_IF(StreamingBufferInsertAt(sb, &seg6, (const uint8_t *)"abcdefghij", 10, 0) != 0); Dump(sb); - FAIL_IF(sb->block_list == NULL); - FAIL_IF(sb->block_list->offset != 0); - FAIL_IF(sb->block_list->len != 10); - FAIL_IF(sb->block_list->next != NULL); + sbb1 = RB_MIN(SBB, &sb->sbb_tree); + FAIL_IF_NULL(sbb1); + FAIL_IF(sbb1->offset != 0); + FAIL_IF(sbb1->len != 10); + FAIL_IF(SBB_RB_NEXT(sbb1)); StreamingBufferFree(sb); PASS; @@ -1313,18 +1257,20 @@ static int StreamingBufferTest07(void) StreamingBufferSegment seg5; FAIL_IF(StreamingBufferInsertAt(sb, &seg5, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0); Dump(sb); - FAIL_IF(sb->block_list == NULL); - FAIL_IF(sb->block_list->offset != 0); - FAIL_IF(sb->block_list->len != 10); - FAIL_IF(sb->block_list->next != NULL); + StreamingBufferBlock *sbb1 = RB_MIN(SBB, &sb->sbb_tree); + FAIL_IF_NULL(sbb1); + FAIL_IF(sbb1->offset != 0); + FAIL_IF(sbb1->len != 10); + FAIL_IF(SBB_RB_NEXT(sbb1)); StreamingBufferSegment seg6; FAIL_IF(StreamingBufferInsertAt(sb, &seg6, (const uint8_t *)"abcdefghij", 10, 0) != 0); Dump(sb); - FAIL_IF(sb->block_list == NULL); - FAIL_IF(sb->block_list->offset != 0); - FAIL_IF(sb->block_list->len != 10); - FAIL_IF(sb->block_list->next != NULL); + sbb1 = RB_MIN(SBB, &sb->sbb_tree); + FAIL_IF_NULL(sbb1); + FAIL_IF(sbb1->offset != 0); + FAIL_IF(sbb1->len != 10); + FAIL_IF(SBB_RB_NEXT(sbb1)); StreamingBufferFree(sb); PASS; @@ -1354,18 +1300,20 @@ static int StreamingBufferTest08(void) StreamingBufferSegment seg5; FAIL_IF(StreamingBufferInsertAt(sb, &seg5, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0); Dump(sb); - FAIL_IF(sb->block_list == NULL); - FAIL_IF(sb->block_list->offset != 0); - FAIL_IF(sb->block_list->len != 10); - FAIL_IF(sb->block_list->next != NULL); + StreamingBufferBlock *sbb1 = RB_MIN(SBB, &sb->sbb_tree); + FAIL_IF_NULL(sbb1); + FAIL_IF(sbb1->offset != 0); + FAIL_IF(sbb1->len != 10); + FAIL_IF(SBB_RB_NEXT(sbb1)); StreamingBufferSegment seg6; FAIL_IF(StreamingBufferAppend(sb, &seg6, (const uint8_t *)"abcdefghij", 10) != 0); Dump(sb); - FAIL_IF(sb->block_list == NULL); - FAIL_IF(sb->block_list->offset != 0); - FAIL_IF(sb->block_list->len != 20); - FAIL_IF(sb->block_list->next != NULL); + sbb1 = RB_MIN(SBB, &sb->sbb_tree); + FAIL_IF_NULL(sbb1); + FAIL_IF(sbb1->offset != 0); + FAIL_IF(sbb1->len != 20); + FAIL_IF(SBB_RB_NEXT(sbb1)); StreamingBufferFree(sb); PASS; @@ -1395,18 +1343,20 @@ static int StreamingBufferTest09(void) StreamingBufferSegment seg5; FAIL_IF(StreamingBufferInsertAt(sb, &seg5, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0); Dump(sb); - FAIL_IF(sb->block_list == NULL); - FAIL_IF(sb->block_list->offset != 0); - FAIL_IF(sb->block_list->len != 10); - FAIL_IF(sb->block_list->next != NULL); + StreamingBufferBlock *sbb1 = RB_MIN(SBB, &sb->sbb_tree); + FAIL_IF_NULL(sbb1); + FAIL_IF(sbb1->offset != 0); + FAIL_IF(sbb1->len != 10); + FAIL_IF(SBB_RB_NEXT(sbb1)); StreamingBufferSegment seg6; FAIL_IF(StreamingBufferInsertAt(sb, &seg6, (const uint8_t *)"abcdefghij", 10, 0) != 0); Dump(sb); - FAIL_IF(sb->block_list == NULL); - FAIL_IF(sb->block_list->offset != 0); - FAIL_IF(sb->block_list->len != 10); - FAIL_IF(sb->block_list->next != NULL); + sbb1 = RB_MIN(SBB, &sb->sbb_tree); + FAIL_IF_NULL(sbb1); + FAIL_IF(sbb1->offset != 0); + FAIL_IF(sbb1->len != 10); + FAIL_IF(SBB_RB_NEXT(sbb1)); StreamingBufferFree(sb); PASS; @@ -1421,10 +1371,10 @@ static int StreamingBufferTest10(void) StreamingBufferSegment seg1; FAIL_IF(StreamingBufferInsertAt(sb, &seg1, (const uint8_t *)"A", 1, 0) != 0); + Dump(sb); StreamingBufferSegment seg2; FAIL_IF(StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"D", 1, 3) != 0); Dump(sb); - StreamingBufferSegment seg3; FAIL_IF(StreamingBufferInsertAt(sb, &seg3, (const uint8_t *)"H", 1, 7) != 0); Dump(sb); @@ -1442,18 +1392,20 @@ static int StreamingBufferTest10(void) StreamingBufferSegment seg7; FAIL_IF(StreamingBufferInsertAt(sb, &seg7, (const uint8_t *)"ABCDEFGHIJ", 10, 0) != 0); Dump(sb); - FAIL_IF(sb->block_list == NULL); - FAIL_IF(sb->block_list->offset != 0); - FAIL_IF(sb->block_list->len != 10); - FAIL_IF(sb->block_list->next != NULL); + StreamingBufferBlock *sbb1 = RB_MIN(SBB, &sb->sbb_tree); + FAIL_IF_NULL(sbb1); + FAIL_IF(sbb1->offset != 0); + FAIL_IF(sbb1->len != 10); + FAIL_IF(SBB_RB_NEXT(sbb1)); StreamingBufferSegment seg8; FAIL_IF(StreamingBufferInsertAt(sb, &seg8, (const uint8_t *)"abcdefghij", 10, 0) != 0); Dump(sb); - FAIL_IF(sb->block_list == NULL); - FAIL_IF(sb->block_list->offset != 0); - FAIL_IF(sb->block_list->len != 10); - FAIL_IF(sb->block_list->next != NULL); + sbb1 = RB_MIN(SBB, &sb->sbb_tree); + FAIL_IF_NULL(sbb1); + FAIL_IF(sbb1->offset != 0); + FAIL_IF(sbb1->len != 10); + FAIL_IF(SBB_RB_NEXT(sbb1)); StreamingBufferFree(sb); PASS; diff --git a/src/util-streaming-buffer.h b/src/util-streaming-buffer.h index 91dd011514..defe09ce23 100644 --- a/src/util-streaming-buffer.h +++ b/src/util-streaming-buffer.h @@ -59,6 +59,8 @@ #ifndef __UTIL_STREAMING_BUFFER_H__ #define __UTIL_STREAMING_BUFFER_H__ +#include "tree.h" + #define STREAMING_BUFFER_NOFLAGS 0 #define STREAMING_BUFFER_AUTOSLIDE (1<<0) @@ -77,11 +79,18 @@ typedef struct StreamingBufferConfig_ { /** * \brief block of continues data */ -typedef struct StreamingBufferBlock_ { +typedef struct StreamingBufferBlock { uint64_t offset; + RB_ENTRY(StreamingBufferBlock) rb; uint32_t len; - struct StreamingBufferBlock_ *next; -} StreamingBufferBlock; +} __attribute__((__packed__)) StreamingBufferBlock; + +int SBBCompare(struct StreamingBufferBlock *a, struct StreamingBufferBlock *b); + +/* red-black tree prototype for SACK records */ +RB_HEAD(SBB, StreamingBufferBlock); +RB_PROTOTYPE(SBB, StreamingBufferBlock, rb, SBBCompare); +StreamingBufferBlock *SBB_RB_FIND_INCLUSIVE(struct SBB *head, StreamingBufferBlock *elm); typedef struct StreamingBuffer_ { const StreamingBufferConfig *cfg; @@ -91,17 +100,16 @@ typedef struct StreamingBuffer_ { uint32_t buf_size; /**< size of memory block */ uint32_t buf_offset; /**< how far we are in buf_size */ - StreamingBufferBlock *block_list; - StreamingBufferBlock *block_list_tail; + struct SBB sbb_tree; /**< red black tree of Stream Buffer Blocks */ #ifdef DEBUG uint32_t buf_size_max; #endif } StreamingBuffer; #ifndef DEBUG -#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, NULL, NULL}; +#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, { NULL }, }; #else -#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, NULL, NULL, 0 }; +#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, { NULL }, 0 }; #endif typedef struct StreamingBufferSegment_ {