Introduce atomic operations API that supports GCC's atomic operations and a fallback using (spin)locks. Convert ringbuffer api to use the new atomic api.

remotes/origin/master-1.0.x
Victor Julien 16 years ago
parent daea85e491
commit 0140a14a15

@ -119,6 +119,7 @@ detect-urilen.c detect-urilen.h \
detect-detection-filter.c detect-detection-filter.h \ detect-detection-filter.c detect-detection-filter.h \
detect-http-client-body.c detect-http-client-body.h \ detect-http-client-body.c detect-http-client-body.h \
detect-asn1.c detect-asn1.h \ detect-asn1.c detect-asn1.h \
util-atomic.h \
util-print.c util-print.h \ util-print.c util-print.h \
util-fmemopen.c util-fmemopen.h \ util-fmemopen.c util-fmemopen.h \
util-cpu.c util-cpu.h \ util-cpu.c util-cpu.h \

@ -0,0 +1,212 @@
/* Copyright (C) 2007-2010 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Victor Julien <victor@inliniac.net>
*
* API for atomic operations. Uses atomic instructions (GCC only at this time)
* where available, falls back to (spin)locked* operations otherwise.
*
* To prevent developers from accidentally working with the atomic variables
* directly instead of through the proper macro's, a marco trick is performed
* that exposes different variable names than the developer uses. So if the dev
* uses "somevar", internally "somevar_sc_atomic__" is used.
*
* Where available, we use __sync_fetch_and_add and
* __sync_bool_compare_and_swap. If those are unavailable, the API
* transparently created a matching (spin)lock for each atomic variable. The
* lock will be named "somevar_sc_lock__"
*
* (*) where spinlocks are unavailable, the threading api falls back to mutex
*/
#ifndef __UTIL_ATOMIC_H__
#define __UTIL_ATOMIC_H__
/* test if we have atomic operations support */
#ifndef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_2
/**
* \brief wrapper to declare an atomic variable including a (spin) lock
* to protect it.
*
* \warning Variable and lock are _not_ initialized.
*/
#define SC_ATOMIC_DECLARE(type, name) \
type name ## _sc_atomic__; \
SCSpinlock name ## _sc_lock__
/**
* \brief wrapper to declare an atomic variable including a (spin) lock
* to protect it and initialize them.
*/
#define SC_ATOMIC_DECL_AND_INIT(type, name) \
type name ## _sc_atomic__ = 0; \
SCSpinlock name ## _sc_lock__; \
SCSpinInit(&(name ## _sc_lock__), 0) \
}
/**
* \brief Initialize the previously declared atomic variable and it's
* lock.
*/
#define SC_ATOMIC_INIT(name) \
SCSpinInit(&(name ## _sc_lock__), 0)
/**
* \brief Destroy the lock used to protect this variable
*/
#define SC_ATOMIC_DESTROY(name) \
SCSpinDestroy(&(name ## _sc_lock__))
/**
* \brief add a value to our atomic variable
*
* \param name the atomic variable
* \param val the value to add to the variable
*/
#define SC_ATOMIC_ADD(name, val) \
do { \
SCSpinLock(&(name ## _sc_lock__)); \
(name ## _sc_atomic__) += (val); \
SCSpinUnlock(&(name ## _sc_lock__)); \
} while(0)
/**
* \brief Get the value from the atomic variable.
*
* \retval var value
*/
#define SC_ATOMIC_GET(name) ({ \
typeof(name ## _sc_atomic__) var; \
do { \
SCSpinLock(&(name ## _sc_lock__)); \
var = (name ## _sc_atomic__); \
SCSpinUnlock(&(name ## _sc_lock__)); \
} while (0); \
var; \
})
/**
* \brief atomic Compare and Switch
*
* \warning "name" is passed to us as "&var"
*/
#define SC_ATOMIC_CAS(name, cmpval, newval) ({ \
char r = 0; \
do { \
SCSpinLock((name ## _sc_lock__)); \
if (*(name ## _sc_atomic__) == (cmpval)) { \
*(name ## _sc_atomic__) = (newval); \
r = 1; \
} \
SCSpinUnlock((name ## _sc_lock__)); \
} while(0); \
r; \
})
#else /* we do have support for CAS */
/**
* \brief wrapper for OS/compiler specific atomic compare and swap (CAS)
* function.
*
* \param addr Address of the variable to CAS
* \param tv Test value to compare the value at address against
* \param nv New value to set the variable at addr to
*
* \retval 0 CAS failed
* \retval 1 CAS succeeded
*/
#define SCAtomicCompareAndSwap(addr, tv, nv) \
__sync_bool_compare_and_swap((addr), (tv), (nv))
/**
* \brief wrapper for OS/compiler specific atomic fetch and add
* function.
*
* \param addr Address of the variable to add to
* \param value Value to add to the variable at addr
*/
#define SCAtomicFetchAndAdd(addr, value) \
__sync_fetch_and_add((addr), (value))
/**
* \brief wrapper for declaring atomic variables.
*
* \warning Only char, short, int, long, long long and their unsigned
* versions are supported.
*
* \param type Type of the variable (char, short, int, long, long long)
* \param name Name of the variable.
*
* We just declare the variable here as we rely on atomic operations
* to modify it, so no need for locks.
*
* \warning variable is not initialized
*/
#define SC_ATOMIC_DECLARE(type, name) \
type name ## _sc_atomic__
/**
* \brief wrapper for declaring an atomic variable and initializing it.
**/
#define SC_ATOMIC_DECL_AND_INIT(type, name) \
type name ## _sc_atomic__ = 0
/**
* \brief wrapper for initializing an atomic variable.
**/
#define SC_ATOMIC_INIT(name) \
(name ## _sc_atomic__) = 0
/**
* \brief No-op.
*/
#define SC_ATOMIC_DESTROY(name)
/**
* \brief add a value to our atomic variable
*
* \param name the atomic variable
* \param val the value to add to the variable
*/
#define SC_ATOMIC_ADD(name, val) \
SCAtomicFetchAndAdd(&(name ## _sc_atomic__), (val));
/**
* \brief atomic Compare and Switch
*
* \warning "name" is passed to us as "&var"
*/
#define SC_ATOMIC_CAS(name, cmpval, newval) \
SCAtomicCompareAndSwap((name ## _sc_atomic__), cmpval, newval)
/**
* \brief Get the value from the atomic variable.
*
* \retval var value
*/
#define SC_ATOMIC_GET(name) \
(name ## _sc_atomic__)
#endif /* !no atomic operations */
#endif /* __UTIL_ATOMIC_H__ */

@ -1,6 +1,42 @@
/* Copyright (C) 2007-2010 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Victor Julien <victor@inliniac.net>
*
* Ringbuffer implementation that is lockless for the most part IF atomic
* operations are available.
*
* Two sizes are implemented currently: 256 and 65536. Those sizes are chosen
* for simplicity when working with the read and write indexes. Both can just
* wrap around.
*
* Implemented are:
* Single reader, single writer (lockless)
* Multi reader, single writer (lockless)
* Multi reader, multi writer (partly locked)
*/
#include "suricata-common.h" #include "suricata-common.h"
#include "suricata.h" #include "suricata.h"
#include "util-ringbuffer.h" #include "util-ringbuffer.h"
#include "util-atomic.h"
#define USLEEP_TIME 5 #define USLEEP_TIME 5
@ -13,11 +49,18 @@ RingBufferMrSw8 *RingBufferMrSw8Init(void) {
} }
memset(rb, 0x00, sizeof(RingBufferMrSw8)); memset(rb, 0x00, sizeof(RingBufferMrSw8));
SC_ATOMIC_INIT(rb->write);
SC_ATOMIC_INIT(rb->read);
return rb; return rb;
} }
void RingBufferMrSw8Destroy(RingBufferMrSw8 *rb) { void RingBufferMrSw8Destroy(RingBufferMrSw8 *rb) {
if (rb != NULL) { if (rb != NULL) {
SC_ATOMIC_DESTROY(rb->write);
SC_ATOMIC_DESTROY(rb->read);
SCFree(rb); SCFree(rb);
} }
} }
@ -31,7 +74,7 @@ void RingBufferMrSw8Destroy(RingBufferMrSw8 *rb) {
*/ */
void *RingBufferMrSw8Get(RingBufferMrSw8 *rb) { void *RingBufferMrSw8Get(RingBufferMrSw8 *rb) {
void *ptr; void *ptr;
/** local pointer for data races. If __sync_bool_compare_and_swap (CAS) /** local pointer for data races. If SCAtomicCompareAndSwap (CAS)
* fails we increase our local array idx to try the next array member * fails we increase our local array idx to try the next array member
* until we succeed. Or when the buffer is empty again we jump back * until we succeed. Or when the buffer is empty again we jump back
* to the waiting loop. */ * to the waiting loop. */
@ -39,7 +82,7 @@ void *RingBufferMrSw8Get(RingBufferMrSw8 *rb) {
/* buffer is empty, wait... */ /* buffer is empty, wait... */
retry: retry:
while (rb->read == rb->write) { while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
/* break out if the engine wants to shutdown */ /* break out if the engine wants to shutdown */
if (rb->shutdown != 0) if (rb->shutdown != 0)
return NULL; return NULL;
@ -48,16 +91,16 @@ retry:
} }
/* atomically update rb->read */ /* atomically update rb->read */
readp = rb->read - 1; readp = SC_ATOMIC_GET(rb->read) - 1;
do { do {
/* with multiple readers we can get in the situation that we exitted /* with multiple readers we can get in the situation that we exitted
* from the wait loop but the rb is empty again once we get here. */ * from the wait loop but the rb is empty again once we get here. */
if (rb->read == rb->write) if (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write))
goto retry; goto retry;
readp++; readp++;
ptr = rb->array[readp]; ptr = rb->array[readp];
} while (!(__sync_bool_compare_and_swap(&rb->read, readp, (readp + 1)))); } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1))));
SCLogDebug("ptr %p", ptr); SCLogDebug("ptr %p", ptr);
return ptr; return ptr;
@ -70,7 +113,7 @@ int RingBufferMrSw8Put(RingBufferMrSw8 *rb, void *ptr) {
SCLogDebug("ptr %p", ptr); SCLogDebug("ptr %p", ptr);
/* buffer is full, wait... */ /* buffer is full, wait... */
while ((rb->write + 1) == rb->read) { while ((SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */ /* break out if the engine wants to shutdown */
if (rb->shutdown != 0) if (rb->shutdown != 0)
return -1; return -1;
@ -78,11 +121,12 @@ int RingBufferMrSw8Put(RingBufferMrSw8 *rb, void *ptr) {
usleep(USLEEP_TIME); usleep(USLEEP_TIME);
} }
rb->array[rb->write] = ptr; rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
__sync_fetch_and_add(&rb->write, 1); SC_ATOMIC_ADD(rb->write, 1);
return 0; return 0;
} }
/* Multi Reader, Single Writer */ /* Multi Reader, Single Writer */
RingBufferMrSw *RingBufferMrSwInit(void) { RingBufferMrSw *RingBufferMrSwInit(void) {
@ -92,11 +136,18 @@ RingBufferMrSw *RingBufferMrSwInit(void) {
} }
memset(rb, 0x00, sizeof(RingBufferMrSw)); memset(rb, 0x00, sizeof(RingBufferMrSw));
SC_ATOMIC_INIT(rb->write);
SC_ATOMIC_INIT(rb->read);
return rb; return rb;
} }
void RingBufferMrSwDestroy(RingBufferMrSw *rb) { void RingBufferMrSwDestroy(RingBufferMrSw *rb) {
if (rb != NULL) { if (rb != NULL) {
SC_ATOMIC_DESTROY(rb->write);
SC_ATOMIC_DESTROY(rb->read);
SCFree(rb); SCFree(rb);
} }
} }
@ -110,7 +161,7 @@ void RingBufferMrSwDestroy(RingBufferMrSw *rb) {
*/ */
void *RingBufferMrSwGet(RingBufferMrSw *rb) { void *RingBufferMrSwGet(RingBufferMrSw *rb) {
void *ptr; void *ptr;
/** local pointer for data races. If __sync_bool_compare_and_swap (CAS) /** local pointer for data races. If SCAtomicCompareAndSwap (CAS)
* fails we increase our local array idx to try the next array member * fails we increase our local array idx to try the next array member
* until we succeed. Or when the buffer is empty again we jump back * until we succeed. Or when the buffer is empty again we jump back
* to the waiting loop. */ * to the waiting loop. */
@ -118,7 +169,7 @@ void *RingBufferMrSwGet(RingBufferMrSw *rb) {
/* buffer is empty, wait... */ /* buffer is empty, wait... */
retry: retry:
while (rb->read == rb->write) { while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
/* break out if the engine wants to shutdown */ /* break out if the engine wants to shutdown */
if (rb->shutdown != 0) if (rb->shutdown != 0)
return NULL; return NULL;
@ -127,16 +178,16 @@ retry:
} }
/* atomically update rb->read */ /* atomically update rb->read */
readp = rb->read - 1; readp = SC_ATOMIC_GET(rb->read) - 1;
do { do {
/* with multiple readers we can get in the situation that we exitted /* with multiple readers we can get in the situation that we exitted
* from the wait loop but the rb is empty again once we get here. */ * from the wait loop but the rb is empty again once we get here. */
if (rb->read == rb->write) if (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write))
goto retry; goto retry;
readp++; readp++;
ptr = rb->array[readp]; ptr = rb->array[readp];
} while (!(__sync_bool_compare_and_swap(&rb->read, readp, (readp + 1)))); } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1))));
SCLogDebug("ptr %p", ptr); SCLogDebug("ptr %p", ptr);
return ptr; return ptr;
@ -149,7 +200,7 @@ int RingBufferMrSwPut(RingBufferMrSw *rb, void *ptr) {
SCLogDebug("ptr %p", ptr); SCLogDebug("ptr %p", ptr);
/* buffer is full, wait... */ /* buffer is full, wait... */
while ((rb->write + 1) == rb->read) { while ((SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */ /* break out if the engine wants to shutdown */
if (rb->shutdown != 0) if (rb->shutdown != 0)
return -1; return -1;
@ -157,8 +208,8 @@ int RingBufferMrSwPut(RingBufferMrSw *rb, void *ptr) {
usleep(USLEEP_TIME); usleep(USLEEP_TIME);
} }
rb->array[rb->write] = ptr; rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
__sync_fetch_and_add(&rb->write, 1); SC_ATOMIC_ADD(rb->write, 1);
return 0; return 0;
} }
@ -172,11 +223,18 @@ RingBufferSrSw *RingBufferSrSwInit(void) {
} }
memset(rb, 0x00, sizeof(RingBufferSrSw)); memset(rb, 0x00, sizeof(RingBufferSrSw));
SC_ATOMIC_INIT(rb->write);
SC_ATOMIC_INIT(rb->read);
return rb; return rb;
} }
void RingBufferSrSwDestroy(RingBufferSrSw *rb) { void RingBufferSrSwDestroy(RingBufferSrSw *rb) {
if (rb != NULL) { if (rb != NULL) {
SC_ATOMIC_DESTROY(rb->write);
SC_ATOMIC_DESTROY(rb->read);
SCFree(rb); SCFree(rb);
} }
} }
@ -185,7 +243,7 @@ void *RingBufferSrSwGet(RingBufferSrSw *rb) {
void *ptr = NULL; void *ptr = NULL;
/* buffer is empty, wait... */ /* buffer is empty, wait... */
while (rb->read == rb->write) { while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
/* break out if the engine wants to shutdown */ /* break out if the engine wants to shutdown */
if (rb->shutdown != 0) if (rb->shutdown != 0)
return NULL; return NULL;
@ -193,15 +251,15 @@ void *RingBufferSrSwGet(RingBufferSrSw *rb) {
usleep(USLEEP_TIME); usleep(USLEEP_TIME);
} }
ptr = rb->array[rb->read]; ptr = rb->array[SC_ATOMIC_GET(rb->read)];
__sync_fetch_and_add(&rb->read, 1); SC_ATOMIC_ADD(rb->read, 1);
return ptr; return ptr;
} }
int RingBufferSrSwPut(RingBufferSrSw *rb, void *ptr) { int RingBufferSrSwPut(RingBufferSrSw *rb, void *ptr) {
/* buffer is full, wait... */ /* buffer is full, wait... */
while ((rb->write + 1) == rb->read) { while ((SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */ /* break out if the engine wants to shutdown */
if (rb->shutdown != 0) if (rb->shutdown != 0)
return -1; return -1;
@ -209,8 +267,8 @@ int RingBufferSrSwPut(RingBufferSrSw *rb, void *ptr) {
usleep(USLEEP_TIME); usleep(USLEEP_TIME);
} }
rb->array[rb->write] = ptr; rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
__sync_fetch_and_add(&rb->write, 1); SC_ATOMIC_ADD(rb->write, 1);
return 0; return 0;
} }
@ -224,12 +282,18 @@ RingBufferMrMw8 *RingBufferMrMw8Init(void) {
memset(rb, 0x00, sizeof(RingBufferMrMw8)); memset(rb, 0x00, sizeof(RingBufferMrMw8));
SC_ATOMIC_INIT(rb->write);
SC_ATOMIC_INIT(rb->read);
SCSpinInit(&rb->spin, 0); SCSpinInit(&rb->spin, 0);
return rb; return rb;
} }
void RingBufferMrMw8Destroy(RingBufferMrMw8 *rb) { void RingBufferMrMw8Destroy(RingBufferMrMw8 *rb) {
if (rb != NULL) { if (rb != NULL) {
SC_ATOMIC_DESTROY(rb->write);
SC_ATOMIC_DESTROY(rb->read);
SCSpinDestroy(&rb->spin); SCSpinDestroy(&rb->spin);
SCFree(rb); SCFree(rb);
} }
@ -244,7 +308,7 @@ void RingBufferMrMw8Destroy(RingBufferMrMw8 *rb) {
*/ */
void *RingBufferMrMw8Get(RingBufferMrMw8 *rb) { void *RingBufferMrMw8Get(RingBufferMrMw8 *rb) {
void *ptr; void *ptr;
/** local pointer for data races. If __sync_bool_compare_and_swap (CAS) /** local pointer for data races. If SCAtomicCompareAndSwap (CAS)
* fails we increase our local array idx to try the next array member * fails we increase our local array idx to try the next array member
* until we succeed. Or when the buffer is empty again we jump back * until we succeed. Or when the buffer is empty again we jump back
* to the waiting loop. */ * to the waiting loop. */
@ -252,7 +316,7 @@ void *RingBufferMrMw8Get(RingBufferMrMw8 *rb) {
/* buffer is empty, wait... */ /* buffer is empty, wait... */
retry: retry:
while (rb->read == rb->write) { while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
/* break out if the engine wants to shutdown */ /* break out if the engine wants to shutdown */
if (rb->shutdown != 0) if (rb->shutdown != 0)
return NULL; return NULL;
@ -261,16 +325,16 @@ retry:
} }
/* atomically update rb->read */ /* atomically update rb->read */
readp = rb->read - 1; readp = SC_ATOMIC_GET(rb->read) - 1;
do { do {
/* with multiple readers we can get in the situation that we exitted /* with multiple readers we can get in the situation that we exitted
* from the wait loop but the rb is empty again once we get here. */ * from the wait loop but the rb is empty again once we get here. */
if (rb->read == rb->write) if (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write))
goto retry; goto retry;
readp++; readp++;
ptr = rb->array[readp]; ptr = rb->array[readp];
} while (!(__sync_bool_compare_and_swap(&rb->read, readp, (readp + 1)))); } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1))));
SCLogDebug("ptr %p", ptr); SCLogDebug("ptr %p", ptr);
return ptr; return ptr;
@ -299,7 +363,7 @@ int RingBufferMrMw8Put(RingBufferMrMw8 *rb, void *ptr) {
/* buffer is full, wait... */ /* buffer is full, wait... */
retry: retry:
while ((rb->write + 1) == rb->read) { while ((SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */ /* break out if the engine wants to shutdown */
if (rb->shutdown != 0) if (rb->shutdown != 0)
return -1; return -1;
@ -310,16 +374,16 @@ retry:
/* get our lock */ /* get our lock */
SCSpinLock(&rb->spin); SCSpinLock(&rb->spin);
/* if while we got our lock the buffer changed, we need to retry */ /* if while we got our lock the buffer changed, we need to retry */
if ((rb->write + 1) == rb->read) { if ((SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
SCSpinUnlock(&rb->spin); SCSpinUnlock(&rb->spin);
goto retry; goto retry;
} }
SCLogDebug("rb->write %u, ptr %p", rb->write, ptr); SCLogDebug("rb->write %u, ptr %p", SC_ATOMIC_GET(rb->write), ptr);
/* update the ring buffer */ /* update the ring buffer */
rb->array[rb->write] = ptr; rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
__sync_fetch_and_add(&rb->write, 1); SC_ATOMIC_ADD(rb->write, 1);
SCSpinUnlock(&rb->spin); SCSpinUnlock(&rb->spin);
SCLogDebug("ptr %p, done", ptr); SCLogDebug("ptr %p, done", ptr);
return 0; return 0;
@ -335,12 +399,18 @@ RingBufferMrMw *RingBufferMrMwInit(void) {
memset(rb, 0x00, sizeof(RingBufferMrMw)); memset(rb, 0x00, sizeof(RingBufferMrMw));
SC_ATOMIC_INIT(rb->write);
SC_ATOMIC_INIT(rb->read);
SCSpinInit(&rb->spin, 0); SCSpinInit(&rb->spin, 0);
return rb; return rb;
} }
void RingBufferMrMwDestroy(RingBufferMrMw *rb) { void RingBufferMrMwDestroy(RingBufferMrMw *rb) {
if (rb != NULL) { if (rb != NULL) {
SC_ATOMIC_DESTROY(rb->write);
SC_ATOMIC_DESTROY(rb->read);
SCSpinDestroy(&rb->spin); SCSpinDestroy(&rb->spin);
SCFree(rb); SCFree(rb);
} }
@ -355,7 +425,7 @@ void RingBufferMrMwDestroy(RingBufferMrMw *rb) {
*/ */
void *RingBufferMrMwGet(RingBufferMrMw *rb) { void *RingBufferMrMwGet(RingBufferMrMw *rb) {
void *ptr; void *ptr;
/** local pointer for data races. If __sync_bool_compare_and_swap (CAS) /** local pointer for data races. If SCAtomicCompareAndSwap (CAS)
* fails we increase our local array idx to try the next array member * fails we increase our local array idx to try the next array member
* until we succeed. Or when the buffer is empty again we jump back * until we succeed. Or when the buffer is empty again we jump back
* to the waiting loop. */ * to the waiting loop. */
@ -363,7 +433,7 @@ void *RingBufferMrMwGet(RingBufferMrMw *rb) {
/* buffer is empty, wait... */ /* buffer is empty, wait... */
retry: retry:
while (rb->read == rb->write) { while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
/* break out if the engine wants to shutdown */ /* break out if the engine wants to shutdown */
if (rb->shutdown != 0) if (rb->shutdown != 0)
return NULL; return NULL;
@ -372,16 +442,16 @@ retry:
} }
/* atomically update rb->read */ /* atomically update rb->read */
readp = rb->read - 1; readp = SC_ATOMIC_GET(rb->read) - 1;
do { do {
/* with multiple readers we can get in the situation that we exitted /* with multiple readers we can get in the situation that we exitted
* from the wait loop but the rb is empty again once we get here. */ * from the wait loop but the rb is empty again once we get here. */
if (rb->read == rb->write) if (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write))
goto retry; goto retry;
readp++; readp++;
ptr = rb->array[readp]; ptr = rb->array[readp];
} while (!(__sync_bool_compare_and_swap(&rb->read, readp, (readp + 1)))); } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1))));
SCLogDebug("ptr %p", ptr); SCLogDebug("ptr %p", ptr);
return ptr; return ptr;
@ -410,7 +480,7 @@ int RingBufferMrMwPut(RingBufferMrMw *rb, void *ptr) {
/* buffer is full, wait... */ /* buffer is full, wait... */
retry: retry:
while ((rb->write + 1) == rb->read) { while ((SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */ /* break out if the engine wants to shutdown */
if (rb->shutdown != 0) if (rb->shutdown != 0)
return -1; return -1;
@ -421,16 +491,16 @@ retry:
/* get our lock */ /* get our lock */
SCSpinLock(&rb->spin); SCSpinLock(&rb->spin);
/* if while we got our lock the buffer changed, we need to retry */ /* if while we got our lock the buffer changed, we need to retry */
if ((rb->write + 1) == rb->read) { if ((SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
SCSpinUnlock(&rb->spin); SCSpinUnlock(&rb->spin);
goto retry; goto retry;
} }
SCLogDebug("rb->write %u, ptr %p", rb->write, ptr); SCLogDebug("rb->write %u, ptr %p", SC_ATOMIC_GET(rb->write), ptr);
/* update the ring buffer */ /* update the ring buffer */
rb->array[rb->write] = ptr; rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
__sync_fetch_and_add(&rb->write, 1); SC_ATOMIC_ADD(rb->write, 1);
SCSpinUnlock(&rb->spin); SCSpinUnlock(&rb->spin);
SCLogDebug("ptr %p, done", ptr); SCLogDebug("ptr %p, done", ptr);
return 0; return 0;

@ -1,5 +1,32 @@
/* Copyright (C) 2007-2010 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Victor Julien <victor@inliniac.net>
*
* See the .c file for a full explanation.
*/
#ifndef __UTIL_RINGBUFFER_H__ #ifndef __UTIL_RINGBUFFER_H__
#include "util-atomic.h"
/** \brief ring buffer api /** \brief ring buffer api
* *
* Ring buffer api for a single writer and a single reader. It uses a * Ring buffer api for a single writer and a single reader. It uses a
@ -12,8 +39,8 @@
* 256 items so we can use unsigned char's that just * 256 items so we can use unsigned char's that just
* wrap around */ * wrap around */
typedef struct RingBufferMrSw8_ { typedef struct RingBufferMrSw8_ {
unsigned char write; /**< idx where we put data */ SC_ATOMIC_DECLARE(unsigned char, write); /**< idx where we put data */
unsigned char read; /**< idx where we read data */ SC_ATOMIC_DECLARE(unsigned char, read); /**< idx where we read data */
uint8_t shutdown; uint8_t shutdown;
void *array[RING_BUFFER_MRSW_8_SIZE]; void *array[RING_BUFFER_MRSW_8_SIZE];
} RingBufferMrSw8; } RingBufferMrSw8;
@ -29,8 +56,8 @@ void RingBufferMrSw8Destroy(RingBufferMrSw8 *);
* 65536 items so we can use unsigned shorts that just * 65536 items so we can use unsigned shorts that just
* wrap around */ * wrap around */
typedef struct RingBufferMrSw_ { typedef struct RingBufferMrSw_ {
unsigned short write; /**< idx where we put data */ SC_ATOMIC_DECLARE(unsigned short, write); /**< idx where we put data */
unsigned short read; /**< idx where we read data */ SC_ATOMIC_DECLARE(unsigned short, read); /**< idx where we read data */
uint8_t shutdown; uint8_t shutdown;
void *array[RING_BUFFER_MRSW_SIZE]; void *array[RING_BUFFER_MRSW_SIZE];
} RingBufferMrSw; } RingBufferMrSw;
@ -46,8 +73,8 @@ void RingBufferMrSwDestroy(RingBufferMrSw *);
* 65536 items so we can use unsigned shorts that just * 65536 items so we can use unsigned shorts that just
* wrap around */ * wrap around */
typedef struct RingBufferSrSw_ { typedef struct RingBufferSrSw_ {
unsigned short write; /**< idx where we put data */ SC_ATOMIC_DECLARE(unsigned short, write); /**< idx where we put data */
unsigned short read; /**< idx where we read data */ SC_ATOMIC_DECLARE(unsigned short, read); /**< idx where we read data */
uint8_t shutdown; uint8_t shutdown;
void *array[RING_BUFFER_SRSW_SIZE]; void *array[RING_BUFFER_SRSW_SIZE];
} RingBufferSrSw; } RingBufferSrSw;
@ -63,8 +90,8 @@ void RingBufferSrSwDestroy(RingBufferSrSw *);
* 256 items so we can use unsigned char's that just * 256 items so we can use unsigned char's that just
* wrap around */ * wrap around */
typedef struct RingBufferMrMw8_ { typedef struct RingBufferMrMw8_ {
unsigned char write; /**< idx where we put data */ SC_ATOMIC_DECLARE(unsigned char, write); /**< idx where we put data */
unsigned char read; /**< idx where we read data */ SC_ATOMIC_DECLARE(unsigned char, read); /**< idx where we read data */
uint8_t shutdown; uint8_t shutdown;
SCSpinlock spin; /**< lock protecting writes */ SCSpinlock spin; /**< lock protecting writes */
void *array[RING_BUFFER_MRMW_8_SIZE]; void *array[RING_BUFFER_MRMW_8_SIZE];
@ -81,8 +108,8 @@ void RingBufferMrMw8Destroy(RingBufferMrMw8 *);
* 65536 items so we can use unsigned char's that just * 65536 items so we can use unsigned char's that just
* wrap around */ * wrap around */
typedef struct RingBufferMrMw_ { typedef struct RingBufferMrMw_ {
unsigned short write; /**< idx where we put data */ SC_ATOMIC_DECLARE(unsigned short, write); /**< idx where we put data */
unsigned short read; /**< idx where we read data */ SC_ATOMIC_DECLARE(unsigned short, read); /**< idx where we read data */
uint8_t shutdown; uint8_t shutdown;
SCSpinlock spin; /**< lock protecting writes */ SCSpinlock spin; /**< lock protecting writes */
void *array[RING_BUFFER_MRMW_SIZE]; void *array[RING_BUFFER_MRMW_SIZE];

Loading…
Cancel
Save