From 6eb7eea705a82daa2fb4d368538d88392b1816e9 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Tue, 22 Jun 2010 17:44:49 +0200 Subject: [PATCH] Fix a data race for packet pool packets when defrag/tunnel code needs a packet. --- src/tmqh-packetpool.c | 7 +++++-- src/util-ringbuffer.c | 45 +++++++++++++++++++++++++++++++++++++++++++ src/util-ringbuffer.h | 15 ++++++++++++++- 3 files changed, 64 insertions(+), 3 deletions(-) diff --git a/src/tmqh-packetpool.c b/src/tmqh-packetpool.c index 5bb0423d03..dff81ba38e 100644 --- a/src/tmqh-packetpool.c +++ b/src/tmqh-packetpool.c @@ -80,11 +80,14 @@ void PacketPoolStorePacket(Packet *p) { SCLogDebug("buffersize %u", RingBufferSize(ringbuffer)); } +/** \brief get a packet from the packet pool, but if the + * pool is empty, don't wait, just return NULL + */ Packet *PacketPoolGetPacket(void) { if (RingBufferIsEmpty(ringbuffer)) return NULL; - Packet *p = RingBufferMrMwGet(ringbuffer); + Packet *p = RingBufferMrMwGetNoWait(ringbuffer); return p; } @@ -92,7 +95,7 @@ Packet *TmqhInputPacketpool(ThreadVars *t) { Packet *p = NULL; - while(p == NULL) { + while (p == NULL && ringbuffer->shutdown == FALSE) { p = RingBufferMrMwGet(ringbuffer); } diff --git a/src/util-ringbuffer.c b/src/util-ringbuffer.c index 0008607601..54d4b48cb0 100644 --- a/src/util-ringbuffer.c +++ b/src/util-ringbuffer.c @@ -666,6 +666,51 @@ retry: return ptr; } +/** + * \brief get the next ptr from the ring buffer + * + * Because we allow for multiple readers we take great care in making sure + * that the threads don't interfere with one another. + * + * This version does NOT enter a wait if the buffer is empty loop. + * + * \retval ptr pointer to the data, or NULL if buffer is empty + */ +void *RingBufferMrMwGetNoWait(RingBuffer16 *rb) { + void *ptr; + /** local pointer for data races. If SCAtomicCompareAndSwap (CAS) + * fails we increase our local array idx to try the next array member + * until we succeed. Or when the buffer is empty again we jump back + * to the waiting loop. */ + unsigned short readp; + + /* buffer is empty, wait... */ +retry: + while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) { + /* break if buffer is empty */ + return NULL; + } + + /* atomically update rb->read */ + readp = SC_ATOMIC_GET(rb->read) - 1; + do { + /* with multiple readers we can get in the situation that we exitted + * from the wait loop but the rb is empty again once we get here. */ + if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) + goto retry; + + readp++; + ptr = rb->array[readp]; + } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1)))); + + SCLogDebug("ptr %p", ptr); + +#ifdef RINGBUFFER_MUTEX_WAIT + SCCondSignal(&rb->wait_cond); +#endif + return ptr; +} + /** * \brief put a ptr in the RingBuffer. * diff --git a/src/util-ringbuffer.h b/src/util-ringbuffer.h index 15f6474574..4de93edb27 100644 --- a/src/util-ringbuffer.h +++ b/src/util-ringbuffer.h @@ -29,7 +29,19 @@ #include "threads.h" /** When the ringbuffer is full we have two options, either we spin & sleep - * or we use a pthread condition to wait. */ + * or we use a pthread condition to wait. + * + * \warning this approach isn't working due to a race condition between the + * time it takes for a thread to enter the condwait and the + * signalling. I've obverved the following case: T1 sees that the + * ringbuffer is empty, so it decides to start the wait condition. + * While it is acquiring the lock and entering the wait, T0 puts a + * number of items in the buffer. For each of these it signals T1. + * However, as that thread isn't in the "wait" mode yet, the signals + * are lost. T0 now is done as well and enters it's own wait + * condition. T1 completes it's "wait" initialization. It waits for + * signals, but T0 won't be able to send them as it's waiting itself. + */ //#define RINGBUFFER_MUTEX_WAIT /** \brief ring buffer api @@ -112,6 +124,7 @@ int RingBufferMrMw8Put(RingBuffer8 *, void *); * 65536 items so we can use unsigned char's that just * wrap around */ void *RingBufferMrMwGet(RingBuffer16 *); +void *RingBufferMrMwGetNoWait(RingBuffer16 *); int RingBufferMrMwPut(RingBuffer16 *, void *); void *RingBufferSrMw8Get(RingBuffer8 *);