Split ringbuffer queue handler into multiple, for mrsw, srsw, srmw modes.

remotes/origin/master-1.0.x
Victor Julien 16 years ago
parent 1ad289dfff
commit c7a744c937

@ -2282,7 +2282,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
TimeModeSetOffline();
/* create the threads */
ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcapFile","packetpool","packetpool","pickup-queue","ringbuffer","1slot");
ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcapFile","packetpool","packetpool","pickup-queue","ringbuffer_srsw","1slot");
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -2305,7 +2305,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
exit(EXIT_FAILURE);
}
ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode & Stream","pickup-queue","ringbuffer","stream-queue1","ringbuffer","varslot");
ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode & Stream","pickup-queue","ringbuffer_srsw","stream-queue1","ringbuffer_mrsw","varslot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
@ -2354,7 +2354,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
char *thread_name = SCStrdup(tname);
SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","ringbuffer","alert-queue1","ringbuffer","1slot");
ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","ringbuffer_mrsw","alert-queue1","ringbuffer_srmw","1slot");
if (tv_detect_ncpu == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -2398,7 +2398,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
}
ThreadVars *tv_outputs = TmThreadCreatePacketHandler("Outputs",
"alert-queue1", "ringbuffer", "packetpool", "packetpool", "varslot");
"alert-queue1", "ringbuffer_srmw", "packetpool", "packetpool", "varslot");
SetupOutputs(tv_outputs);
if (threading_set_cpu_affinity) {

@ -29,7 +29,9 @@ enum {
TMQH_NFQ,
TMQH_PACKETPOOL,
TMQH_FLOW,
TMQH_RINGBUFFER,
TMQH_RINGBUFFER_MRSW,
TMQH_RINGBUFFER_SRSW,
TMQH_RINGBUFFER_SRMW,
TMQH_SIZE,
};

@ -35,29 +35,56 @@
static RingBuffer8 *ringbuffers[256];
Packet *TmqhInputRingBuffer(ThreadVars *t);
void TmqhOutputRingBuffer(ThreadVars *t, Packet *p);
Packet *TmqhInputRingBufferMrSw(ThreadVars *t);
void TmqhOutputRingBufferMrSw(ThreadVars *t, Packet *p);
Packet *TmqhInputRingBufferSrSw(ThreadVars *t);
void TmqhOutputRingBufferSrSw(ThreadVars *t, Packet *p);
Packet *TmqhInputRingBufferSrMw(ThreadVars *t);
void TmqhOutputRingBufferSrMw(ThreadVars *t, Packet *p);
void TmqhInputRingBufferShutdownHandler(ThreadVars *);
void TmqhRingBufferRegister (void) {
tmqh_table[TMQH_RINGBUFFER].name = "ringbuffer";
tmqh_table[TMQH_RINGBUFFER].InHandler = TmqhInputRingBuffer;
tmqh_table[TMQH_RINGBUFFER].InShutdownHandler = TmqhInputRingBufferShutdownHandler;
tmqh_table[TMQH_RINGBUFFER].OutHandler = TmqhOutputRingBuffer;
tmqh_table[TMQH_RINGBUFFER_MRSW].name = "ringbuffer_mrsw";
tmqh_table[TMQH_RINGBUFFER_MRSW].InHandler = TmqhInputRingBufferMrSw;
tmqh_table[TMQH_RINGBUFFER_MRSW].InShutdownHandler = TmqhInputRingBufferShutdownHandler;
tmqh_table[TMQH_RINGBUFFER_MRSW].OutHandler = TmqhOutputRingBufferMrSw;
tmqh_table[TMQH_RINGBUFFER_SRSW].name = "ringbuffer_srsw";
tmqh_table[TMQH_RINGBUFFER_SRSW].InHandler = TmqhInputRingBufferSrSw;
tmqh_table[TMQH_RINGBUFFER_SRSW].InShutdownHandler = TmqhInputRingBufferShutdownHandler;
tmqh_table[TMQH_RINGBUFFER_SRSW].OutHandler = TmqhOutputRingBufferSrSw;
tmqh_table[TMQH_RINGBUFFER_SRMW].name = "ringbuffer_srmw";
tmqh_table[TMQH_RINGBUFFER_SRMW].InHandler = TmqhInputRingBufferSrMw;
tmqh_table[TMQH_RINGBUFFER_SRMW].InShutdownHandler = TmqhInputRingBufferShutdownHandler;
tmqh_table[TMQH_RINGBUFFER_SRMW].OutHandler = TmqhOutputRingBufferSrMw;
memset(ringbuffers, 0, sizeof(ringbuffers));
int i = 0;
for (i = 0; i < 256; i++) {
ringbuffers[i] = RingBufferMrMw8Init();
ringbuffers[i] = RingBuffer8Init();
}
}
void TmqhInputRingBufferShutdownHandler(ThreadVars *tv) {
if (tv == NULL || tv->inq == NULL) {
return;
}
RingBuffer8 *rb = ringbuffers[tv->inq->id];
if (rb == NULL) {
return;
}
rb->shutdown = 1;
}
Packet *TmqhInputRingBuffer(ThreadVars *t)
Packet *TmqhInputRingBufferMrSw(ThreadVars *t)
{
RingBuffer8 *rb = ringbuffers[t->inq->id];
Packet *p = (Packet *)RingBufferMrMw8Get(rb);
Packet *p = (Packet *)RingBufferMrSw8Get(rb);
if (t->sc_perf_pctx.perf_flag == 1)
SCPerfUpdateCounterArray(t->sc_perf_pca, &t->sc_perf_pctx, 0);
@ -65,22 +92,45 @@ Packet *TmqhInputRingBuffer(ThreadVars *t)
return p;
}
void TmqhInputRingBufferShutdownHandler(ThreadVars *tv) {
if (tv == NULL || tv->inq == NULL) {
return;
}
void TmqhOutputRingBufferMrSw(ThreadVars *t, Packet *p)
{
RingBuffer8 *rb = ringbuffers[t->outq->id];
RingBufferMrSw8Put(rb, (void *)p);
}
RingBuffer8 *rb = ringbuffers[tv->inq->id];
if (rb == NULL) {
return;
}
Packet *TmqhInputRingBufferSrSw(ThreadVars *t)
{
RingBuffer8 *rb = ringbuffers[t->inq->id];
rb->shutdown = 1;
Packet *p = (Packet *)RingBufferSrSw8Get(rb);
if (t->sc_perf_pctx.perf_flag == 1)
SCPerfUpdateCounterArray(t->sc_perf_pca, &t->sc_perf_pctx, 0);
return p;
}
void TmqhOutputRingBufferSrSw(ThreadVars *t, Packet *p)
{
RingBuffer8 *rb = ringbuffers[t->outq->id];
RingBufferSrSw8Put(rb, (void *)p);
}
Packet *TmqhInputRingBufferSrMw(ThreadVars *t)
{
RingBuffer8 *rb = ringbuffers[t->inq->id];
Packet *p = (Packet *)RingBufferSrMw8Get(rb);
if (t->sc_perf_pctx.perf_flag == 1)
SCPerfUpdateCounterArray(t->sc_perf_pca, &t->sc_perf_pctx, 0);
return p;
}
void TmqhOutputRingBuffer(ThreadVars *t, Packet *p)
void TmqhOutputRingBufferSrMw(ThreadVars *t, Packet *p)
{
RingBuffer8 *rb = ringbuffers[t->outq->id];
RingBufferMrMw8Put(rb, (void *)p);
RingBufferSrMw8Put(rb, (void *)p);
}

@ -40,39 +40,162 @@
#define USLEEP_TIME 5
/* Multi Reader, Single Writer, 8 bits */
/* Single Reader, Single Writer, 8 bits */
RingBuffer8 *RingBufferMrSw8Init(void) {
RingBuffer8 *rb = SCMalloc(sizeof(RingBuffer8));
if (rb == NULL) {
return NULL;
void *RingBufferSrSw8Get(RingBuffer8 *rb) {
void *ptr = NULL;
/* buffer is empty, wait... */
while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
/* break out if the engine wants to shutdown */
if (rb->shutdown != 0)
return NULL;
#ifdef RINGBUFFER_MUTEX_WAIT
struct timespec cond_time;
cond_time.tv_sec = time(NULL) + 1;
cond_time.tv_nsec = 0;
SCMutexLock(&rb->wait_mutex);
SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time);
SCMutexUnlock(&rb->wait_mutex);
#else
usleep(USLEEP_TIME);
#endif
}
memset(rb, 0x00, sizeof(RingBuffer8));
ptr = rb->array[SC_ATOMIC_GET(rb->read)];
SC_ATOMIC_ADD(rb->read, 1);
SC_ATOMIC_INIT(rb->write);
SC_ATOMIC_INIT(rb->read);
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
return ptr;
}
int RingBufferSrSw8Put(RingBuffer8 *rb, void *ptr) {
/* buffer is full, wait... */
while ((unsigned char)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */
if (rb->shutdown != 0)
return -1;
#ifdef RINGBUFFER_MUTEX_WAIT
SCMutexInit(&rb->wait_mutex, NULL);
SCCondInit(&rb->wait_cond, NULL);
struct timespec cond_time;
cond_time.tv_sec = time(NULL) + 1;
cond_time.tv_nsec = 0;
SCMutexLock(&rb->wait_mutex);
SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time);
SCMutexUnlock(&rb->wait_mutex);
#else
usleep(USLEEP_TIME);
#endif
return rb;
}
rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
SC_ATOMIC_ADD(rb->write, 1);
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
return 0;
}
void RingBufferMrSw8Destroy(RingBuffer8 *rb) {
if (rb != NULL) {
SC_ATOMIC_DESTROY(rb->write);
SC_ATOMIC_DESTROY(rb->read);
/* Single Reader, Multi Writer, 8 bites */
void *RingBufferSrMw8Get(RingBuffer8 *rb) {
void *ptr = NULL;
/* buffer is empty, wait... */
while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
/* break out if the engine wants to shutdown */
if (rb->shutdown != 0)
return NULL;
#ifdef RINGBUFFER_MUTEX_WAIT
SCMutexDestroy(&rb->wait_mutex);
SCCondDestroy(&rb->wait_cond);
struct timespec cond_time;
cond_time.tv_sec = time(NULL) + 1;
cond_time.tv_nsec = 0;
SCMutexLock(&rb->wait_mutex);
SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time);
SCMutexUnlock(&rb->wait_mutex);
#else
usleep(USLEEP_TIME);
#endif
}
ptr = rb->array[SC_ATOMIC_GET(rb->read)];
SC_ATOMIC_ADD(rb->read, 1);
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
return ptr;
}
/**
* \brief put a ptr in the RingBuffer.
*
* As we support multiple writers we need to protect 2 things:
* 1. writing the ptr to the array
* 2. incrementing the rb->write idx
*
* We can't do both at the same time in one atomic operation, so
* we need to (spin) lock it. We do increment rb->write atomically
* after that, so that we don't need to use the lock in our *Get
* function.
*
* \param rb the ringbuffer
* \param ptr ptr to store
*
* \retval 0 ok
* \retval -1 wait loop interrupted because of engine flags
*/
int RingBufferSrMw8Put(RingBuffer8 *rb, void *ptr) {
SCLogDebug("ptr %p", ptr);
/* buffer is full, wait... */
retry:
while ((unsigned char)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */
if (rb->shutdown != 0)
return -1;
#ifdef RINGBUFFER_MUTEX_WAIT
struct timespec cond_time;
cond_time.tv_sec = time(NULL) + 1;
cond_time.tv_nsec = 0;
SCMutexLock(&rb->wait_mutex);
SCCondTimedwait(&rb->wait_cond, &rb->wait_mutex, &cond_time);
SCMutexUnlock(&rb->wait_mutex);
#else
usleep(USLEEP_TIME);
#endif
SCFree(rb);
}
/* get our lock */
SCSpinLock(&rb->spin);
/* if while we got our lock the buffer changed, we need to retry */
if ((unsigned char)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
SCSpinUnlock(&rb->spin);
goto retry;
}
SCLogDebug("rb->write %u, ptr %p", SC_ATOMIC_GET(rb->write), ptr);
/* update the ring buffer */
rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
SC_ATOMIC_ADD(rb->write, 1);
SCSpinUnlock(&rb->spin);
SCLogDebug("ptr %p, done", ptr);
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
return 0;
}
/* Multi Reader, Single Writer, 8 bits */
/**
* \brief get the next ptr from the ring buffer
*
@ -163,37 +286,6 @@ int RingBufferMrSw8Put(RingBuffer8 *rb, void *ptr) {
/* Multi Reader, Single Writer */
RingBuffer16 *RingBufferMrSwInit(void) {
RingBuffer16 *rb = SCMalloc(sizeof(RingBuffer16));
if (rb == NULL) {
return NULL;
}
memset(rb, 0x00, sizeof(RingBuffer16));
SC_ATOMIC_INIT(rb->write);
SC_ATOMIC_INIT(rb->read);
#ifdef RINGBUFFER_MUTEX_WAIT
SCMutexInit(&rb->wait_mutex, NULL);
SCCondInit(&rb->wait_cond, NULL);
#endif
return rb;
}
void RingBufferMrSwDestroy(RingBuffer16 *rb) {
if (rb != NULL) {
SC_ATOMIC_DESTROY(rb->write);
SC_ATOMIC_DESTROY(rb->read);
#ifdef RINGBUFFER_MUTEX_WAIT
SCMutexDestroy(&rb->wait_mutex);
SCCondDestroy(&rb->wait_cond);
#endif
SCFree(rb);
}
}
/**
* \brief get the next ptr from the ring buffer
*
@ -284,38 +376,6 @@ int RingBufferMrSwPut(RingBuffer16 *rb, void *ptr) {
/* Single Reader, Single Writer */
RingBuffer16 *RingBufferSrSwInit(void) {
RingBuffer16 *rb = SCMalloc(sizeof(RingBuffer16));
if (rb == NULL) {
return NULL;
}
memset(rb, 0x00, sizeof(RingBuffer16));
SC_ATOMIC_INIT(rb->write);
SC_ATOMIC_INIT(rb->read);
#ifdef RINGBUFFER_MUTEX_WAIT
SCMutexInit(&rb->wait_mutex, NULL);
SCCondInit(&rb->wait_cond, NULL);
#endif
return rb;
}
void RingBufferSrSwDestroy(RingBuffer16 *rb) {
if (rb != NULL) {
SC_ATOMIC_DESTROY(rb->write);
SC_ATOMIC_DESTROY(rb->read);
#ifdef RINGBUFFER_MUTEX_WAIT
SCMutexDestroy(&rb->wait_mutex);
SCCondDestroy(&rb->wait_cond);
#endif
SCFree(rb);
}
}
void *RingBufferSrSwGet(RingBuffer16 *rb) {
void *ptr = NULL;
@ -376,7 +436,7 @@ int RingBufferSrSwPut(RingBuffer16 *rb, void *ptr) {
/* Multi Reader, Multi Writer, 8 bits */
RingBuffer8 *RingBufferMrMw8Init(void) {
RingBuffer8 *RingBuffer8Init(void) {
RingBuffer8 *rb = SCMalloc(sizeof(RingBuffer8));
if (rb == NULL) {
return NULL;
@ -395,7 +455,7 @@ RingBuffer8 *RingBufferMrMw8Init(void) {
return rb;
}
void RingBufferMrMw8Destroy(RingBuffer8 *rb) {
void RingBuffer8Destroy(RingBuffer8 *rb) {
if (rb != NULL) {
SC_ATOMIC_DESTROY(rb->write);
SC_ATOMIC_DESTROY(rb->read);
@ -526,7 +586,7 @@ retry:
/* Multi Reader, Multi Writer, 16 bits */
RingBuffer16 *RingBufferMrMwInit(void) {
RingBuffer16 *RingBufferInit(void) {
RingBuffer16 *rb = SCMalloc(sizeof(RingBuffer16));
if (rb == NULL) {
return NULL;
@ -545,7 +605,7 @@ RingBuffer16 *RingBufferMrMwInit(void) {
return rb;
}
void RingBufferMrMwDestroy(RingBuffer16 *rb) {
void RingBufferDestroy(RingBuffer16 *rb) {
if (rb != NULL) {
SC_ATOMIC_DESTROY(rb->write);
SC_ATOMIC_DESTROY(rb->read);

@ -64,46 +64,44 @@ typedef struct RingBuffer16_ {
void *array[RING_BUFFER_16_SIZE];
} RingBuffer16;
RingBuffer8 *RingBuffer8Init(void);
void RingBuffer8Destroy(RingBuffer8 *);
/** Single Reader, Single Writer ring buffer, fixed at
* 256 items so we can use unsigned char's that just
* wrap around */
void *RingBufferSrSw8Get(RingBuffer8 *);
int RingBufferSrSw8Put(RingBuffer8 *, void *);
/** Multiple Reader, Single Writer ring buffer, fixed at
* 256 items so we can use unsigned char's that just
* wrap around */
void *RingBufferMrSw8Get(RingBuffer8 *);
int RingBufferMrSw8Put(RingBuffer8 *, void *);
RingBuffer8 *RingBufferMrSw8Init(void);
void RingBufferMrSw8Destroy(RingBuffer8 *);
/** Multiple Reader, Single Writer ring buffer, fixed at
* 65536 items so we can use unsigned shorts that just
* wrap around */
void *RingBufferMrSwGet(RingBuffer16 *);
int RingBufferMrSwPut(RingBuffer16 *, void *);
RingBuffer16 *RingBufferMrSwInit(void);
void RingBufferMrSwDestroy(RingBuffer16 *);
/** Single Reader, Single Writer ring buffer, fixed at
* 65536 items so we can use unsigned shorts that just
* wrap around */
void *RingBufferSrSwGet(RingBuffer16 *);
int RingBufferSrSwPut(RingBuffer16 *, void *);
RingBuffer16 *RingBufferSrSwInit(void);
void RingBufferSrSwDestroy(RingBuffer16 *);
/** Multiple Reader, Multi Writer ring buffer, fixed at
* 256 items so we can use unsigned char's that just
* wrap around */
void *RingBufferMrMw8Get(RingBuffer8 *);
int RingBufferMrMw8Put(RingBuffer8 *, void *);
RingBuffer8 *RingBufferMrMw8Init(void);
void RingBufferMrMw8Destroy(RingBuffer8 *);
/** Multiple Reader, Multi Writer ring buffer, fixed at
* 65536 items so we can use unsigned char's that just
* wrap around */
void *RingBufferMrMwGet(RingBuffer16 *);
int RingBufferMrMwPut(RingBuffer16 *, void *);
RingBuffer16 *RingBufferMrMwInit(void);
void RingBufferMrMwDestroy(RingBuffer16 *);
#endif /* __UTIL_RINGBUFFER_H__ */

Loading…
Cancel
Save