packetpool: dynamic return threshold

Problem:

In pcap autofp mode, there is one threads reading packets (RX). These packets
are then passed on to worker threads. When these workers are done with a
packet, they return packets to the pcap reader threads packet pool, which is
the owner of the packets. Since this requires expensive synchronization between
threads, there is logic in place to batch this operation.

When the reader thread depletes its pool, it notifies the other threads that
it is starving and that a sync needs to happen asap. Then the reader enters
a wait state. During this time no new packets are read.

However, there is a problem with this approach. When the reader encountered
an empty pool, it would set an atomic flag that it needed a sync. The first
worker to return a packet to the pool would then set this flag, sync, and
unset the flag. This forced sync could result in just a single packet being
synchronized, or several. So if unlucky, the reader would just get a single
packet before hitting the same condition again.

Solution:

This patch updates the logic to use a new approach. Instead of using a
binary flag approach where the behavior only changes when the reader is
already starved, it uses a dynamic sync threshold that is controlled by
the reader. The reader keeps a running count of packets it its pool,
and calculates the percentage of available packets. This percentage is
then used to set the sync threshold.

When the pool is starved, it sets the threshold to 1 (sync for each packet).
After each successful get/sync the threshold is adjusted.
pull/9859/head
Victor Julien 1 year ago committed by Victor Julien
parent ec1482cf48
commit edc89ce791

