From 1ad289dfffe339df59c3f63571dcd698c18895b2 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sat, 19 Jun 2010 16:43:59 +0200 Subject: [PATCH] Add thread cond_t based waiting in the ringbuffer. --- src/util-ringbuffer.c | 173 +++++++++++++++++++++++++++++++++++++++++- src/util-ringbuffer.h | 13 ++++ 2 files changed, 185 insertions(+), 1 deletion(-) diff --git a/src/util-ringbuffer.c b/src/util-ringbuffer.c index d130b095c2..d8c6aef22d 100644 --- a/src/util-ringbuffer.c +++ b/src/util-ringbuffer.c @@ -53,6 +53,10 @@ RingBuffer8 *RingBufferMrSw8Init(void) { SC_ATOMIC_INIT(rb->write); SC_ATOMIC_INIT(rb->read); +#ifdef RINGBUFFER_MUTEX_WAIT + SCMutexInit(&rb->wait_mutex, NULL); + SCCondInit(&rb->wait_cond, NULL); +#endif return rb; } @@ -61,6 +65,10 @@ void RingBufferMrSw8Destroy(RingBuffer8 *rb) { SC_ATOMIC_DESTROY(rb->write); SC_ATOMIC_DESTROY(rb->read); +#ifdef RINGBUFFER_MUTEX_WAIT + SCMutexDestroy(&rb->wait_mutex); + SCCondDestroy(&rb->wait_cond); +#endif SCFree(rb); } } @@ -87,7 +95,16 @@ retry: if (rb->shutdown != 0) return NULL; +#ifdef RINGBUFFER_MUTEX_WAIT + struct timespec cond_time; + cond_time.tv_sec = time(NULL) + 1; + cond_time.tv_nsec = 0; + SCMutexLock(&rb->wait_mutex); + SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time); + SCMutexUnlock(&rb->wait_mutex); +#else usleep(USLEEP_TIME); +#endif } /* atomically update rb->read */ @@ -103,6 +120,10 @@ retry: } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1)))); SCLogDebug("ptr %p", ptr); + +#ifdef RINGBUFFER_MUTEX_WAIT + SCCondSignal(&rb->wait_cond); +#endif return ptr; } @@ -118,11 +139,24 @@ int RingBufferMrSw8Put(RingBuffer8 *rb, void *ptr) { if (rb->shutdown != 0) return -1; +#ifdef RINGBUFFER_MUTEX_WAIT + struct timespec cond_time; + cond_time.tv_sec = time(NULL) + 1; + cond_time.tv_nsec = 0; + SCMutexLock(&rb->wait_mutex); + SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time); + SCMutexUnlock(&rb->wait_mutex); +#else usleep(USLEEP_TIME); +#endif } rb->array[SC_ATOMIC_GET(rb->write)] = ptr; SC_ATOMIC_ADD(rb->write, 1); + +#ifdef RINGBUFFER_MUTEX_WAIT + SCCondSignal(&rb->wait_cond); +#endif return 0; } @@ -140,6 +174,10 @@ RingBuffer16 *RingBufferMrSwInit(void) { SC_ATOMIC_INIT(rb->write); SC_ATOMIC_INIT(rb->read); +#ifdef RINGBUFFER_MUTEX_WAIT + SCMutexInit(&rb->wait_mutex, NULL); + SCCondInit(&rb->wait_cond, NULL); +#endif return rb; } @@ -148,6 +186,10 @@ void RingBufferMrSwDestroy(RingBuffer16 *rb) { SC_ATOMIC_DESTROY(rb->write); SC_ATOMIC_DESTROY(rb->read); +#ifdef RINGBUFFER_MUTEX_WAIT + SCMutexDestroy(&rb->wait_mutex); + SCCondDestroy(&rb->wait_cond); +#endif SCFree(rb); } } @@ -174,7 +216,16 @@ retry: if (rb->shutdown != 0) return NULL; +#ifdef RINGBUFFER_MUTEX_WAIT + struct timespec cond_time; + cond_time.tv_sec = time(NULL) + 1; + cond_time.tv_nsec = 0; + SCMutexLock(&rb->wait_mutex); + SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time); + SCMutexUnlock(&rb->wait_mutex); +#else usleep(USLEEP_TIME); +#endif } /* atomically update rb->read */ @@ -190,6 +241,10 @@ retry: } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1)))); SCLogDebug("ptr %p", ptr); + +#ifdef RINGBUFFER_MUTEX_WAIT + SCCondSignal(&rb->wait_cond); +#endif return ptr; } @@ -205,11 +260,24 @@ int RingBufferMrSwPut(RingBuffer16 *rb, void *ptr) { if (rb->shutdown != 0) return -1; +#ifdef RINGBUFFER_MUTEX_WAIT + struct timespec cond_time; + cond_time.tv_sec = time(NULL) + 1; + cond_time.tv_nsec = 0; + SCMutexLock(&rb->wait_mutex); + SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time); + SCMutexUnlock(&rb->wait_mutex); +#else usleep(USLEEP_TIME); +#endif } rb->array[SC_ATOMIC_GET(rb->write)] = ptr; SC_ATOMIC_ADD(rb->write, 1); + +#ifdef RINGBUFFER_MUTEX_WAIT + SCCondSignal(&rb->wait_cond); +#endif return 0; } @@ -227,6 +295,10 @@ RingBuffer16 *RingBufferSrSwInit(void) { SC_ATOMIC_INIT(rb->write); SC_ATOMIC_INIT(rb->read); +#ifdef RINGBUFFER_MUTEX_WAIT + SCMutexInit(&rb->wait_mutex, NULL); + SCCondInit(&rb->wait_cond, NULL); +#endif return rb; } @@ -235,6 +307,11 @@ void RingBufferSrSwDestroy(RingBuffer16 *rb) { SC_ATOMIC_DESTROY(rb->write); SC_ATOMIC_DESTROY(rb->read); +#ifdef RINGBUFFER_MUTEX_WAIT + SCMutexDestroy(&rb->wait_mutex); + SCCondDestroy(&rb->wait_cond); +#endif + SCFree(rb); } } @@ -248,12 +325,24 @@ void *RingBufferSrSwGet(RingBuffer16 *rb) { if (rb->shutdown != 0) return NULL; +#ifdef RINGBUFFER_MUTEX_WAIT + struct timespec cond_time; + cond_time.tv_sec = time(NULL) + 1; + cond_time.tv_nsec = 0; + SCMutexLock(&rb->wait_mutex); + SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time); + SCMutexUnlock(&rb->wait_mutex); +#else usleep(USLEEP_TIME); +#endif } ptr = rb->array[SC_ATOMIC_GET(rb->read)]; SC_ATOMIC_ADD(rb->read, 1); +#ifdef RINGBUFFER_MUTEX_WAIT + SCCondSignal(&rb->wait_cond); +#endif return ptr; } @@ -264,11 +353,24 @@ int RingBufferSrSwPut(RingBuffer16 *rb, void *ptr) { if (rb->shutdown != 0) return -1; +#ifdef RINGBUFFER_MUTEX_WAIT + struct timespec cond_time; + cond_time.tv_sec = time(NULL) + 1; + cond_time.tv_nsec = 0; + SCMutexLock(&rb->wait_mutex); + SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time); + SCMutexUnlock(&rb->wait_mutex); +#else usleep(USLEEP_TIME); +#endif } rb->array[SC_ATOMIC_GET(rb->write)] = ptr; SC_ATOMIC_ADD(rb->write, 1); + +#ifdef RINGBUFFER_MUTEX_WAIT + SCCondSignal(&rb->wait_cond); +#endif return 0; } @@ -286,6 +388,10 @@ RingBuffer8 *RingBufferMrMw8Init(void) { SC_ATOMIC_INIT(rb->read); SCSpinInit(&rb->spin, 0); +#ifdef RINGBUFFER_MUTEX_WAIT + SCMutexInit(&rb->wait_mutex, NULL); + SCCondInit(&rb->wait_cond, NULL); +#endif return rb; } @@ -295,6 +401,11 @@ void RingBufferMrMw8Destroy(RingBuffer8 *rb) { SC_ATOMIC_DESTROY(rb->read); SCSpinDestroy(&rb->spin); + +#ifdef RINGBUFFER_MUTEX_WAIT + SCMutexDestroy(&rb->wait_mutex); + SCCondDestroy(&rb->wait_cond); +#endif SCFree(rb); } } @@ -320,8 +431,16 @@ retry: /* break out if the engine wants to shutdown */ if (rb->shutdown != 0) return NULL; - +#ifdef RINGBUFFER_MUTEX_WAIT + struct timespec cond_time; + cond_time.tv_sec = time(NULL) + 1; + cond_time.tv_nsec = 0; + SCMutexLock(&rb->wait_mutex); + SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time); + SCMutexUnlock(&rb->wait_mutex); +#else usleep(USLEEP_TIME); +#endif } /* atomically update rb->read */ @@ -337,6 +456,9 @@ retry: } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1)))); SCLogDebug("ptr %p", ptr); +#ifdef RINGBUFFER_MUTEX_WAIT + SCCondSignal(&rb->wait_cond); +#endif return ptr; } @@ -368,7 +490,16 @@ retry: if (rb->shutdown != 0) return -1; +#ifdef RINGBUFFER_MUTEX_WAIT + struct timespec cond_time; + cond_time.tv_sec = time(NULL) + 1; + cond_time.tv_nsec = 0; + SCMutexLock(&rb->wait_mutex); + SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time); + SCMutexUnlock(&rb->wait_mutex); +#else usleep(USLEEP_TIME); +#endif } /* get our lock */ @@ -386,6 +517,10 @@ retry: SC_ATOMIC_ADD(rb->write, 1); SCSpinUnlock(&rb->spin); SCLogDebug("ptr %p, done", ptr); + +#ifdef RINGBUFFER_MUTEX_WAIT + SCCondSignal(&rb->wait_cond); +#endif return 0; } @@ -403,6 +538,10 @@ RingBuffer16 *RingBufferMrMwInit(void) { SC_ATOMIC_INIT(rb->read); SCSpinInit(&rb->spin, 0); +#ifdef RINGBUFFER_MUTEX_WAIT + SCMutexInit(&rb->wait_mutex, NULL); + SCCondInit(&rb->wait_cond, NULL); +#endif return rb; } @@ -412,6 +551,12 @@ void RingBufferMrMwDestroy(RingBuffer16 *rb) { SC_ATOMIC_DESTROY(rb->read); SCSpinDestroy(&rb->spin); + +#ifdef RINGBUFFER_MUTEX_WAIT + SCMutexDestroy(&rb->wait_mutex); + SCCondDestroy(&rb->wait_cond); +#endif + SCFree(rb); } } @@ -438,7 +583,16 @@ retry: if (rb->shutdown != 0) return NULL; +#ifdef RINGBUFFER_MUTEX_WAIT + struct timespec cond_time; + cond_time.tv_sec = time(NULL) + 1; + cond_time.tv_nsec = 0; + SCMutexLock(&rb->wait_mutex); + SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time); + SCMutexUnlock(&rb->wait_mutex); +#else usleep(USLEEP_TIME); +#endif } /* atomically update rb->read */ @@ -454,6 +608,10 @@ retry: } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1)))); SCLogDebug("ptr %p", ptr); + +#ifdef RINGBUFFER_MUTEX_WAIT + SCCondSignal(&rb->wait_cond); +#endif return ptr; } @@ -485,7 +643,16 @@ retry: if (rb->shutdown != 0) return -1; +#ifdef RINGBUFFER_MUTEX_WAIT + struct timespec cond_time; + cond_time.tv_sec = time(NULL) + 1; + cond_time.tv_nsec = 0; + SCMutexLock(&rb->wait_mutex); + SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time); + SCMutexUnlock(&rb->wait_mutex); +#else usleep(USLEEP_TIME); +#endif } /* get our lock */ @@ -503,6 +670,10 @@ retry: SC_ATOMIC_ADD(rb->write, 1); SCSpinUnlock(&rb->spin); SCLogDebug("ptr %p, done", ptr); + +#ifdef RINGBUFFER_MUTEX_WAIT + SCCondSignal(&rb->wait_cond); +#endif return 0; } diff --git a/src/util-ringbuffer.h b/src/util-ringbuffer.h index 6399246c92..761788f77c 100644 --- a/src/util-ringbuffer.h +++ b/src/util-ringbuffer.h @@ -26,6 +26,11 @@ #ifndef __UTIL_RINGBUFFER_H__ #include "util-atomic.h" +#include "threads.h" + +/** When the ringbuffer is full we have two options, either we spin & sleep + * or we use a pthread condition to wait. */ +#define RINGBUFFER_MUTEX_WAIT /** \brief ring buffer api * @@ -38,6 +43,10 @@ typedef struct RingBuffer8_ { SC_ATOMIC_DECLARE(unsigned char, write); /**< idx where we put data */ SC_ATOMIC_DECLARE(unsigned char, read); /**< idx where we read data */ uint8_t shutdown; +#ifdef RINGBUFFER_MUTEX_WAIT + SCCondT wait_cond; + SCMutex wait_mutex; +#endif /* RINGBUFFER_MUTEX_WAIT */ SCSpinlock spin; /**< lock protecting writes for multi writer mode*/ void *array[RING_BUFFER_8_SIZE]; } RingBuffer8; @@ -47,6 +56,10 @@ typedef struct RingBuffer16_ { SC_ATOMIC_DECLARE(unsigned short, write); /**< idx where we put data */ SC_ATOMIC_DECLARE(unsigned short, read); /**< idx where we read data */ uint8_t shutdown; +#ifdef RINGBUFFER_MUTEX_WAIT + SCCondT wait_cond; + SCMutex wait_mutex; +#endif /* RINGBUFFER_MUTEX_WAIT */ SCSpinlock spin; /**< lock protecting writes for multi writer mode*/ void *array[RING_BUFFER_16_SIZE]; } RingBuffer16;