Lock threadvars flags using spinlocks.

remotes/origin/master-1.0.x
Victor Julien 16 years ago
parent 6095b8f2a1
commit 1858be7a2f

@ -304,7 +304,7 @@ void *AppLayerDetectProtoThread(void *td)
StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOSERVER, INSPECT_BYTES);
StreamMsgQueueSetMinInitChunkLen(FLOW_PKT_TOCLIENT, INSPECT_BYTES);
tv->flags |= THV_INIT_DONE;
TmThreadsSetFlag(tv, THV_INIT_DONE);
/* main loop */
while(run) {
@ -363,7 +363,7 @@ void *AppLayerDetectProtoThread(void *td)
StreamMsgReturnToPool(smsg);
}
if (tv->flags & THV_KILL) {
if (TmThreadsCheckFlag(tv, THV_KILL)) {
PerfUpdateCounterArray(tv->pca, &tv->pctx, 0);
run = 0;
}

@ -22,6 +22,7 @@
#include "threadvars.h"
#include "tm-queuehandlers.h"
#include "tm-modules.h"
#include "tm-threads.h"
#include "source-pcap.h"
/**
@ -138,7 +139,7 @@ int ReceivePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) {
break;
}
if (tv->flags & THV_KILL || tv->flags & THV_PAUSE) {
if (TmThreadsCheckFlag(tv, THV_KILL) || TmThreadsCheckFlag(tv, THV_PAUSE)) {
printf("ReceivePcap: interrupted.\n");
return 0;
}

@ -16,9 +16,13 @@ int mutex_unlock_dbg (pthread_mutex_t *);
#else /* DBG_THREADS */
#define mutex_lock pthread_mutex_lock
#define mutex_trylock pthread_mutex_trylock
#define mutex_unlock pthread_mutex_unlock
#define mutex_lock pthread_mutex_lock
#define mutex_trylock pthread_mutex_trylock
#define mutex_unlock pthread_mutex_unlock
#define spin_lock pthread_spin_lock
#define spin_trylock pthread_spin_trylock
#define spin_unlock pthread_spin_unlock
#endif /* DBG_THREADS */

@ -27,7 +27,9 @@
typedef struct ThreadVars_ {
pthread_t t;
char *name;
uint8_t flags;
pthread_spinlock_t flags_spinlock;
/** aof(action on failure) determines what should be done with the thread
when it encounters certain conditions like failures */

@ -17,6 +17,7 @@
#include "tm-modules.h"
#include "tm-threads.h"
#include "tmqh-packetpool.h"
#include "threads.h"
/* prototypes */
static int SetCPUAffinity(int cpu);
@ -58,6 +59,29 @@ typedef struct TmVarSlot_ {
TmSlot *s;
} TmVarSlot;
/** \retval 1 flag is set
* \retval 0 flag is not set
*/
inline int TmThreadsCheckFlag(ThreadVars *tv, uint8_t flag) {
int r;
spin_lock(&tv->flags_spinlock);
r = (tv->flags & flag);
spin_unlock(&tv->flags_spinlock);
return r;
}
inline void TmThreadsSetFlag(ThreadVars *tv, uint8_t flag) {
spin_lock(&tv->flags_spinlock);
tv->flags |= flag;
spin_unlock(&tv->flags_spinlock);
}
inline void TmThreadsUnsetFlag(ThreadVars *tv, uint8_t flag) {
spin_lock(&tv->flags_spinlock);
tv->flags &= ~flag;
spin_unlock(&tv->flags_spinlock);
}
/* 1 slot functions */
void *TmThreadsSlot1NoIn(void *td) {
@ -75,13 +99,14 @@ void *TmThreadsSlot1NoIn(void *td) {
if (r != 0) {
EngineKill();
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
}
}
memset(&s->s.slot_pq, 0, sizeof(PacketQueue));
tv->flags |= THV_INIT_DONE;
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
TmThreadTestThreadUnPaused(tv);
@ -91,7 +116,7 @@ void *TmThreadsSlot1NoIn(void *td) {
if (r == 1) {
TmqhReleasePacketsToPacketPool(&s->s.slot_pq);
TmqhOutputPacketpool(tv, p);
tv->flags |= THV_FAILED;
TmThreadsSetFlag(tv, THV_FAILED);
break;
}
@ -102,7 +127,7 @@ void *TmThreadsSlot1NoIn(void *td) {
tv->tmqh_out(tv, p);
if (tv->flags & THV_KILL) {
if (TmThreadsCheckFlag(tv, THV_KILL)) {
PerfUpdateCounterArray(tv->pca, &tv->pctx, 0);
run = 0;
}
@ -115,12 +140,12 @@ void *TmThreadsSlot1NoIn(void *td) {
if (s->s.SlotThreadDeinit != NULL) {
r = s->s.SlotThreadDeinit(tv, s->s.slot_data);
if (r != 0) {
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
}
}
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
}
@ -139,13 +164,14 @@ void *TmThreadsSlot1NoOut(void *td) {
if (r != 0) {
EngineKill();
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
}
}
memset(&s->s.slot_pq, 0, sizeof(PacketQueue));
tv->flags |= THV_INIT_DONE;
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
TmThreadTestThreadUnPaused(tv);
@ -156,11 +182,11 @@ void *TmThreadsSlot1NoOut(void *td) {
/* handle error */
if (r == 1) {
TmqhOutputPacketpool(tv, p);
tv->flags |= THV_FAILED;
TmThreadsSetFlag(tv, THV_FAILED);
break;
}
if (tv->flags & THV_KILL) {
if (TmThreadsCheckFlag(tv, THV_KILL)) {
PerfUpdateCounterArray(tv->pca, &tv->pctx, 0);
run = 0;
}
@ -173,12 +199,12 @@ void *TmThreadsSlot1NoOut(void *td) {
if (s->s.SlotThreadDeinit != NULL) {
r = s->s.SlotThreadDeinit(tv, s->s.slot_data);
if (r != 0) {
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
}
}
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
}
@ -198,13 +224,13 @@ void *TmThreadsSlot1NoInOut(void *td) {
if (r != 0) {
EngineKill();
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
}
}
memset(&s->s.slot_pq, 0, sizeof(PacketQueue));
tv->flags |= THV_INIT_DONE;
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
TmThreadTestThreadUnPaused(tv);
@ -213,11 +239,11 @@ void *TmThreadsSlot1NoInOut(void *td) {
/* handle error */
if (r == 1) {
tv->flags |= THV_FAILED;
TmThreadsSetFlag(tv, THV_FAILED);
break;
}
if (tv->flags & THV_KILL) {
if (TmThreadsCheckFlag(tv, THV_KILL)) {
//printf("%s: TmThreadsSlot1NoInOut: KILL is set\n", tv->name);
PerfUpdateCounterArray(tv->pca, &tv->pctx, 0);
run = 0;
@ -231,13 +257,13 @@ void *TmThreadsSlot1NoInOut(void *td) {
if (s->s.SlotThreadDeinit != NULL) {
r = s->s.SlotThreadDeinit(tv, s->s.slot_data);
if (r != 0) {
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
}
}
//printf("TmThreadsSlot1NoInOut: %s ending\n", tv->name);
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
}
@ -258,13 +284,13 @@ void *TmThreadsSlot1(void *td) {
if (r != 0) {
EngineKill();
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
}
}
memset(&s->s.slot_pq, 0, sizeof(PacketQueue));
tv->flags |= THV_INIT_DONE;
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
TmThreadTestThreadUnPaused(tv);
@ -280,7 +306,7 @@ void *TmThreadsSlot1(void *td) {
if (r == 1) {
TmqhReleasePacketsToPacketPool(&s->s.slot_pq);
TmqhOutputPacketpool(tv, p);
tv->flags |= THV_FAILED;
TmThreadsSetFlag(tv, THV_FAILED);
break;
}
@ -296,7 +322,7 @@ void *TmThreadsSlot1(void *td) {
tv->tmqh_out(tv, p);
}
if (tv->flags & THV_KILL) {
if (TmThreadsCheckFlag(tv, THV_KILL)) {
//printf("%s: TmThreadsSlot1: KILL is set\n", tv->name);
PerfUpdateCounterArray(tv->pca, &tv->pctx, 0);
run = 0;
@ -310,13 +336,13 @@ void *TmThreadsSlot1(void *td) {
if (s->s.SlotThreadDeinit != NULL) {
r = s->s.SlotThreadDeinit(tv, s->s.slot_data);
if (r != 0) {
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
}
}
//printf("TmThreadsSlot1: %s ending\n", tv->name);
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
}
@ -332,7 +358,7 @@ static inline int TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot)
//printf("TmThreadsSlotVarRun: s->SlotFunc %p returned 1\n", s->SlotFunc);
/* Encountered error. Return packets to packetpool and return */
TmqhReleasePacketsToPacketPool(&s->slot_pq);
tv->flags |= THV_FAILED;
TmThreadsSetFlag(tv, THV_FAILED);
return 1;
}
@ -348,7 +374,7 @@ static inline int TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot)
//printf("TmThreadsSlotVarRun: recursive TmThreadsSlotVarRun returned 1\n");
TmqhReleasePacketsToPacketPool(&s->slot_pq);
TmqhOutputPacketpool(tv, extra_p);
tv->flags |= THV_FAILED;
TmThreadsSetFlag(tv, THV_FAILED);
return 1;
}
}
@ -378,14 +404,15 @@ void *TmThreadsSlotVar(void *td) {
if (r != 0) {
EngineKill();
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
}
}
memset(&slot->slot_pq, 0, sizeof(PacketQueue));
}
tv->flags |= THV_INIT_DONE;
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
TmThreadTestThreadUnPaused(tv);
@ -401,7 +428,7 @@ void *TmThreadsSlotVar(void *td) {
if (r == 1) {
//printf("TmThreadsSlotVar: TmThreadsSlotVarRun returned 1, breaking out of the loop.\n");
TmqhOutputPacketpool(tv, p);
tv->flags |= THV_FAILED;
TmThreadsSetFlag(tv, THV_FAILED);
break;
}
@ -409,7 +436,7 @@ void *TmThreadsSlotVar(void *td) {
tv->tmqh_out(tv, p);
}
if (tv->flags & THV_KILL) {
if (TmThreadsCheckFlag(tv, THV_KILL)) {
//printf("%s: TmThreadsSlot1: KILL is set\n", tv->name);
PerfUpdateCounterArray(tv->pca, &tv->pctx, 0);
run = 0;
@ -424,14 +451,14 @@ void *TmThreadsSlotVar(void *td) {
if (slot->SlotThreadDeinit != NULL) {
r = slot->SlotThreadDeinit(tv, slot->slot_data);
if (r != 0) {
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
}
}
}
//printf("TmThreadsSlot1: %s ending\n", tv->name);
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
}
@ -538,7 +565,7 @@ static int SetCPUAffinity(int cpu) {
CPU_ZERO(&cs);
CPU_SET(cpu,&cs);
int r = sched_setaffinity(tid,sizeof(cpu_set_t),&cs);
int r = sched_setaffinity(tid,sizeof(cpu_set_t),&cs);
if (r != 0) {
printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r, strerror(errno));
}
@ -582,9 +609,11 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
if (tv == NULL) goto error;
memset(tv, 0, sizeof(ThreadVars));
pthread_spin_init(&tv->flags_spinlock, PTHREAD_PROCESS_PRIVATE);
tv->name = name;
/* default state for every newly created thread */
tv->flags = THV_USE | THV_PAUSE;
TmThreadsSetFlag(tv, THV_USE | THV_PAUSE);
/* default aof for every newly created thread */
tv->aof = THV_RESTART_THREAD;
@ -737,23 +766,23 @@ void TmThreadAppend(ThreadVars *tv, int type)
}
void TmThreadKillThreads(void) {
ThreadVars *t = NULL;
ThreadVars *tv = NULL;
int i = 0;
for (i = 0; i < TVT_MAX; i++) {
t = tv_root[i];
tv = tv_root[i];
while (t) {
t->flags |= THV_KILL;
while (tv) {
TmThreadsSetFlag(tv, THV_KILL);
#ifdef DEBUG
printf("TmThreadKillThreads: told thread %s to stop\n", t->name);
printf("TmThreadKillThreads: told thread %s to stop\n", tv->name);
#endif
/* XXX hack */
StreamMsgSignalQueueHack();
if (t->inq != NULL) {
if (tv->inq != NULL) {
int i;
//printf("TmThreadKillThreads: (t->inq->reader_cnt + t->inq->writer_cnt) %" PRIu32 "\n", (t->inq->reader_cnt + t->inq->writer_cnt));
@ -763,13 +792,13 @@ void TmThreadKillThreads(void) {
/* signal the queue for the number of users */
for (i = 0; i < (t->inq->reader_cnt + t->inq->writer_cnt); i++)
pthread_cond_signal(&trans_q[t->inq->id].cond_q);
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
pthread_cond_signal(&trans_q[tv->inq->id].cond_q);
/* to be sure, signal more */
int cnt = 0;
while (1) {
if (t->flags & THV_CLOSED) {
if (TmThreadsCheckFlag(tv, THV_CLOSED)) {
#ifdef DEBUG
printf("signalled the thread %" PRId32 " times\n", cnt);
#endif
@ -778,22 +807,22 @@ void TmThreadKillThreads(void) {
cnt++;
for (i = 0; i < (t->inq->reader_cnt + t->inq->writer_cnt); i++)
pthread_cond_signal(&trans_q[t->inq->id].cond_q);
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
pthread_cond_signal(&trans_q[tv->inq->id].cond_q);
usleep(100);
}
#ifdef DEBUG
printf("TmThreadKillThreads: signalled t->inq->id %" PRIu32 "\n", t->inq->id);
printf("TmThreadKillThreads: signalled tv->inq->id %" PRIu32 "\n", tv->inq->id);
#endif
}
if (t->cond != NULL ) {
if (tv->cond != NULL ) {
int cnt = 0;
while (1) {
if (t->flags & THV_CLOSED) {
if (TmThreadsCheckFlag(tv, THV_CLOSED)) {
#ifdef DEBUG
printf("signalled the thread %" PRId32 " times\n", cnt);
#endif
@ -802,19 +831,19 @@ void TmThreadKillThreads(void) {
cnt++;
pthread_cond_broadcast(t->cond);
pthread_cond_broadcast(tv->cond);
usleep(100);
}
}
/* join it */
pthread_join(t->t, NULL);
pthread_join(tv->t, NULL);
#ifdef DEBUG
printf("TmThreadKillThreads: thread %s stopped\n", t->name);
printf("TmThreadKillThreads: thread %s stopped\n", tv->name);
#endif
t = t->next;
tv = tv->next;
}
}
}
@ -854,6 +883,7 @@ int TmThreadSpawn(ThreadVars *tv)
* \param tv Pointer to the thread instance for which the flag has to be set
* \param flags Holds the thread state this thread instance has to be set to
*/
#if 0
void TmThreadSetFlags(ThreadVars *tv, uint8_t flags)
{
if (tv != NULL)
@ -861,7 +891,7 @@ void TmThreadSetFlags(ThreadVars *tv, uint8_t flags)
return;
}
#endif
/**
* \brief Sets the aof(Action on failure) for a thread instance(tv)
*
@ -914,9 +944,9 @@ void TmThreadInitMC(ThreadVars *tv)
*/
void TmThreadTestThreadUnPaused(ThreadVars *tv)
{
while (tv->flags & THV_PAUSE) {
while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
usleep(100);
if (tv->flags & THV_KILL)
if (TmThreadsCheckFlag(tv, THV_KILL))
break;
}
@ -930,8 +960,7 @@ void TmThreadTestThreadUnPaused(ThreadVars *tv)
*/
void TmThreadContinue(ThreadVars *tv)
{
tv->flags &= ~THV_PAUSE;
TmThreadsUnsetFlag(tv, THV_PAUSE);
return;
}
@ -961,8 +990,7 @@ void TmThreadContinueThreads()
*/
void TmThreadPause(ThreadVars *tv)
{
tv->flags |= THV_PAUSE;
TmThreadsSetFlag(tv, THV_PAUSE);
return;
}
@ -1000,7 +1028,8 @@ static void TmThreadRestartThread(ThreadVars *tv)
return;
}
tv->flags &= ~((uint8_t)(THV_CLOSED | THV_FAILED));
/** \todo consider a TmThreadUnsetFlag func? */
TmThreadsUnsetFlag(tv, THV_CLOSED | THV_FAILED);
if (TmThreadSpawn(tv) != 0) {
printf("Error: TmThreadSpawn failed\n");
@ -1029,13 +1058,13 @@ void TmThreadCheckThreadState(void)
tv = tv_root[i];
while (tv) {
if (tv->flags & THV_FAILED) {
if (TmThreadsCheckFlag(tv, THV_FAILED)) {
pthread_join(tv->t, NULL);
if ( !(tv_aof & THV_ENGINE_EXIT) &&
(tv->aof & THV_RESTART_THREAD) ) {
TmThreadRestartThread(tv);
} else {
tv->flags |= THV_CLOSED;
TmThreadsSetFlag(tv, THV_CLOSED);
EngineKill();
}
}
@ -1058,7 +1087,7 @@ void TmThreadWaitOnThreadInit(void)
for (i = 0; i < TVT_MAX; i++) {
tv = tv_root[i];
while (tv != NULL) {
while (!(tv->flags & THV_INIT_DONE))
while (!(TmThreadsCheckFlag(tv, THV_INIT_DONE)))
;
tv = tv->next;
}

@ -52,5 +52,8 @@ void TmThreadCheckThreadState(void);
void TmThreadWaitOnThreadInit(void);
inline int TmThreadsCheckFlag(ThreadVars *, uint8_t);
inline void TmThreadsSetFlag(ThreadVars *, uint8_t);
#endif /* __TM_THREADS_H__ */

Loading…
Cancel
Save