@ -35,6 +35,8 @@
#include "util-validate.h"
#include "action-globals.h"
extern uint16_t max_pending_packets;
/* Number of freed packet to save for one pool before freeing them. */
#define MAX_PENDING_RETURN_PACKETS 32
static uint32_t max_pending_return_packets = MAX_PENDING_RETURN_PACKETS;
@ -66,15 +68,27 @@ static int PacketPoolIsEmpty(PktPool *pool)
return 1;
}
static void UpdateReturnThreshold(PktPool *pool)
{
const float perc = (float)pool->cnt / (float)max_pending_packets;
uint32_t threshold = (uint32_t)(perc * (float)max_pending_return_packets);
if (threshold != SC_ATOMIC_GET(pool->return_stack.return_threshold)) {
SC_ATOMIC_SET(pool->return_stack.return_threshold, threshold);
}
}
void PacketPoolWait(void)
{
PktPool *my_pool = GetThreadPacketPool();
if (PacketPoolIsEmpty(my_pool)) {
SC_ATOMIC_SET(my_pool->return_stack.return_threshold, 1);
SCMutexLock(&my_pool->return_stack.mutex);
SC_ATOMIC_ADD(my_pool->return_stack.sync_now, 1);
SCCondWait(&my_pool->return_stack.cond, &my_pool->return_stack.mutex);
SCMutexUnlock(&my_pool->return_stack.mutex);
UpdateReturnThreshold(my_pool);
}
while(PacketPoolIsEmpty(my_pool))
@ -98,6 +112,8 @@ static void PacketPoolGetReturnedPackets(PktPool *pool)
/* Move all the packets from the locked return stack to the local stack. */
pool->head = pool->return_stack.head;
pool->return_stack.head = NULL;
pool->cnt += pool->return_stack.cnt;
pool->return_stack.cnt = 0;
SCMutexUnlock(&pool->return_stack.mutex);
}
@ -119,8 +135,14 @@ Packet *PacketPoolGetPacket(void)
/* Stack is not empty. */
Packet *p = pool->head;
pool->head = p->next;
pool->cnt--;
p->pool = pool;
PacketReinit(p);
UpdateReturnThreshold(pool);
SCLogDebug("pp: %0.2f cnt:%u max:%d threshold:%u",
((float)pool->cnt / (float)max_pending_packets) * (float)100, pool->cnt,
max_pending_packets, SC_ATOMIC_GET(pool->return_stack.return_threshold));
return p;
}
@ -135,8 +157,14 @@ Packet *PacketPoolGetPacket(void)
/* Stack is not empty. */
Packet *p = pool->head;
pool->head = p->next;
pool->cnt--;
p->pool = pool;
PacketReinit(p);
UpdateReturnThreshold(pool);
SCLogDebug("pp: %0.2f cnt:%u max:%d threshold:%u",
((float)pool->cnt / (float)max_pending_packets) * (float)100, pool->cnt,
max_pending_packets, SC_ATOMIC_GET(pool->return_stack.return_threshold));
return p;
}
@ -170,6 +198,7 @@ void PacketPoolReturnPacket(Packet *p)
/* Push back onto this thread's own stack, so no locking. */
p->next = my_pool->head;
my_pool->head = p;
my_pool->cnt++;
} else {
PktPool *pending_pool = my_pool->pending_pool;
if (pending_pool == NULL || pending_pool == pool) {
@ -187,12 +216,13 @@ void PacketPoolReturnPacket(Packet *p)
my_pool->pending_count++;
}
if (SC_ATOMIC_GET(pool->return_stack.sync_now) || my_pool->pending_count > max_pending_return_packets) {
const uint32_t threshold = SC_ATOMIC_GET(pool->return_stack.return_threshold);
if (my_pool->pending_count >= threshold) {
/* Return the entire list of pending packets. */
SCMutexLock(&pool->return_stack.mutex);
my_pool->pending_tail->next = pool->return_stack.head;
pool->return_stack.head = my_pool->pending_head;
SC_ATOMIC_RESET(pool->return_stack.sync_now);
pool->return_stack.cnt += my_pool->pending_count;
SCCondSignal(&pool->return_stack.cond);
SCMutexUnlock(&pool->return_stack.mutex);
/* Clear the list of pending packets to return. */
@ -206,7 +236,7 @@ void PacketPoolReturnPacket(Packet *p)
SCMutexLock(&pool->return_stack.mutex);
p->next = pool->return_stack.head;
pool->return_stack.head = p;
SC_ATOMIC_RESET(pool->return_stack.sync_now);
pool->return_stack.cnt++;
SCMutexUnlock(&pool->return_stack.mutex);
SCCondSignal(&pool->return_stack.cond);
}
@ -225,13 +255,12 @@ void PacketPoolInitEmpty(void)
SCMutexInit(&my_pool->return_stack.mutex, NULL);
SCCondInit(&my_pool->return_stack.cond, NULL);
SC_ATOMIC_INIT(my_pool->return_stack.sync_now);
SC_ATOMIC_INIT(my_pool->return_stack.return_threshold);
SC_ATOMIC_SET(my_pool->return_stack.return_threshold, 32);
}
void PacketPoolInit(void)
{
extern uint16_t max_pending_packets;
PktPool *my_pool = GetThreadPacketPool();
#ifdef DEBUG_VALIDATION
@ -242,7 +271,8 @@ void PacketPoolInit(void)
SCMutexInit(&my_pool->return_stack.mutex, NULL);
SCCondInit(&my_pool->return_stack.cond, NULL);
SC_ATOMIC_INIT(my_pool->return_stack.sync_now);
SC_ATOMIC_INIT(my_pool->return_stack.return_threshold);
SC_ATOMIC_SET(my_pool->return_stack.return_threshold, 32);
/* pre allocate packets */
SCLogDebug("preallocating packets... packet size %" PRIuMAX "",

@ -32,7 +32,11 @@ typedef struct PktPoolLockedStack_{
/* linked list of free packets. */
SCMutex mutex;
SCCondT cond;
SC_ATOMIC_DECLARE(int, sync_now);
/** number of packets in needed to trigger a sync during
* the return to pool logic. Updated by pool owner based
* on how full the pool is. */
SC_ATOMIC_DECLARE(uint32_t, return_threshold);
uint32_t cnt;
Packet *head;
} __attribute__((aligned(CLS))) PktPoolLockedStack;
@ -41,6 +45,8 @@ typedef struct PktPool_ {
* No mutex is needed.
*/
Packet *head;
uint32_t cnt;
/* Packets waiting (pending) to be returned to the given Packet
* Pool. Accumulate packets for the same pool until a threshold is
* reached, then return them all at once. Keep the head and tail

Loading…
Cancel
Save