lockfree ringbuffer wip2, including proper shutdown.

remotes/origin/master-1.0.x
Victor Julien 16 years ago
parent a48a767efc
commit 6f502f0da5

@ -142,13 +142,8 @@ volatile sig_atomic_t sigterm_count = 0;
/* Max packets processed simultaniously. */
#define DEFAULT_MAX_PENDING_PACKETS 50
#define SURICATA_SIGINT 0x01
#define SURICATA_SIGHUP 0x02
#define SURICATA_SIGTERM 0x04
#define SURICATA_STOP 0x08
#define SURICATA_KILL 0x10
static uint8_t sigflags = 0;
/** suricata engine control flags */
uint8_t suricata_ctl_flags = 0;
/** Run mode selected */
int run_mode = MODE_UNKNOWN;
@ -166,9 +161,20 @@ int RunmodeIsUnittests(void) {
return 0;
}
static void SignalHandlerSigint(/*@unused@*/ int sig) { sigint_count = 1; sigflags |= SURICATA_SIGINT; }
static void SignalHandlerSigterm(/*@unused@*/ int sig) { sigterm_count = 1; sigflags |= SURICATA_SIGTERM; }
static void SignalHandlerSighup(/*@unused@*/ int sig) { sighup_count = 1; sigflags |= SURICATA_SIGHUP; }
static void SignalHandlerSigint(/*@unused@*/ int sig) {
sigint_count = 1;
suricata_ctl_flags |= SURICATA_STOP;
}
static void SignalHandlerSigterm(/*@unused@*/ int sig) {
sigterm_count = 1;
suricata_ctl_flags |= SURICATA_KILL;
}
#if 0
static void SignalHandlerSighup(/*@unused@*/ int sig) {
sighup_count = 1;
suricata_ctl_flags |= SURICATA_SIGHUP;
}
#endif
#ifdef DBG_MEM_ALLOC
#ifndef _GLOBAL_MEM_
@ -225,11 +231,11 @@ void GlobalInits()
function. Purpose: pcap file mode needs to be able to tell the
engine the file eof is reached. */
void EngineStop(void) {
sigflags |= SURICATA_STOP;
suricata_ctl_flags |= SURICATA_STOP;
}
void EngineKill(void) {
sigflags |= SURICATA_KILL;
suricata_ctl_flags |= SURICATA_KILL;
}
static void SetBpfString(int optind, char *argv[]) {
@ -833,7 +839,7 @@ int main(int argc, char **argv)
/* registering signals we use */
SignalHandlerSetup(SIGINT, SignalHandlerSigint);
SignalHandlerSetup(SIGTERM, SignalHandlerSigterm);
SignalHandlerSetup(SIGHUP, SignalHandlerSighup);
//SignalHandlerSetup(SIGHUP, SignalHandlerSighup);
/* Get the suricata user ID to given user ID */
if (do_setuid == TRUE) {
@ -981,18 +987,18 @@ int main(int argc, char **argv)
#endif
while(1) {
if (sigflags) {
if (suricata_ctl_flags != 0) {
SCLogInfo("signal received");
if (sigflags & SURICATA_STOP) {
SCLogInfo("SIGINT or EngineStop received");
if (suricata_ctl_flags & SURICATA_STOP) {
SCLogInfo("EngineStop received");
/* Stop the engine so it quits after processing the pcap file
* but first make sure all packets are processed by all other
* threads. */
char done = 0;
do {
if (sigflags & SURICATA_SIGTERM || sigflags & SURICATA_KILL)
if (suricata_ctl_flags & SURICATA_KILL)
break;
SCMutexLock(&packet_q.mutex_q);
@ -1007,12 +1013,6 @@ int main(int argc, char **argv)
SCLogInfo("all packets processed by threads, stopping engine");
}
if (sigflags & SURICATA_SIGHUP) {
SCLogInfo("SIGHUP received");
}
if (sigflags & SURICATA_SIGTERM) {
SCLogInfo("SIGTERM received");
}
struct timeval end_time;
memset(&end_time, 0, sizeof(end_time));

@ -44,6 +44,11 @@
//SCMutex mutex_pending;
//SCCondT cond_pending;
/* runtime engine control flags */
#define SURICATA_STOP 0x01 /**< gracefully stop the engine: process all
outstanding packets first */
#define SURICATA_KILL 0x02 /**< shut down asap, discarding outstanding
packets. */
/* Run mode */
enum {

@ -1,6 +1,10 @@
#include "suricata-common.h"
#include "suricata.h"
#include "util-ringbuffer.h"
/* suricata engine control flags */
extern uint8_t suricata_ctl_flags;
/* Multi Reader, Single Writer */
RingBufferMrSw *RingBufferMrSwInit(void) {
@ -10,30 +14,42 @@ RingBufferMrSw *RingBufferMrSwInit(void) {
}
memset(rb, 0x00, sizeof(RingBufferMrSw));
return rb;
}
void RingBufferMrSwDestroy(RingBufferMrSw *rb) {
if (rb == NULL) {
if (rb != NULL) {
SCFree(rb);
}
}
/**
* \brief get the next ptr from the ring buffer
*
* Because we allow for multiple readers we take great care in making sure
* that the threads don't interfere with one another.
*
*/
void *RingBufferMrSwGet(RingBufferMrSw *rb) {
void *ptr;
/* counter for data races. If __sync_bool_compare_and_swap (CAS) fails,
* we increase cnt, get a new ptr and try to do CAS again. We init it to
* -1 so it's 0 when first used the do { } while() loop. */
unsigned short readp = -1;
/** local pointer for data races. If __sync_bool_compare_and_swap (CAS)
* fails we increase our local array idx to try the next array member
* until we succeed. Or when the buffer is empty again we jump back
* to the waiting loop. */
unsigned short readp;
/* buffer is empty, wait... */
retry:
while (rb->read == rb->write) {
/* break out if the engine wants to shutdown */
if (suricata_ctl_flags != 0)
return NULL;
usleep(1);
}
/* atomically update rb->read */
readp += rb->read;
readp = rb->read - 1;
do {
/* with multiple readers we can get in the situation that we exitted
* from the wait loop but the rb is empty again once we get here. */
@ -51,16 +67,21 @@ retry:
/**
* \brief put a ptr in the RingBuffer
*/
void RingBufferMrSwPut(RingBufferMrSw *rb, void *ptr) {
int RingBufferMrSwPut(RingBufferMrSw *rb, void *ptr) {
SCLogDebug("ptr %p", ptr);
/* buffer is full, wait... */
while ((rb->write + 1) == rb->read) {
/* break out if the engine wants to shutdown */
if (suricata_ctl_flags != 0)
return -1;
usleep(1);
}
rb->array[rb->write] = ptr;
__sync_fetch_and_add(&rb->write, 1);
return 0;
}
@ -73,12 +94,11 @@ RingBufferSrSw *RingBufferSrSwInit(void) {
}
memset(rb, 0x00, sizeof(RingBufferSrSw));
return rb;
}
void RingBufferSrSwDestroy(RingBufferSrSw *rb) {
if (rb == NULL) {
if (rb != NULL) {
SCFree(rb);
}
}
@ -88,6 +108,11 @@ void *RingBufferSrSwGet(RingBufferSrSw *rb) {
/* buffer is empty, wait... */
while (rb->read == rb->write) {
/* break out if the engine wants to shutdown */
if (suricata_ctl_flags != 0)
return NULL;
usleep(1);
}
ptr = rb->array[rb->read];
@ -96,12 +121,18 @@ void *RingBufferSrSwGet(RingBufferSrSw *rb) {
return ptr;
}
void RingBufferSrSwPut(RingBufferSrSw *rb, void *ptr) {
int RingBufferSrSwPut(RingBufferSrSw *rb, void *ptr) {
/* buffer is full, wait... */
while ((rb->write + 1) == rb->read) {
/* break out if the engine wants to shutdown */
if (suricata_ctl_flags != 0)
return -1;
usleep(1);
}
rb->array[rb->write] = ptr;
__sync_fetch_and_add(&rb->write, 1);
return 0;
}

@ -18,7 +18,7 @@ typedef struct RingBufferMrSw_ {
} RingBufferMrSw;
void *RingBufferMrSwGet(RingBufferMrSw *);
void RingBufferMrSwPut(RingBufferMrSw *, void *);
int RingBufferMrSwPut(RingBufferMrSw *, void *);
RingBufferMrSw *RingBufferMrSwInit(void);
void RingBufferMrSwDestroy(RingBufferMrSw *);
@ -34,7 +34,7 @@ typedef struct RingBufferSrSw_ {
} RingBufferSrSw;
void *RingBufferSrSwGet(RingBufferSrSw *);
void RingBufferSrSwPut(RingBufferSrSw *, void *);
int RingBufferSrSwPut(RingBufferSrSw *, void *);
RingBufferSrSw *RingBufferSrSwInit(void);
void RingBufferSrSwDestroy(RingBufferSrSw *);

Loading…
Cancel
Save