Add atomic stack implementation. Convert flow spare queue to use this stack. Remove now unused flow-queue code.

remotes/origin/HEAD
Victor Julien 14 years ago
parent 4a186bcf1d
commit 88b8f15663

@ -3,7 +3,7 @@ noinst_HEADERS = action-globals.h \
debug.h \
flow-private.h queue.h source-nfq-prototypes.h \
suricata-common.h threadvars.h util-binsearch.h \
util-atomic.h util-validate.h
util-atomic.h util-validate.h util-stack.h
bin_PROGRAMS = suricata
suricata_SOURCES = suricata.c suricata.h \
runmodes.c runmodes.h \
@ -46,7 +46,6 @@ decode-sctp.c decode-sctp.h \
flow.c flow.h \
flow-timeout.c flow-timeout.h \
flow-manager.c flow-manager.h \
flow-queue.c flow-queue.h \
flow-hash.c flow-hash.h \
flow-util.c flow-util.h \
util-mem.h \
@ -171,6 +170,7 @@ detect-replace.c detect-replace.h \
util-magic.c util-magic.h \
util-misc.c util-misc.h \
util-atomic.h \
util-stack.h \
util-print.c util-print.h \
util-fmemopen.c util-fmemopen.h \
util-cpu.c util-cpu.h \

