From 1858be7a2fd8e716bfa2f6c4740343f1fe08a04b Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Mon, 14 Sep 2009 18:01:51 +0200 Subject: [PATCH] Lock threadvars flags using spinlocks. --- src/app-layer-detect-proto.c | 4 +- src/source-pcap.c | 3 +- src/threads.h | 10 ++- src/threadvars.h | 2 + src/tm-threads.c | 155 +++++++++++++++++++++-------------- src/tm-threads.h | 3 + 6 files changed, 108 insertions(+), 69 deletions(-) diff --git a/src/app-layer-detect-proto.c b/src/app-layer-detect-proto.c index 21281bc45c..b689b15cf9 100644 --- a/src/app-layer-detect-proto.c +++ b/src/app-layer-detect-proto.c @@ -304,7 +304,7 @@ void *AppLayerDetectProtoThread(void *td) StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOSERVER, INSPECT_BYTES); StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOCLIENT, INSPECT_BYTES); - tv->flags |= THV_INIT_DONE; + TmThreadsSetFlag(tv, THV_INIT_DONE); /* main loop */ while(run) { @@ -363,7 +363,7 @@ void *AppLayerDetectProtoThread(void *td) StreamMsgReturnToPool(smsg); } - if (tv->flags & THV_KILL) { + if (TmThreadsCheckFlag(tv, THV_KILL)) { PerfUpdateCounterArray(tv->pca, &tv->pctx, 0); run = 0; } diff --git a/src/source-pcap.c b/src/source-pcap.c index 96c4dfc2c4..5a8d907f38 100644 --- a/src/source-pcap.c +++ b/src/source-pcap.c @@ -22,6 +22,7 @@ #include "threadvars.h" #include "tm-queuehandlers.h" #include "tm-modules.h" +#include "tm-threads.h" #include "source-pcap.h" /** @@ -138,7 +139,7 @@ int ReceivePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { break; } - if (tv->flags & THV_KILL || tv->flags & THV_PAUSE) { + if (TmThreadsCheckFlag(tv, THV_KILL) || TmThreadsCheckFlag(tv, THV_PAUSE)) { printf("ReceivePcap: interrupted.\n"); return 0; } diff --git a/src/threads.h b/src/threads.h index c238bda19f..79ab1d22c7 100644 --- a/src/threads.h +++ b/src/threads.h @@ -16,9 +16,13 @@ int mutex_unlock_dbg (pthread_mutex_t *); #else /* DBG_THREADS */ -#define mutex_lock pthread_mutex_lock -#define mutex_trylock pthread_mutex_trylock -#define mutex_unlock pthread_mutex_unlock +#define mutex_lock pthread_mutex_lock +#define mutex_trylock pthread_mutex_trylock +#define mutex_unlock pthread_mutex_unlock + +#define spin_lock pthread_spin_lock +#define spin_trylock pthread_spin_trylock +#define spin_unlock pthread_spin_unlock #endif /* DBG_THREADS */ diff --git a/src/threadvars.h b/src/threadvars.h index d0d161aed3..8ab4b0f24e 100644 --- a/src/threadvars.h +++ b/src/threadvars.h @@ -27,7 +27,9 @@ typedef struct ThreadVars_ { pthread_t t; char *name; + uint8_t flags; + pthread_spinlock_t flags_spinlock; /** aof(action on failure) determines what should be done with the thread when it encounters certain conditions like failures */ diff --git a/src/tm-threads.c b/src/tm-threads.c index e05c170440..31b0475ddb 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -17,6 +17,7 @@ #include "tm-modules.h" #include "tm-threads.h" #include "tmqh-packetpool.h" +#include "threads.h" /* prototypes */ static int SetCPUAffinity(int cpu); @@ -58,6 +59,29 @@ typedef struct TmVarSlot_ { TmSlot *s; } TmVarSlot; +/** \retval 1 flag is set + * \retval 0 flag is not set + */ +inline int TmThreadsCheckFlag(ThreadVars *tv, uint8_t flag) { + int r; + spin_lock(&tv->flags_spinlock); + r = (tv->flags & flag); + spin_unlock(&tv->flags_spinlock); + return r; +} + +inline void TmThreadsSetFlag(ThreadVars *tv, uint8_t flag) { + spin_lock(&tv->flags_spinlock); + tv->flags |= flag; + spin_unlock(&tv->flags_spinlock); +} + +inline void TmThreadsUnsetFlag(ThreadVars *tv, uint8_t flag) { + spin_lock(&tv->flags_spinlock); + tv->flags &= ~flag; + spin_unlock(&tv->flags_spinlock); +} + /* 1 slot functions */ void *TmThreadsSlot1NoIn(void *td) { @@ -75,13 +99,14 @@ void *TmThreadsSlot1NoIn(void *td) { if (r != 0) { EngineKill(); - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } memset(&s->s.slot_pq, 0, sizeof(PacketQueue)); - tv->flags |= THV_INIT_DONE; + TmThreadsSetFlag(tv, THV_INIT_DONE); + while(run) { TmThreadTestThreadUnPaused(tv); @@ -91,7 +116,7 @@ void *TmThreadsSlot1NoIn(void *td) { if (r == 1) { TmqhReleasePacketsToPacketPool(&s->s.slot_pq); TmqhOutputPacketpool(tv, p); - tv->flags |= THV_FAILED; + TmThreadsSetFlag(tv, THV_FAILED); break; } @@ -102,7 +127,7 @@ void *TmThreadsSlot1NoIn(void *td) { tv->tmqh_out(tv, p); - if (tv->flags & THV_KILL) { + if (TmThreadsCheckFlag(tv, THV_KILL)) { PerfUpdateCounterArray(tv->pca, &tv->pctx, 0); run = 0; } @@ -115,12 +140,12 @@ void *TmThreadsSlot1NoIn(void *td) { if (s->s.SlotThreadDeinit != NULL) { r = s->s.SlotThreadDeinit(tv, s->s.slot_data); if (r != 0) { - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); } @@ -139,13 +164,14 @@ void *TmThreadsSlot1NoOut(void *td) { if (r != 0) { EngineKill(); - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } memset(&s->s.slot_pq, 0, sizeof(PacketQueue)); - tv->flags |= THV_INIT_DONE; + TmThreadsSetFlag(tv, THV_INIT_DONE); + while(run) { TmThreadTestThreadUnPaused(tv); @@ -156,11 +182,11 @@ void *TmThreadsSlot1NoOut(void *td) { /* handle error */ if (r == 1) { TmqhOutputPacketpool(tv, p); - tv->flags |= THV_FAILED; + TmThreadsSetFlag(tv, THV_FAILED); break; } - if (tv->flags & THV_KILL) { + if (TmThreadsCheckFlag(tv, THV_KILL)) { PerfUpdateCounterArray(tv->pca, &tv->pctx, 0); run = 0; } @@ -173,12 +199,12 @@ void *TmThreadsSlot1NoOut(void *td) { if (s->s.SlotThreadDeinit != NULL) { r = s->s.SlotThreadDeinit(tv, s->s.slot_data); if (r != 0) { - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); } @@ -198,13 +224,13 @@ void *TmThreadsSlot1NoInOut(void *td) { if (r != 0) { EngineKill(); - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } memset(&s->s.slot_pq, 0, sizeof(PacketQueue)); - tv->flags |= THV_INIT_DONE; + TmThreadsSetFlag(tv, THV_INIT_DONE); while(run) { TmThreadTestThreadUnPaused(tv); @@ -213,11 +239,11 @@ void *TmThreadsSlot1NoInOut(void *td) { /* handle error */ if (r == 1) { - tv->flags |= THV_FAILED; + TmThreadsSetFlag(tv, THV_FAILED); break; } - if (tv->flags & THV_KILL) { + if (TmThreadsCheckFlag(tv, THV_KILL)) { //printf("%s: TmThreadsSlot1NoInOut: KILL is set\n", tv->name); PerfUpdateCounterArray(tv->pca, &tv->pctx, 0); run = 0; @@ -231,13 +257,13 @@ void *TmThreadsSlot1NoInOut(void *td) { if (s->s.SlotThreadDeinit != NULL) { r = s->s.SlotThreadDeinit(tv, s->s.slot_data); if (r != 0) { - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } //printf("TmThreadsSlot1NoInOut: %s ending\n", tv->name); - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); } @@ -258,13 +284,13 @@ void *TmThreadsSlot1(void *td) { if (r != 0) { EngineKill(); - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } memset(&s->s.slot_pq, 0, sizeof(PacketQueue)); - tv->flags |= THV_INIT_DONE; + TmThreadsSetFlag(tv, THV_INIT_DONE); while(run) { TmThreadTestThreadUnPaused(tv); @@ -280,7 +306,7 @@ void *TmThreadsSlot1(void *td) { if (r == 1) { TmqhReleasePacketsToPacketPool(&s->s.slot_pq); TmqhOutputPacketpool(tv, p); - tv->flags |= THV_FAILED; + TmThreadsSetFlag(tv, THV_FAILED); break; } @@ -296,7 +322,7 @@ void *TmThreadsSlot1(void *td) { tv->tmqh_out(tv, p); } - if (tv->flags & THV_KILL) { + if (TmThreadsCheckFlag(tv, THV_KILL)) { //printf("%s: TmThreadsSlot1: KILL is set\n", tv->name); PerfUpdateCounterArray(tv->pca, &tv->pctx, 0); run = 0; @@ -310,13 +336,13 @@ void *TmThreadsSlot1(void *td) { if (s->s.SlotThreadDeinit != NULL) { r = s->s.SlotThreadDeinit(tv, s->s.slot_data); if (r != 0) { - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } //printf("TmThreadsSlot1: %s ending\n", tv->name); - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); } @@ -332,7 +358,7 @@ static inline int TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot) //printf("TmThreadsSlotVarRun: s->SlotFunc %p returned 1\n", s->SlotFunc); /* Encountered error. Return packets to packetpool and return */ TmqhReleasePacketsToPacketPool(&s->slot_pq); - tv->flags |= THV_FAILED; + TmThreadsSetFlag(tv, THV_FAILED); return 1; } @@ -348,7 +374,7 @@ static inline int TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot) //printf("TmThreadsSlotVarRun: recursive TmThreadsSlotVarRun returned 1\n"); TmqhReleasePacketsToPacketPool(&s->slot_pq); TmqhOutputPacketpool(tv, extra_p); - tv->flags |= THV_FAILED; + TmThreadsSetFlag(tv, THV_FAILED); return 1; } } @@ -378,14 +404,15 @@ void *TmThreadsSlotVar(void *td) { if (r != 0) { EngineKill(); - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } memset(&slot->slot_pq, 0, sizeof(PacketQueue)); } - tv->flags |= THV_INIT_DONE; + TmThreadsSetFlag(tv, THV_INIT_DONE); + while(run) { TmThreadTestThreadUnPaused(tv); @@ -401,7 +428,7 @@ void *TmThreadsSlotVar(void *td) { if (r == 1) { //printf("TmThreadsSlotVar: TmThreadsSlotVarRun returned 1, breaking out of the loop.\n"); TmqhOutputPacketpool(tv, p); - tv->flags |= THV_FAILED; + TmThreadsSetFlag(tv, THV_FAILED); break; } @@ -409,7 +436,7 @@ void *TmThreadsSlotVar(void *td) { tv->tmqh_out(tv, p); } - if (tv->flags & THV_KILL) { + if (TmThreadsCheckFlag(tv, THV_KILL)) { //printf("%s: TmThreadsSlot1: KILL is set\n", tv->name); PerfUpdateCounterArray(tv->pca, &tv->pctx, 0); run = 0; @@ -424,14 +451,14 @@ void *TmThreadsSlotVar(void *td) { if (slot->SlotThreadDeinit != NULL) { r = slot->SlotThreadDeinit(tv, slot->slot_data); if (r != 0) { - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } } //printf("TmThreadsSlot1: %s ending\n", tv->name); - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); } @@ -538,7 +565,7 @@ static int SetCPUAffinity(int cpu) { CPU_ZERO(&cs); CPU_SET(cpu,&cs); - int r = sched_setaffinity(tid,sizeof(cpu_set_t),&cs); + int r = sched_setaffinity(tid,sizeof(cpu_set_t),&cs); if (r != 0) { printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r, strerror(errno)); } @@ -582,9 +609,11 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name, if (tv == NULL) goto error; memset(tv, 0, sizeof(ThreadVars)); + pthread_spin_init(&tv->flags_spinlock, PTHREAD_PROCESS_PRIVATE); + tv->name = name; /* default state for every newly created thread */ - tv->flags = THV_USE | THV_PAUSE; + TmThreadsSetFlag(tv, THV_USE | THV_PAUSE); /* default aof for every newly created thread */ tv->aof = THV_RESTART_THREAD; @@ -737,23 +766,23 @@ void TmThreadAppend(ThreadVars *tv, int type) } void TmThreadKillThreads(void) { - ThreadVars *t = NULL; + ThreadVars *tv = NULL; int i = 0; for (i = 0; i < TVT_MAX; i++) { - t = tv_root[i]; + tv = tv_root[i]; - while (t) { - t->flags |= THV_KILL; + while (tv) { + TmThreadsSetFlag(tv, THV_KILL); #ifdef DEBUG - printf("TmThreadKillThreads: told thread %s to stop\n", t->name); + printf("TmThreadKillThreads: told thread %s to stop\n", tv->name); #endif /* XXX hack */ StreamMsgSignalQueueHack(); - if (t->inq != NULL) { + if (tv->inq != NULL) { int i; //printf("TmThreadKillThreads: (t->inq->reader_cnt + t->inq->writer_cnt) %" PRIu32 "\n", (t->inq->reader_cnt + t->inq->writer_cnt)); @@ -763,13 +792,13 @@ void TmThreadKillThreads(void) { /* signal the queue for the number of users */ - for (i = 0; i < (t->inq->reader_cnt + t->inq->writer_cnt); i++) - pthread_cond_signal(&trans_q[t->inq->id].cond_q); + for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) + pthread_cond_signal(&trans_q[tv->inq->id].cond_q); /* to be sure, signal more */ int cnt = 0; while (1) { - if (t->flags & THV_CLOSED) { + if (TmThreadsCheckFlag(tv, THV_CLOSED)) { #ifdef DEBUG printf("signalled the thread %" PRId32 " times\n", cnt); #endif @@ -778,22 +807,22 @@ void TmThreadKillThreads(void) { cnt++; - for (i = 0; i < (t->inq->reader_cnt + t->inq->writer_cnt); i++) - pthread_cond_signal(&trans_q[t->inq->id].cond_q); + for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) + pthread_cond_signal(&trans_q[tv->inq->id].cond_q); usleep(100); } #ifdef DEBUG - printf("TmThreadKillThreads: signalled t->inq->id %" PRIu32 "\n", t->inq->id); + printf("TmThreadKillThreads: signalled tv->inq->id %" PRIu32 "\n", tv->inq->id); #endif } - if (t->cond != NULL ) { + if (tv->cond != NULL ) { int cnt = 0; while (1) { - if (t->flags & THV_CLOSED) { + if (TmThreadsCheckFlag(tv, THV_CLOSED)) { #ifdef DEBUG printf("signalled the thread %" PRId32 " times\n", cnt); #endif @@ -802,19 +831,19 @@ void TmThreadKillThreads(void) { cnt++; - pthread_cond_broadcast(t->cond); + pthread_cond_broadcast(tv->cond); usleep(100); } } /* join it */ - pthread_join(t->t, NULL); + pthread_join(tv->t, NULL); #ifdef DEBUG - printf("TmThreadKillThreads: thread %s stopped\n", t->name); + printf("TmThreadKillThreads: thread %s stopped\n", tv->name); #endif - t = t->next; + tv = tv->next; } } } @@ -854,6 +883,7 @@ int TmThreadSpawn(ThreadVars *tv) * \param tv Pointer to the thread instance for which the flag has to be set * \param flags Holds the thread state this thread instance has to be set to */ +#if 0 void TmThreadSetFlags(ThreadVars *tv, uint8_t flags) { if (tv != NULL) @@ -861,7 +891,7 @@ void TmThreadSetFlags(ThreadVars *tv, uint8_t flags) return; } - +#endif /** * \brief Sets the aof(Action on failure) for a thread instance(tv) * @@ -914,9 +944,9 @@ void TmThreadInitMC(ThreadVars *tv) */ void TmThreadTestThreadUnPaused(ThreadVars *tv) { - while (tv->flags & THV_PAUSE) { + while (TmThreadsCheckFlag(tv, THV_PAUSE)) { usleep(100); - if (tv->flags & THV_KILL) + if (TmThreadsCheckFlag(tv, THV_KILL)) break; } @@ -930,8 +960,7 @@ void TmThreadTestThreadUnPaused(ThreadVars *tv) */ void TmThreadContinue(ThreadVars *tv) { - tv->flags &= ~THV_PAUSE; - + TmThreadsUnsetFlag(tv, THV_PAUSE); return; } @@ -961,8 +990,7 @@ void TmThreadContinueThreads() */ void TmThreadPause(ThreadVars *tv) { - tv->flags |= THV_PAUSE; - + TmThreadsSetFlag(tv, THV_PAUSE); return; } @@ -1000,7 +1028,8 @@ static void TmThreadRestartThread(ThreadVars *tv) return; } - tv->flags &= ~((uint8_t)(THV_CLOSED | THV_FAILED)); + /** \todo consider a TmThreadUnsetFlag func? */ + TmThreadsUnsetFlag(tv, THV_CLOSED | THV_FAILED); if (TmThreadSpawn(tv) != 0) { printf("Error: TmThreadSpawn failed\n"); @@ -1029,13 +1058,13 @@ void TmThreadCheckThreadState(void) tv = tv_root[i]; while (tv) { - if (tv->flags & THV_FAILED) { + if (TmThreadsCheckFlag(tv, THV_FAILED)) { pthread_join(tv->t, NULL); if ( !(tv_aof & THV_ENGINE_EXIT) && (tv->aof & THV_RESTART_THREAD) ) { TmThreadRestartThread(tv); } else { - tv->flags |= THV_CLOSED; + TmThreadsSetFlag(tv, THV_CLOSED); EngineKill(); } } @@ -1058,7 +1087,7 @@ void TmThreadWaitOnThreadInit(void) for (i = 0; i < TVT_MAX; i++) { tv = tv_root[i]; while (tv != NULL) { - while (!(tv->flags & THV_INIT_DONE)) + while (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) ; tv = tv->next; } diff --git a/src/tm-threads.h b/src/tm-threads.h index 6fb21e879c..1cee7eb1e2 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -52,5 +52,8 @@ void TmThreadCheckThreadState(void); void TmThreadWaitOnThreadInit(void); +inline int TmThreadsCheckFlag(ThreadVars *, uint8_t); +inline void TmThreadsSetFlag(ThreadVars *, uint8_t); + #endif /* __TM_THREADS_H__ */