Implement thread-specific data option when __thread is not available.

pull/1053/head
Ken Steele 11 years ago
parent be448aef22
commit a38d5a0135
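
The packet pool keeps a per-thread PktPool. Until now that required the __thread storage class; this change adds a fallback that stores the pool in thread-specific data via a pthread_key_t when TLS (__thread) is not available: TmqhPacketpoolInit() creates the key once with pthread_key_create(), ThreadPacketPoolCreate() allocates the pool on first use and attaches it with pthread_setspecific(), and GetThreadPacketPool() fetches it with pthread_getspecific().

For readers unfamiliar with that pattern, a minimal, self-contained sketch follows. The names (MyPool, my_key, get_my_pool, worker) are illustrative only and are not part of the Suricata code in this commit.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct MyPool_ {
    int count;                         /* per-thread state */
} MyPool;

static pthread_key_t my_key;

/* Destructor: runs automatically, at thread exit, for each thread that
 * set the key. */
static void my_pool_destroy(void *buf)
{
    free(buf);
}

/* Create the key once, before any worker thread uses it
 * (the commit does this in TmqhPacketpoolInit()). */
static void my_pool_global_init(void)
{
    pthread_key_create(&my_key, my_pool_destroy);
}

/* Lazily create and fetch this thread's pool
 * (cf. ThreadPacketPoolCreate()/GetThreadPacketPool() in the diff below). */
static MyPool *get_my_pool(void)
{
    MyPool *pool = pthread_getspecific(my_key);
    if (pool == NULL) {
        pool = calloc(1, sizeof(*pool));
        if (pool == NULL)
            exit(EXIT_FAILURE);
        pthread_setspecific(my_key, pool);
    }
    return pool;
}

static void *worker(void *arg)
{
    (void)arg;
    get_my_pool()->count++;            /* each thread sees only its own pool */
    printf("this thread's count: %d\n", get_my_pool()->count);
    return NULL;
}

int main(void)
{
    my_pool_global_init();

    pthread_t t1, t2;
    pthread_create(&t1, NULL, worker, NULL);
    pthread_create(&t2, NULL, worker, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return 0;
}
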

@@ -2053,6 +2053,7 @@ error:
*/
void Unified2RegisterTests(void)
{
PacketPoolInit();
#ifdef UNITTESTS
UtRegisterTest("Unified2Test01 -- Ipv4 test", Unified2Test01, 1);
UtRegisterTest("Unified2Test02 -- Ipv6 test", Unified2Test02, 1);

@@ -111,7 +111,6 @@ TmModuleDecodeErfFileRegister(void)
TmEcode ReceiveErfFileLoop(ThreadVars *tv, void *data, void *slot)
{
Packet *p = NULL;
uint16_t packet_q_len = 0;
ErfFileThreadVars *etv = (ErfFileThreadVars *)data;
etv->slot = ((TmSlot *)slot)->slot_next;

@@ -330,7 +330,6 @@ TmEcode ReceiveMpipeLoop(ThreadVars *tv, void *data, void *slot)
TmSlot *s = (TmSlot *)slot;
ptv->slot = s->slot_next;
Packet *p = NULL;
int cpu = tmc_cpus_get_my_cpu();
int rank = tv->rank;
int max_queued = 0;
char *ctype;
@@ -918,7 +917,7 @@ TmEcode ReceiveMpipeThreadInit(ThreadVars *tv, void *initdata, void **data)
SCReturnInt(TM_ECODE_FAILED);
}
}
gxio_mpipe_init(context, instance);
result = gxio_mpipe_init(context, instance);
VERIFY(result, "gxio_mpipe_init()");
/* open ingress interfaces */
for (int i = 0; i < nlive; i++) {

@@ -48,12 +48,68 @@
#include "util-profiling.h"
#include "util-device.h"
/* TODO: Handle case without __thread */
#ifdef TLS
__thread PktPool thread_pkt_pool;
static inline PktPool *GetThreadPacketPool(void)
{
return &thread_pkt_pool;
}
static inline PktPool *ThreadPacketPoolCreate(void)
{
/* Nothing to do since __thread allocates the memory. */
return GetThreadPacketPool();
}
#else
/* __thread not supported. */
static pthread_key_t pkt_pool_thread_key;
static inline PktPool *GetThreadPacketPool(void)
{
return (PktPool*)pthread_getspecific(pkt_pool_thread_key);
}
static inline PktPool *ThreadPacketPoolCreate(void)
{
/* Check that the pool is not already created */
PktPool *pool = GetThreadPacketPool();
if (pool)
return pool;
/* Create a new pool for this thread. */
pool = (PktPool*)SCMallocAligned(sizeof(PktPool), CLS);
if (pool == NULL) {
SCLogError(SC_ERR_MEM_ALLOC, "malloc failed");
exit(EXIT_FAILURE);
}
pthread_setspecific(pkt_pool_thread_key, pool);
return pool;
}
static void PktPoolThreadDestroy(void * buf)
{
free(buf);
}
#endif
/* Number of freed packet to save for one pool before freeing them. */
#define MAX_PENDING_RETURN_PACKETS 32
void TmqhPacketpoolInit(void)
{
#ifndef TLS
/* Create the pthread Key that is used to look up thread specific
* data buffer. Needs to be created only once.
*/
pthread_key_create(&pkt_pool_thread_key, PktPoolThreadDestroy);
#endif
}
/**
* \brief TmqhPacketpoolRegister
* \initonly
@@ -62,12 +118,14 @@ void TmqhPacketpoolRegister (void) {
tmqh_table[TMQH_PACKETPOOL].name = "packetpool";
tmqh_table[TMQH_PACKETPOOL].InHandler = TmqhInputPacketpool;
tmqh_table[TMQH_PACKETPOOL].OutHandler = TmqhOutputPacketpool;
TmqhPacketpoolInit();
}
static int PacketPoolIsEmpty(void)
static int PacketPoolIsEmpty(PktPool *pool)
{
/* Check local stack first. */
if (thread_pkt_pool.head || thread_pkt_pool.return_stack.head)
if (pool->head || pool->return_stack.head)
return 0;
return 1;
@@ -75,7 +133,9 @@ static int PacketPoolIsEmpty(void)
void PacketPoolWait(void)
{
while(PacketPoolIsEmpty())
PktPool *my_pool = GetThreadPacketPool();
while(PacketPoolIsEmpty(my_pool))
;
}
@@ -88,7 +148,7 @@ static void PacketPoolStorePacket(Packet *p)
/* Clear the PKT_ALLOC flag, since that indicates to push back
* onto the ring buffer. */
p->flags &= ~PKT_ALLOC;
p->pool = &thread_pkt_pool;;
p->pool = GetThreadPacketPool();
p->ReleasePacket = PacketPoolReturnPacket;
PacketPoolReturnPacket(p);
}
@@ -102,7 +162,7 @@ static void PacketPoolStorePacket(Packet *p)
*/
Packet *PacketPoolGetPacket(void)
{
PktPool *pool = &thread_pkt_pool;
PktPool *pool = GetThreadPacketPool();
if (pool->head) {
/* Stack is not empty. */
@@ -125,7 +185,7 @@ Packet *PacketPoolGetPacket(void)
*/
if (pool->head) {
/* Stack is not empty. */
Packet *p = pool->return_stack.head;
Packet *p = pool->head;
pool->head = p->next;
p->pool = pool;
return p;
@@ -141,6 +201,8 @@ Packet *PacketPoolGetPacket(void)
*/
void PacketPoolReturnPacket(Packet *p)
{
PktPool *my_pool = GetThreadPacketPool();
PktPool *pool = p->pool;
if (pool == NULL) {
free(p);
@@ -149,31 +211,31 @@ void PacketPoolReturnPacket(Packet *p)
PACKET_RECYCLE(p);
if (pool == &thread_pkt_pool) {
if (pool == my_pool) {
/* Push back onto this thread's own stack, so no locking. */
p->next = thread_pkt_pool.head;
thread_pkt_pool.head = p;
p->next = my_pool->head;
my_pool->head = p;
} else {
PktPool *pending_pool = thread_pkt_pool.pending_pool;
PktPool *pending_pool = my_pool->pending_pool;
if (pending_pool == NULL) {
/* No pending packet, so store the current packet. */
thread_pkt_pool.pending_pool = pool;
thread_pkt_pool.pending_head = p;
thread_pkt_pool.pending_tail = p;
thread_pkt_pool.pending_count = 1;
my_pool->pending_pool = pool;
my_pool->pending_head = p;
my_pool->pending_tail = p;
my_pool->pending_count = 1;
} else if (pending_pool == pool) {
/* Another packet for the pending pool list. */
p->next = thread_pkt_pool.pending_head;
thread_pkt_pool.pending_head->next = p;
thread_pkt_pool.pending_count++;
if (thread_pkt_pool.pending_count > MAX_PENDING_RETURN_PACKETS) {
p->next = my_pool->pending_head;
my_pool->pending_head->next = p;
my_pool->pending_count++;
if (my_pool->pending_count > MAX_PENDING_RETURN_PACKETS) {
/* Return the entire list of pending packets. */
SCMutexLock(&pool->return_stack.mutex);
thread_pkt_pool.pending_tail->next = pool->return_stack.head;
pool->return_stack.head = thread_pkt_pool.pending_head;
my_pool->pending_tail->next = pool->return_stack.head;
pool->return_stack.head = my_pool->pending_head;
SCMutexUnlock(&pool->return_stack.mutex);
/* Clear the list of pending packets to return. */
thread_pkt_pool.pending_pool = NULL;
my_pool->pending_pool = NULL;
}
} else {
/* Push onto return stack for this pool */
@@ -189,7 +251,9 @@ void PacketPoolInit(void)
{
extern intmax_t max_pending_packets;
SCMutexInit(&thread_pkt_pool.return_stack.mutex, NULL);
PktPool *my_pool = ThreadPacketPoolCreate();
SCMutexInit(&my_pool->return_stack.mutex, NULL);
/* pre allocate packets */
SCLogDebug("preallocating packets... packet size %" PRIuMAX "",

@@ -30,7 +30,7 @@
/* Return stack, onto which other threads free packets. */
typedef struct PktPoolLockedStack_{
/* linked list of free packets. */
SCSpinlock mutex;
SCMutex mutex;
Packet *head;
} __attribute__((aligned(CLS))) PktPoolLockedStack;
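
For orientation, here is a self-contained sketch of the two-stack scheme the diff above works with: each thread owns a pool with an unlocked local stack plus a mutex-protected return stack that other threads push onto (the real code additionally batches up to MAX_PENDING_RETURN_PACKETS returns per foreign pool before taking the lock). The Pkt/PktPoolSketch types and pool_get/pool_return functions below are illustrative stand-ins, not the Suricata API, and pthread_mutex_t stands in for SCMutex.

#include <pthread.h>
#include <stdio.h>

typedef struct Pkt_ {
    struct Pkt_ *next;
    struct PktPoolSketch_ *pool;      /* pool that owns this packet */
} Pkt;

typedef struct PktPoolSketch_ {
    Pkt *head;                        /* local stack, owner thread only, no lock */
    pthread_mutex_t return_mutex;     /* protects return_head */
    Pkt *return_head;                 /* stack that other threads push onto */
} PktPoolSketch;

/* Owner thread: pop from the local stack; when it runs dry, take the whole
 * return stack in one locked operation. */
static Pkt *pool_get(PktPoolSketch *pool)
{
    if (pool->head == NULL) {
        pthread_mutex_lock(&pool->return_mutex);
        pool->head = pool->return_head;
        pool->return_head = NULL;
        pthread_mutex_unlock(&pool->return_mutex);
    }
    Pkt *p = pool->head;
    if (p != NULL)
        pool->head = p->next;
    return p;
}

/* Any thread: give a packet back to its owning pool. The owner uses the
 * unlocked fast path; other threads use the locked return stack. */
static void pool_return(PktPoolSketch *my_pool, Pkt *p)
{
    PktPoolSketch *owner = p->pool;
    if (owner == my_pool) {
        p->next = my_pool->head;
        my_pool->head = p;
    } else {
        pthread_mutex_lock(&owner->return_mutex);
        p->next = owner->return_head;
        owner->return_head = p;
        pthread_mutex_unlock(&owner->return_mutex);
    }
}

int main(void)
{
    PktPoolSketch pool = { 0 };
    pthread_mutex_init(&pool.return_mutex, NULL);

    Pkt pkt = { 0 };
    pkt.pool = &pool;

    pool_return(&pool, &pkt);         /* owner path: pushed without locking */
    printf("got packet back: %p\n", (void *)pool_get(&pool));

    pthread_mutex_destroy(&pool.return_mutex);
    return 0;
}
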