@ -306,7 +306,7 @@ static Flow *FlowGetNew(Packet *p) {
}
/* get a flow from the spare queue */
f = FlowDequeue(&flow_spare_q);
f = FlowSpareGet();
if (f == NULL) {
/* If we reached the max memcap, we get a used flow */
if ((SC_ATOMIC_GET(flow_memuse) + sizeof(Flow)) > flow_config.memcap) {

@ -34,7 +34,6 @@
#include "util-time.h"
#include "flow.h"
#include "flow-queue.h"
#include "flow-hash.h"
#include "flow-util.h"
#include "flow-var.h"
@ -293,7 +292,7 @@ static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
SCMutexUnlock(&f->m);
/* move to spare list */
FlowMoveToSpare(f);
FlowSpareStore(f);
cnt++;
@ -458,10 +457,7 @@ void *FlowManagerThread(void *td)
long long unsigned int flow_memuse = SC_ATOMIC_GET(flow_memuse);
SCPerfCounterSetUI64(flow_mgr_memuse, th_v->sc_perf_pca, (uint64_t)flow_memuse);
uint32_t len = 0;
FQLOCK_LOCK(&flow_spare_q);
len = flow_spare_q.len;
FQLOCK_UNLOCK(&flow_spare_q);
uint32_t len = FlowSpareSize();
SCPerfCounterSetUI64(flow_mgr_spare, th_v->sc_perf_pca, (uint64_t)len);
/* Don't fear, FlowManagerThread is here...
@ -558,7 +554,7 @@ static int FlowMgrTest01 (void) {
FlowBucket fb;
struct timeval ts;
FlowQueueInit(&flow_spare_q);
FlowSpareInit();
memset(&ssn, 0, sizeof(TcpSession));
memset(&f, 0, sizeof(Flow));
@ -581,14 +577,15 @@ static int FlowMgrTest01 (void) {
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
FlowSpareDestroy();
return 0;
}
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
FlowSpareDestroy();
return 1;
}
@ -608,7 +605,7 @@ static int FlowMgrTest02 (void) {
TcpStream client;
uint8_t payload[3] = {0x41, 0x41, 0x41};
FlowQueueInit(&flow_spare_q);
FlowSpareInit();
memset(&ssn, 0, sizeof(TcpSession));
memset(&f, 0, sizeof(Flow));
@ -639,12 +636,12 @@ static int FlowMgrTest02 (void) {
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
FlowSpareDestroy();
return 0;
}
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
FlowSpareDestroy();
return 1;
}
@ -662,7 +659,7 @@ static int FlowMgrTest03 (void) {
FlowBucket fb;
struct timeval ts;
FlowQueueInit(&flow_spare_q);
FlowSpareInit();
memset(&ssn, 0, sizeof(TcpSession));
memset(&f, 0, sizeof(Flow));
@ -685,13 +682,13 @@ static int FlowMgrTest03 (void) {
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
FlowSpareDestroy();
return 0;
}
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
FlowSpareDestroy();
return 1;
}
@ -712,7 +709,7 @@ static int FlowMgrTest04 (void) {
TcpStream client;
uint8_t payload[3] = {0x41, 0x41, 0x41};
FlowQueueInit(&flow_spare_q);
FlowSpareInit();
memset(&ssn, 0, sizeof(TcpSession));
memset(&f, 0, sizeof(Flow));
@ -744,13 +741,13 @@ static int FlowMgrTest04 (void) {
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
FlowSpareDestroy();
return 0;
}
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
FlowSpareDestroy();
return 1;
}
@ -769,7 +766,7 @@ static int FlowMgrTest05 (void) {
memcpy(&backup, &flow_config, sizeof(FlowConfig));
uint32_t ini = 0;
uint32_t end = flow_spare_q.len;
uint32_t end = FlowSpareSize();
flow_config.memcap = 10000;
flow_config.prealloc = 100;
@ -795,7 +792,7 @@ static int FlowMgrTest05 (void) {
FlowTimeoutCounters counters = { 0, 0, 0, };
FlowTimeoutHash(&ts, 0 /* check all */, &counters);
if (flow_spare_q.len > 0) {
if (FlowSpareSize() > 0) {
result = 1;
}

@ -25,9 +25,8 @@
#define __FLOW_PRIVATE_H__
#include "flow-hash.h"
#include "flow-queue.h"
#include "util-atomic.h"
#include "util-stack.h"
/* global flow flags */
@ -76,10 +75,41 @@ enum {
FlowProto flow_proto[FLOW_PROTO_MAX];
/** spare/unused/prealloced flows live here */
FlowQueue flow_spare_q;
Stack flow_spare_stack;
#define FlowSpareInit(void) do { \
STACK_INIT(&flow_spare_stack); \
SC_ATOMIC_INIT(flow_spare_cnt); \
} while (0)
#define FlowSpareDestroy(void) do { \
STACK_DESTROY(&flow_spare_stack); \
SC_ATOMIC_DESTROY(flow_spare_cnt); \
} while (0)
#define FlowSpareGet(void) ({ \
Flow *f = STACK_POP(&flow_spare_stack, Flow_); \
if (f != NULL) { \
FlowSpareDecr(); \
} \
f; \
})
#define FlowSpareStore(f) ({ \
STACK_PUSH(&flow_spare_stack, (f), Flow_); \
FlowSpareIncr(); \
})
#define FlowSpareSize(void) \
SC_ATOMIC_GET(flow_spare_cnt)
#define FlowSpareIncr(void) \
SC_ATOMIC_ADD(flow_spare_cnt, 1)
#define FlowSpareDecr(void) \
SC_ATOMIC_SUB(flow_spare_cnt, 1)
FlowBucket *flow_hash;
FlowConfig flow_config;
SC_ATOMIC_DECLARE(unsigned int, flow_spare_cnt);
/** flow memuse counter (atomic), for enforcing memcap limit */
SC_ATOMIC_DECLARE(long long unsigned int, flow_memuse);

@ -1,163 +0,0 @@
/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Victor Julien <victor@inliniac.net>
*
* Flow queue handler functions
*/
#include "suricata-common.h"
#include "threads.h"
#include "debug.h"
#include "flow-private.h"
#include "flow-queue.h"
#include "flow-util.h"
#include "util-error.h"
#include "util-debug.h"
#include "util-print.h"
FlowQueue *FlowQueueNew() {
FlowQueue *q = (FlowQueue *)SCMalloc(sizeof(FlowQueue));
if (q == NULL) {
SCLogError(SC_ERR_FATAL, "Fatal error encountered in FlowQueueNew. Exiting...");
exit(EXIT_SUCCESS);
}
q = FlowQueueInit(q);
return q;
}
FlowQueue *FlowQueueInit (FlowQueue *q) {
if (q != NULL) {
memset(q, 0, sizeof(FlowQueue));
FQLOCK_INIT(q);
}
return q;
}
/**
* \brief Destroy a flow queue
*
* \param q the flow queue to destroy
*/
void FlowQueueDestroy (FlowQueue *q) {
FQLOCK_DESTROY(q);
}
/**
* \brief add a flow to a queue
*
* \param q queue
* \param f flow
*/
void FlowEnqueue (FlowQueue *q, Flow *f) {
#ifdef DEBUG
BUG_ON(q == NULL || f == NULL);
#endif
FQLOCK_LOCK(q);
/* more flows in queue */
if (q->top != NULL) {
f->lnext = q->top;
q->top->lprev = f;
q->top = f;
/* only flow */
} else {
q->top = f;
q->bot = f;
}
q->len++;
#ifdef DBG_PERF
if (q->len > q->dbg_maxlen)
q->dbg_maxlen = q->len;
#endif /* DBG_PERF */
FQLOCK_UNLOCK(q);
}
/**
* \brief remove a flow from the queue
*
* \param q queue
*
* \retval f flow or NULL if empty list.
*/
Flow *FlowDequeue (FlowQueue *q) {
FQLOCK_LOCK(q);
Flow *f = q->bot;
if (f == NULL) {
FQLOCK_UNLOCK(q);
return NULL;
}
/* more packets in queue */
if (q->bot->lprev != NULL) {
q->bot = q->bot->lprev;
q->bot->lnext = NULL;
/* just the one we remove, so now empty */
} else {
q->top = NULL;
q->bot = NULL;
}
#ifdef DEBUG
BUG_ON(q->len == 0);
#endif
if (q->len > 0)
q->len--;
f->lnext = NULL;
f->lprev = NULL;
FQLOCK_UNLOCK(q);
return f;
}
/**
* \brief Transfer a flow from a queue to the spare queue
*
* \param f the flow to be transfered
* \param q the source queue, where the flow will be removed. This queue is locked.
*
* \note spare queue needs locking
*/
void FlowMoveToSpare(Flow *f)
{
/* now put it in spare */
FQLOCK_LOCK(&flow_spare_q);
/* add to new queue (append) */
f->lprev = flow_spare_q.bot;
if (f->lprev != NULL)
f->lprev->lnext = f;
f->lnext = NULL;
flow_spare_q.bot = f;
if (flow_spare_q.top == NULL)
flow_spare_q.top = f;
flow_spare_q.len++;
#ifdef DBG_PERF
if (flow_spare_q.len > flow_spare_q.dbg_maxlen)
flow_spare_q.dbg_maxlen = flow_spare_q.len;
#endif /* DBG_PERF */
FQLOCK_UNLOCK(&flow_spare_q);
}

@ -1,85 +0,0 @@
/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Victor Julien <victor@inliniac.net>
*/
#ifndef __FLOW_QUEUE_H__
#define __FLOW_QUEUE_H__
#include "suricata-common.h"
#include "flow.h"
/** Spinlocks or Mutex for the flow queues. */
//#define FQLOCK_SPIN
#define FQLOCK_MUTEX
#ifdef FQLOCK_SPIN
#ifdef FQLOCK_MUTEX
#error Cannot enable both FQLOCK_SPIN and FQLOCK_MUTEX
#endif
#endif
/* Define a queue for storing flows */
typedef struct FlowQueue_
{
Flow *top;
Flow *bot;
uint32_t len;
#ifdef DBG_PERF
uint32_t dbg_maxlen;
#endif /* DBG_PERF */
#ifdef FQLOCK_MUTEX
SCMutex m;
#elif defined FQLOCK_SPIN
SCSpinlock s;
#else
#error Enable FQLOCK_SPIN or FQLOCK_MUTEX
#endif
} FlowQueue;
#ifdef FQLOCK_SPIN
#define FQLOCK_INIT(q) SCSpinInit(&(q)->s, 0)
#define FQLOCK_DESTROY(q) SCSpinDestroy(&(q)->s)
#define FQLOCK_LOCK(q) SCSpinLock(&(q)->s)
#define FQLOCK_TRYLOCK(q) SCSpinTrylock(&(q)->s)
#define FQLOCK_UNLOCK(q) SCSpinUnlock(&(q)->s)
#elif defined FQLOCK_MUTEX
#define FQLOCK_INIT(q) SCMutexInit(&(q)->m, NULL)
#define FQLOCK_DESTROY(q) SCMutexDestroy(&(q)->m)
#define FQLOCK_LOCK(q) SCMutexLock(&(q)->m)
#define FQLOCK_TRYLOCK(q) SCMutexTrylock(&(q)->m)
#define FQLOCK_UNLOCK(q) SCMutexUnlock(&(q)->m)
#else
#error Enable FQLOCK_SPIN or FQLOCK_MUTEX
#endif
/* prototypes */
FlowQueue *FlowQueueNew();
FlowQueue *FlowQueueInit(FlowQueue *);
void FlowQueueDestroy (FlowQueue *);
void FlowEnqueue (FlowQueue *, Flow *);
Flow *FlowDequeue (FlowQueue *);
void FlowMoveToSpare(Flow *);
#endif /* __FLOW_QUEUE_H__ */

@ -33,7 +33,6 @@
#include "util-time.h"
#include "flow.h"
#include "flow-queue.h"
#include "flow-hash.h"
#include "flow-util.h"
#include "flow-var.h"

@ -59,8 +59,7 @@
SCMutexInit(&(f)->de_state_m, NULL); \
(f)->hnext = NULL; \
(f)->hprev = NULL; \
(f)->lnext = NULL; \
(f)->lprev = NULL; \
(f)->stack_next = NULL; \
RESET_COUNTERS((f)); \
} while (0)

@ -35,7 +35,6 @@
#include "util-time.h"
#include "flow.h"
#include "flow-queue.h"
#include "flow-hash.h"
#include "flow-util.h"
#include "flow-var.h"
@ -114,9 +113,7 @@ int FlowUpdateSpareFlows(void)
SCEnter();
uint32_t toalloc = 0, tofree = 0, len;
FQLOCK_LOCK(&flow_spare_q);
len = flow_spare_q.len;
FQLOCK_UNLOCK(&flow_spare_q);
len = FlowSpareSize();
if (len < flow_config.prealloc) {
toalloc = flow_config.prealloc - len;
@ -127,15 +124,14 @@ int FlowUpdateSpareFlows(void)
if (f == NULL)
return 0;
FlowEnqueue(&flow_spare_q,f);
FlowSpareStore(f);
}
} else if (len > flow_config.prealloc) {
tofree = len - flow_config.prealloc;
uint32_t i;
for (i = 0; i < tofree; i++) {
/* FlowDequeue locks the queue */
Flow *f = FlowDequeue(&flow_spare_q);
Flow *f = FlowSpareGet();
if (f == NULL)
return 1;
@ -326,7 +322,7 @@ void FlowInitConfig(char quiet)
SC_ATOMIC_INIT(flow_flags);
SC_ATOMIC_INIT(flow_memuse);
SC_ATOMIC_INIT(flow_prune_idx);
FlowQueueInit(&flow_spare_q);
FlowSpareInit();
unsigned int seed = RandomTimePreseed();
/* set defaults */
@ -423,12 +419,13 @@ void FlowInitConfig(char quiet)
printf("ERROR: FlowAlloc failed: %s\n", strerror(errno));
exit(1);
}
FlowEnqueue(&flow_spare_q,f);
FlowSpareStore(f);
}
if (quiet == FALSE) {
SCLogInfo("preallocated %" PRIu32 " flows of size %" PRIuMAX "",
flow_spare_q.len, (uintmax_t)sizeof(Flow));
FlowSpareSize(), (uintmax_t)sizeof(Flow));
SCLogInfo("flow memory usage: %llu bytes, maximum: %"PRIu64,
SC_ATOMIC_GET(flow_memuse), flow_config.memcap);
}
@ -458,8 +455,7 @@ void FlowShutdown(void)
FlowPrintStats();
/* free spare queue */
while((f = FlowDequeue(&flow_spare_q))) {
while ((f = FlowSpareGet())) {
FlowFree(f);
}
@ -482,7 +478,7 @@ void FlowShutdown(void)
flow_hash = NULL;
}
SC_ATOMIC_SUB(flow_memuse, flow_config.hash_size * sizeof(FlowBucket));
FlowQueueDestroy(&flow_spare_q);
FlowSpareDestroy();
SC_ATOMIC_DESTROY(flow_prune_idx);
SC_ATOMIC_DESTROY(flow_memuse);
@ -939,7 +935,7 @@ static int FlowTest07 (void) {
memcpy(&backup, &flow_config, sizeof(FlowConfig));
uint32_t ini = 0;
uint32_t end = flow_spare_q.len;
uint32_t end = FlowSpareSize();
flow_config.memcap = 10000;
flow_config.prealloc = 100;
@ -986,7 +982,7 @@ static int FlowTest08 (void) {
memcpy(&backup, &flow_config, sizeof(FlowConfig));
uint32_t ini = 0;
uint32_t end = flow_spare_q.len;
uint32_t end = FlowSpareSize();
flow_config.memcap = 10000;
flow_config.prealloc = 100;
@ -1033,7 +1029,7 @@ static int FlowTest09 (void) {
memcpy(&backup, &flow_config, sizeof(FlowConfig));
uint32_t ini = 0;
uint32_t end = flow_spare_q.len;
uint32_t end = FlowSpareSize();
flow_config.memcap = 10000;
flow_config.prealloc = 100;

@ -27,6 +27,7 @@
#include "decode.h"
#include "util-var.h"
#include "util-atomic.h"
#include "util-stack.h"
#include "detect-tag.h"
#define FLOW_QUIET TRUE
@ -298,13 +299,13 @@ typedef struct Flow_
SCMutex de_state_m; /**< mutex lock for the de_state object */
/** hash list pointers, protected by fb->s */
struct FlowBucket_ *fb;
struct Flow_ *hnext; /* hash list */
struct Flow_ *hprev;
struct FlowBucket_ *fb;
/** queue list pointers, protected by queue mutex */
struct Flow_ *lnext; /* list */
struct Flow_ *lprev;
/* stack pointer for the spare stack */
struct Flow_ *stack_next;
struct timeval startts;
#ifdef DEBUG
uint32_t todstpktcnt;

@ -0,0 +1,103 @@
/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Victor Julien <victor@inliniac.net>
*
* Simple low level stack implementation using atomics.
*
* Data types using this stack should have a ptr of their own type called:
* stack_next.
*
* E.g.
* typedef struct MyStruct_ {
* int abc;
* struct MyStruct_ *stack_next;
* } MyStruct;
*
*/
#ifndef __UTIL_STACK_H__
#define __UTIL_STACK_H__
#include "util-atomic.h"
typedef struct Stack_ {
/** stack head ptr */
SC_ATOMIC_DECLARE(void *, head);
} Stack;
/** \brief int the stack
*
* \param stack the stack
*/
#define STACK_INIT(stack) ({ \
SC_ATOMIC_INIT((stack)->head); \
})
/** \brief destroy the stack
*
* \param stack the stack
*/
#define STACK_DESTROY(stack) ({ \
SC_ATOMIC_DESTROY((stack)->head); \
})
/** \brief check if a stack is empty
*
* \param stack the stack
*
* \retval 1 empty
* \retval 0 not empty
*/
#define STACK_EMPTY(stack) \
(SC_ATOMIC_GET((stack)->head) == NULL)
/** \brief pop from the stack
*
* \param stack the stack
* \param type data type
*
* \retval ptr or NULL
*/
#define STACK_POP(stack, type) ({ \
struct type *ptr; \
do { \
ptr = (struct type *)SC_ATOMIC_GET((stack)->head); \
if (ptr == NULL) { \
break; \
} \
} while (!(SC_ATOMIC_CAS(&(stack)->head, ptr, ptr->stack_next))); \
ptr; \
})
/** \brief push to the stack
*
* \param stack the stack
* \param ptr pointer to data to push to the stack
* \param type data type
*/
#define STACK_PUSH(stack, ptr, type) ({ \
do { \
(ptr)->stack_next = (struct type *)SC_ATOMIC_GET((stack)->head); \
} while (!(SC_ATOMIC_CAS(&(stack)->head, (ptr)->stack_next, (ptr)))); \
})
#endif /* __UTIL_STACK_H__ */

@ -960,11 +960,11 @@ int UTHBuildPacketOfFlowsTest01(void) {
int result = 0;
FlowInitConfig(FLOW_QUIET);
uint32_t flow_spare_q_len = flow_spare_q.len;
uint32_t flow_spare_q_len = FlowSpareSize();
UTHBuildPacketOfFlows(0, 100, 0);
if (flow_spare_q.len != flow_spare_q_len - 100)
if (FlowSpareSize() != flow_spare_q_len - 100)
result = 0;
else
result = 1;

Loading…
Cancel
Save