Move packet pool to ringbuffer, update packet pool api and ringbuffer api. Remove memset usage from PACKET_RECYCLE, add proper cleanup macros.

remotes/origin/master-1.0.x
Victor Julien 16 years ago
parent cb2fef8680
commit 6519a86ec7

@ -203,6 +203,27 @@ typedef struct ICMPV4Vars_
uint16_t emb_dport;
} ICMPV4Vars;
#define CLEAR_ICMPV4_PACKET(p) do { \
(p)->icmpv4vars.type = 0; \
(p)->icmpv4vars.code = 0; \
(p)->icmpv4vars.csum = -1; \
(p)->icmpv4vars.id = 0; \
(p)->icmpv4vars.seq = 0; \
(p)->icmpv4vars.mtu = 0; \
(p)->icmpv4vars.error_ptr = 0; \
(p)->icmpv4vars.emb_ipv4h = NULL; \
(p)->icmpv4vars.emb_tcph = NULL; \
(p)->icmpv4vars.emb_udph = NULL; \
(p)->icmpv4vars.emb_icmpv4h = NULL; \
(p)->icmpv4vars.emb_ip4_src.s_addr = 0; \
(p)->icmpv4vars.emb_ip4_dst.s_addr = 0; \
(p)->icmpv4vars.emb_sport = 0; \
(p)->icmpv4vars.emb_ip4_proto = 0; \
(p)->icmpv4vars.emb_sport = 0; \
(p)->icmpv4vars.emb_dport = 0; \
(p)->icmpv4h = NULL; \
} while(0)
#define ICMPV4_HEADER_PKT_OFFSET 8
/** macro for icmpv4 "type" access */

@ -160,6 +160,27 @@ typedef struct ICMPV6Vars_ {
} ICMPV6Vars;
#define CLEAR_ICMPV6_PACKET(p) do { \
(p)->icmpv6vars.type = 0; \
(p)->icmpv6vars.code = 0; \
(p)->icmpv6vars.csum = -1; \
(p)->icmpv6vars.id = 0; \
(p)->icmpv6vars.seq = 0; \
(p)->icmpv6vars.mtu = 0; \
(p)->icmpv6vars.error_ptr = 0; \
(p)->icmpv6vars.emb_ipv6h = NULL; \
(p)->icmpv6vars.emb_tcph = NULL; \
(p)->icmpv6vars.emb_udph = NULL; \
(p)->icmpv6vars.emb_icmpv6h = NULL; \
(p)->icmpv6vars.emb_ip6_src[0] = 0; \
(p)->icmpv6vars.emb_ip6_src[1] = 0; \
(p)->icmpv6vars.emb_ip6_src[2] = 0; \
(p)->icmpv6vars.emb_ip6_src[3] = 0; \
(p)->icmpv6vars.emb_ip6_proto_next = 0; \
(p)->icmpv6vars.emb_sport = 0; \
(p)->icmpv6vars.emb_dport = 0; \
(p)->icmpv6h = NULL; \
} while(0)
void DecodeICMPV6RegisterTests(void);
/** -------- Inline functions --------- */

@ -174,7 +174,6 @@ typedef struct IPV4Hdr_
typedef struct IPV4Cache_
{
uint16_t flags;
uint8_t ver;
uint8_t hl;
uint8_t ip_tos; /* type of service */
@ -194,6 +193,38 @@ typedef struct IPV4Cache_
} IPV4Cache;
#define CLEAR_IPV4_PACKET(p) do { \
(p)->ip4h = NULL; \
(p)->ip4vars.ip_opt_len = 0; \
(p)->ip4vars.ip_opt_cnt = 0; \
(p)->ip4vars.o_rr = NULL; \
(p)->ip4vars.o_qs = NULL; \
(p)->ip4vars.o_ts = NULL; \
(p)->ip4vars.o_sec = NULL; \
(p)->ip4vars.o_lsrr = NULL; \
(p)->ip4vars.o_cipso = NULL; \
(p)->ip4vars.o_sid = NULL; \
(p)->ip4vars.o_ssrr = NULL; \
(p)->ip4vars.o_rtralt = NULL; \
(p)->ip4c.flags = 0; \
(p)->ip4c.ver = 0; \
(p)->ip4c.hl = 0; \
(p)->ip4c.ip_tos = 0; \
(p)->ip4c.ip_len = 0; \
(p)->ip4c.ip_id = 0; \
(p)->ip4c.ip_off = 0; \
(p)->ip4c._ip_off = 0; \
(p)->ip4c.rf = 0; \
(p)->ip4c.df = 0; \
(p)->ip4c.mf = 0; \
(p)->ip4c.ip_ttl = 0; \
(p)->ip4c.ip_proto = 0; \
(p)->ip4c.ip_csum = 0; \
(p)->ip4c.comp_csum = 0; \
(p)->ip4c.ip_src_u32 = 0; \
(p)->ip4c.ip_dst_u32 = 0; \
} while (0)
/* helper structure with parsed ipv4 info */
typedef struct IPV4Vars_
{
@ -213,6 +244,7 @@ typedef struct IPV4Vars_
IPV4Opt *o_rtralt;
} IPV4Vars;
void DecodeIPV4RegisterTests(void);
/** ----- Inline functions ----- */

@ -118,6 +118,19 @@ typedef struct IPV6Vars_
* to loop through the exthdrs all the time */
} IPV6Vars;
#define CLEAR_IPV6_PACKET(p) do { \
(p)->ip6h = NULL; \
(p)->ip6c.flags = 0; \
(p)->ip6c.ver = 0; \
(p)->ip6c.cl = 0; \
(p)->ip6c.flow = 0; \
(p)->ip6c.nh = 0; \
(p)->ip6c.plen = 0; \
(p)->ip6c.hlim = 0; \
(p)->ip6vars.ip_opts_len = 0; \
(p)->ip6vars.l4proto = 0; \
} while (0)
/* Fragment header */
typedef struct IPV6FragHdr_
{

@ -155,6 +155,7 @@ typedef struct TCPCache_ {
(p)->tcpvars.ts = NULL; \
(p)->tcpvars.ws = NULL; \
(p)->tcpvars.mss = NULL; \
(p)->tcpc.comp_csum = -1; \
(p)->tcpc.ts1 = 0; \
(p)->tcpc.ts2 = 0; \
}

@ -54,6 +54,12 @@ typedef struct UDPCache_ {
int32_t comp_csum;
} UDPCache;
#define CLEAR_UDP_PACKET(p) do { \
(p)->udpvars.hlen = 0; \
(p)->udpc.comp_csum = -1; \
(p)->udph = NULL; \
} while (0)
void DecodeUDPV4RegisterTests(void);
/** ------ Inline function ------ */

@ -30,6 +30,7 @@
#include "app-layer-detect-proto.h"
#include "tm-modules.h"
#include "util-error.h"
#include "tmqh-packetpool.h"
void DecodeTunnel(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, uint16_t len, PacketQueue *pq)
{
@ -49,7 +50,7 @@ void DecodeTunnel(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt
}
/**
* \brief Get a packet. We try to get a packet from the packet_q first, but
* \brief Get a packet. We try to get a packet from the packetpool first, but
* if that is empty we alloc a packet that is free'd again after
* processing.
*
@ -58,10 +59,10 @@ void DecodeTunnel(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt
Packet *PacketGetFromQueueOrAlloc(void) {
Packet *p = NULL;
/* try the queue first */
SCMutexLock(&packet_q.mutex_q);
p = PacketDequeue(&packet_q);
SCMutexUnlock(&packet_q.mutex_q);
/* try the pool first */
if (PacketPoolSize() > 0) {
p = PacketPoolGetPacket();
}
if (p == NULL) {
/* non fatal, we're just not processing a packet then */

@ -210,6 +210,13 @@ typedef struct PacketAlerts_ {
PacketAlert alerts[PACKET_ALERT_MAX];
} PacketAlerts;
#define PACKET_DECODER_EVENT_MAX 16
typedef struct PacketDecoderEvents_ {
uint8_t cnt;
uint8_t events[PACKET_DECODER_EVENT_MAX];
} PacketDecoderEvents;
typedef struct PktVar_ {
char *name;
struct PktVar_ *next; /* right now just implement this as a list,
@ -258,7 +265,7 @@ typedef struct Packet_
* has the exact same tuple as the lower levels */
uint8_t recursion_level;
/*Pkt Flags*/
/* Pkt Flags */
uint8_t flags;
/* flow */
uint8_t flowflags;
@ -330,9 +337,6 @@ typedef struct Packet_
uint8_t pkt[IPV6_HEADER_LEN + 65536 + 28];
uint32_t pktlen;
/* decoder events: review how many events we have */
uint8_t events[(DECODE_EVENT_MAX / 8) + 1];
PacketAlerts alerts;
/** packet number in the pcap file, matches wireshark */
@ -353,6 +357,9 @@ typedef struct Packet_
char tunnel_pkt;
char tunnel_verdicted;
/* decoder events */
PacketDecoderEvents events;
/* tunnel/encapsulation handling */
struct Packet_ *root; /* in case of tunnel this is a ptr
* to the 'real' packet, the one we
@ -464,12 +471,57 @@ typedef struct DecodeThreadVars_
* \todo the mutex destroy & init is necessary because of the memset, reconsider
*/
#define PACKET_RECYCLE(p) do { \
(p)->recursion_level = 0; \
(p)->flags = 0; \
(p)->flowflags = 0; \
(p)->flow = NULL; \
(p)->ts.tv_sec = 0; \
(p)->ts.tv_usec = 0; \
(p)->datalink = 0; \
(p)->action = 0; \
if ((p)->pktvar != NULL) { \
PktVarFree((p)->pktvar); \
(p)->pktvar = NULL; \
} \
(p)->ethh = NULL; \
if ((p)->ip4h != NULL) { \
CLEAR_IPV4_PACKET((p)); \
} \
if ((p)->ip6h != NULL) { \
CLEAR_IPV6_PACKET((p)); \
} \
if ((p)->tcph != NULL) { \
CLEAR_TCP_PACKET((p)); \
} \
if ((p)->udph != NULL) { \
CLEAR_UDP_PACKET((p)); \
} \
if ((p)->icmpv4h != NULL) { \
CLEAR_ICMPV4_PACKET((p)); \
} \
if ((p)->icmpv6h != NULL) { \
CLEAR_ICMPV6_PACKET((p)); \
} \
(p)->ppph = NULL; \
(p)->pppoesh = NULL; \
(p)->pppoedh = NULL; \
(p)->greh = NULL; \
(p)->vlanh = NULL; \
(p)->payload = NULL; \
(p)->payload_len = 0; \
(p)->pktlen = 0; \
(p)->alerts.cnt = 0; \
(p)->next = NULL; \
(p)->prev = NULL; \
(p)->rtv_cnt = 0; \
(p)->tpr_cnt = 0; \
SCMutexDestroy(&(p)->mutex_rtv_cnt); \
memset((p), 0x00, sizeof(Packet)); \
SCMutexInit(&(p)->mutex_rtv_cnt, NULL); \
(p)->tunnel_proto = 0; \
(p)->tunnel_pkt = 0; \
(p)->tunnel_verdicted = 0; \
(p)->events.cnt = 0; \
(p)->root = NULL; \
PACKET_RESET_CHECKSUMS((p)); \
} while (0)
@ -565,9 +617,24 @@ void AddressDebugPrint(Address *);
} while (0)
#define DECODER_SET_EVENT(p, e) ((p)->events[(e/8)] |= (1<<(e%8)))
#define DECODER_ISSET_EVENT(p, e) ((p)->events[(e/8)] & (1<<(e%8)))
#define DECODER_SET_EVENT(p, e) do { \
if ((p)->events.cnt < PACKET_DECODER_EVENT_MAX) { \
(p)->events.events[(p)->events.cnt] = e; \
(p)->events.cnt++; \
} \
} while(0)
#define DECODER_ISSET_EVENT(p, e) ({ \
int r = 0; \
uint8_t u; \
for (u = 0; u < (p)->events.cnt; u++) { \
if ((p)->events.events[u] == (e)) { \
r = 1; \
break; \
} \
} \
r; \
})
/* older libcs don't contain a def for IPPROTO_DCCP
* inside of <netinet/in.h>

@ -2078,7 +2078,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx, char *iface) {
TimeModeSetLive();
/* create the threads */
ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcap","packetpool","packetpool","pickup-queue","ringbuffer","1slot");
ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcap","packetpool","packetpool","pickup-queue","ringbuffer_srsw","1slot");
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -2101,7 +2101,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx, char *iface) {
exit(EXIT_FAILURE);
}
ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode1","pickup-queue","ringbuffer","decode-queue1","ringbuffer","1slot");
ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode1","pickup-queue","ringbuffer_srsw","decode-queue1","ringbuffer_srsw","1slot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
@ -2124,7 +2124,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx, char *iface) {
exit(EXIT_FAILURE);
}
ThreadVars *tv_stream1 = TmThreadCreatePacketHandler("Stream1","decode-queue1","ringbuffer","stream-queue1","ringbuffer","1slot");
ThreadVars *tv_stream1 = TmThreadCreatePacketHandler("Stream1","decode-queue1","ringbuffer_srsw","stream-queue1","ringbuffer_mrsw","1slot");
if (tv_stream1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream1\n");
exit(EXIT_FAILURE);
@ -2166,7 +2166,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx, char *iface) {
char *thread_name = SCStrdup(tname);
SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","ringbuffer","verdict-queue","ringbuffer","1slot");
ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","ringbuffer_mrsw","verdict-queue","ringbuffer_srmw","1slot");
if (tv_detect_ncpu == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -2209,7 +2209,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx, char *iface) {
cpu++;
}
ThreadVars *tv_rreject = TmThreadCreatePacketHandler("RespondReject","verdict-queue","ringbuffer","alert-queue","ringbuffer","1slot");
ThreadVars *tv_rreject = TmThreadCreatePacketHandler("RespondReject","verdict-queue","ringbuffer_srmw","alert-queue","ringbuffer_srsw","1slot");
if (tv_rreject == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -2233,7 +2233,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx, char *iface) {
}
ThreadVars *tv_outputs = TmThreadCreatePacketHandler("Outputs",
"alert-queue", "ringbuffer", "packetpool", "packetpool", "varslot");
"alert-queue", "ringbuffer_srsw", "packetpool", "packetpool", "varslot");
SetupOutputs(tv_outputs);
if (threading_set_cpu_affinity) {
@ -2282,6 +2282,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
TimeModeSetOffline();
/* create the threads */
//ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcapFile","packetpool","packetpool","packetpool","packetpool","1slot");
ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcapFile","packetpool","packetpool","pickup-queue","ringbuffer_srsw","1slot");
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
@ -2304,7 +2305,8 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
//#if 0
//ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode & Stream","pickup-queue","ringbuffer_srsw","packetpool","packetpool","varslot");
ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode & Stream","pickup-queue","ringbuffer_srsw","stream-queue1","ringbuffer_mrsw","varslot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
@ -2335,6 +2337,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
exit(EXIT_FAILURE);
}
//#if 0
/* start with cpu 1 so that if we're creating an odd number of detect
* threads we're not creating the most on CPU0. */
if (ncpus > 0)
@ -2411,7 +2414,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
//#endif
return 0;
}

@ -31,6 +31,7 @@
#include "tm-modules.h"
#include "util-privs.h"
#include "tmqh-packetpool.h"
#ifndef HAVE_DAG
@ -347,13 +348,10 @@ ReceiveErfDag(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
* prevent us from alloc'ing packets at line rate
*/
while (packet_q_len == 0) {
SCMutexLock(&packet_q.mutex_q);
packet_q_len = packet_q.len;
if (packet_q.len == 0) {
SCCondWait(&packet_q.cond_q, &packet_q.mutex_q);
packet_q_len = PacketPoolSize();
if (packet_q_len == 0) {
PacketPoolWait();
}
packet_q_len = packet_q.len;
SCMutexUnlock(&packet_q.mutex_q);
}
if (postpq == NULL) {

@ -43,6 +43,7 @@
#include "util-error.h"
#include "util-byte.h"
#include "util-privs.h"
#include "tmqh-packetpool.h"
#ifndef NFQ
/** Handle the case where no NFQ support is compiled in.
@ -504,11 +505,9 @@ TmEcode ReceiveNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe
/* make sure we have at least one packet in the packet pool, to prevent
* us from 1) alloc'ing packets at line rate, 2) have a race condition
* for the nfq mutex lock with the verdict thread. */
SCMutexLock(&packet_q.mutex_q);
if (packet_q.len == 0) {
SCCondWait(&packet_q.cond_q, &packet_q.mutex_q);
while (PacketPoolSize() == 0) {
PacketPoolWait();
}
SCMutexUnlock(&packet_q.mutex_q);
/* do our nfq magic */
NFQRecvPkt(ntv);

@ -43,6 +43,7 @@
#include "conf.h"
#include "util-error.h"
#include "util-privs.h"
#include "tmqh-packetpool.h"
extern uint8_t suricata_ctl_flags;
extern int max_pending_packets;
@ -162,13 +163,10 @@ TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
/* make sure we have at least one packet in the packet pool, to prevent
* us from alloc'ing packets at line rate */
while (packet_q_len == 0) {
SCMutexLock(&packet_q.mutex_q);
packet_q_len = packet_q.len;
if (packet_q.len == 0) {
SCCondWait(&packet_q.cond_q, &packet_q.mutex_q);
packet_q_len = PacketPoolSize();
if (packet_q_len == 0) {
PacketPoolWait();
}
packet_q_len = packet_q.len;
SCMutexUnlock(&packet_q.mutex_q);
}
if (postpq == NULL)

@ -43,6 +43,7 @@
#include "util-debug.h"
#include "util-error.h"
#include "util-privs.h"
#include "tmqh-packetpool.h"
extern uint8_t suricata_ctl_flags;
extern int max_pending_packets;
@ -179,13 +180,10 @@ TmEcode ReceivePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pack
/* make sure we have at least one packet in the packet pool, to prevent
* us from alloc'ing packets at line rate */
while (packet_q_len == 0) {
SCMutexLock(&packet_q.mutex_q);
packet_q_len = packet_q.len;
if (packet_q.len == 0) {
SCCondWait(&packet_q.cond_q, &packet_q.mutex_q);
packet_q_len = PacketPoolSize();
if (packet_q_len == 0) {
PacketPoolWait();
}
packet_q_len = packet_q.len;
SCMutexUnlock(&packet_q.mutex_q);
}
if (postpq == NULL)

@ -134,6 +134,8 @@
#include "output.h"
#include "util-privs.h"
#include "tmqh-packetpool.h"
/*
* we put this here, because we only use it here in main.
*/
@ -224,11 +226,6 @@ void GlobalInits()
SCLogInfo("Trans_Q Mutex not initialized correctly");
exit(EXIT_FAILURE);
}
/* initialize packet queues Here! */
memset(&packet_q,0,sizeof(packet_q));
SCMutexInit(&packet_q.mutex_q, NULL);
SCCondInit(&packet_q.cond_q, NULL);
}
/* XXX hack: make sure threads can stop the engine by calling this
@ -965,7 +962,7 @@ int main(int argc, char **argv)
}
PACKET_INITIALIZE(p);
PacketEnqueue(&packet_q,p);
PacketPoolStorePacket(p);
}
SCLogInfo("preallocated %"PRIiMAX" packets. Total memory %"PRIuMAX"",
max_pending_packets, (uintmax_t)(max_pending_packets*sizeof(Packet)));
@ -1099,10 +1096,10 @@ int main(int argc, char **argv)
if (suricata_ctl_flags & SURICATA_KILL)
break;
SCMutexLock(&packet_q.mutex_q);
if (packet_q.len == max_pending_packets)
/* if all packets are returned to the packetpool
* we are done */
if (PacketPoolSize() == max_pending_packets)
done = 1;
SCMutexUnlock(&packet_q.mutex_q);
if (done == 0) {
usleep(100);

@ -31,19 +31,6 @@
#define PROG_NAME "Suricata"
#define PROG_VER "0.9.2"
/* number of packets in processing right now
* This is the diff between recv'd and verdicted
* pkts
* XXX this should be turned into an api located
* in the packetpool code
*/
//intmax_t pending;
#ifdef DBG_PERF
//uint32_t dbg_maxpending;
#endif /* DBG_PERF */
//SCMutex mutex_pending;
//SCCondT cond_pending;
/* runtime engine control flags */
#define SURICATA_STOP 0x01 /**< gracefully stop the engine: process all
outstanding packets first */
@ -63,10 +50,6 @@ enum {
MODE_DAG,
};
/* preallocated packet structures here
* XXX move to the packetpool queue handler code
*/
PacketQueue packet_q;
/* queue's between various other threads
* XXX move to the TmQueue structure later
*/

@ -20,9 +20,12 @@
*
* \author Victor Julien <victor@inliniac.net>
*
* Packetpool queue handlers. Packet pool is a simple locking FIFO queue.
*
* \todo see if we can replace this by a lockless queue
* Packetpool queue handlers. Packet pool is implemented as a ringbuffer.
* We're using a multi reader / multi writer version of the ringbuffer,
* that is relatively expensive due to the CAS function. But it is necessary
* because every thread can return packets to the pool and multiple parts
* of the code retrieve packets (Decode, Defrag) and these can run in their
* own threads as well.
*/
#include "suricata.h"
@ -40,24 +43,58 @@
#include "tmqh-packetpool.h"
#include "util-ringbuffer.h"
static RingBuffer16 *ringbuffer = NULL;
void TmqhPacketpoolRegister (void) {
tmqh_table[TMQH_PACKETPOOL].name = "packetpool";
tmqh_table[TMQH_PACKETPOOL].InHandler = TmqhInputPacketpool;
tmqh_table[TMQH_PACKETPOOL].OutHandler = TmqhOutputPacketpool;
ringbuffer = RingBufferInit();
}
int PacketPoolIsEmpty(void) {
return RingBufferIsEmpty(ringbuffer);
}
uint16_t PacketPoolSize(void) {
return RingBufferSize(ringbuffer);
}
void PacketPoolWait(void) {
RingBufferWait(ringbuffer);
}
/** \brief a initialized packet
*
* \warning Use *only* at init, not at packet runtime
*/
void PacketPoolStorePacket(Packet *p) {
if (RingBufferIsFull(ringbuffer)) {
exit(1);
}
RingBufferMrMwPut(ringbuffer, (void *)p);
SCLogDebug("buffersize %u", RingBufferSize(ringbuffer));
}
Packet *PacketPoolGetPacket(void) {
if (RingBufferIsEmpty(ringbuffer))
return NULL;
Packet *p = RingBufferMrMwGet(ringbuffer);
return p;
}
Packet *TmqhInputPacketpool(ThreadVars *t)
{
Packet *p = NULL;
SCMutexLock(&packet_q.mutex_q);
while (p == NULL) {
p = PacketDequeue(&packet_q);
if (p == NULL) {
SCCondWait(&packet_q.cond_q, &packet_q.mutex_q);
}
while(p == NULL) {
p = RingBufferMrMwGet(ringbuffer);
}
SCMutexUnlock(&packet_q.mutex_q);
/* packet is clean */
@ -70,7 +107,6 @@ void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
SCLogDebug("Packet %p, p->root %p, alloced %s", p, p->root, p->flags & PKT_ALLOC ? "true" : "false");
PacketQueue *q = &packet_q;
char proot = 0;
if (IS_TUNNEL_PKT(p)) {
@ -144,11 +180,7 @@ void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
p->root = NULL;
} else {
PACKET_RECYCLE(p->root);
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p->root);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
RingBufferMrMwPut(ringbuffer, (void *)p->root);
}
}
@ -158,11 +190,7 @@ void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
SCFree(p);
} else {
PACKET_RECYCLE(p);
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
RingBufferMrMwPut(ringbuffer, (void *)p);
}
SCReturn;

@ -28,5 +28,9 @@ Packet *TmqhInputPacketpool(ThreadVars *);
void TmqhOutputPacketpool(ThreadVars *, Packet *);
void TmqhReleasePacketsToPacketPool(PacketQueue *);
void TmqhPacketpoolRegister (void);
Packet *PacketPoolGetPacket(void);
uint16_t PacketPoolSize(void);
void PacketPoolStorePacket(Packet *);
void PacketPoolWait(void);
#endif /* __TMQH_PACKETPOOL_H__ */

@ -29,6 +29,7 @@
*
* Implemented are:
* Single reader, single writer (lockless)
* Single reader, multi writer (partly locked)
* Multi reader, single writer (lockless)
* Multi reader, multi writer (partly locked)
*/
@ -39,24 +40,33 @@
#define USLEEP_TIME 5
__thread uint32_t sleepytime = 5;
static inline void Ringbuffer8Wait(RingBuffer8 *rb) {
/** \brief wait function for condition where ringbuffer is either
* full or empty.
*
* \param rb ringbuffer
*
* Based on RINGBUFFER_MUTEX_WAIT define, we either sleep and spin
* or use thread condition to wait.
*/
static inline void RingBuffer8DoWait(RingBuffer8 *rb) {
#ifdef RINGBUFFER_MUTEX_WAIT
if (sleepytime <= 25) {
usleep(5);
sleepytime += 5;
} else {
SCMutexLock(&rb->wait_mutex);
SCCondWait(&rb->wait_cond, &rb->wait_mutex);
SCMutexUnlock(&rb->wait_mutex);
}
SCMutexLock(&rb->wait_mutex);
SCCondWait(&rb->wait_cond, &rb->wait_mutex);
SCMutexUnlock(&rb->wait_mutex);
#else
usleep(USLEEP_TIME);
#endif
}
static inline void RingbufferWait(RingBuffer16 *rb) {
/** \brief wait function for condition where ringbuffer is either
* full or empty.
*
* \param rb ringbuffer
*
* Based on RINGBUFFER_MUTEX_WAIT define, we either sleep and spin
* or use thread condition to wait.
*/
static inline void RingBufferDoWait(RingBuffer16 *rb) {
#ifdef RINGBUFFER_MUTEX_WAIT
SCMutexLock(&rb->wait_mutex);
SCCondWait(&rb->wait_cond, &rb->wait_mutex);
@ -66,6 +76,22 @@ static inline void RingbufferWait(RingBuffer16 *rb) {
#endif
}
/** \brief wait function for condition where ringbuffer is either
* full or empty.
*
* \param rb ringbuffer
*
* Based on RINGBUFFER_MUTEX_WAIT define, we either sleep and spin
* or use thread condition to wait.
*/
void RingBufferWait(RingBuffer16 *rb) {
RingBufferDoWait(rb);
}
/** \brief tell the ringbuffer to shut down
*
* \param rb ringbuffer
*/
void RingBuffer8Shutdown(RingBuffer8 *rb) {
rb->shutdown = 1;
#ifdef RINGBUFFER_MUTEX_WAIT
@ -73,6 +99,10 @@ void RingBuffer8Shutdown(RingBuffer8 *rb) {
#endif
}
/** \brief tell the ringbuffer to shut down
*
* \param rb ringbuffer
*/
void RingBufferShutdown(RingBuffer16 *rb) {
rb->shutdown = 1;
#ifdef RINGBUFFER_MUTEX_WAIT
@ -80,18 +110,55 @@ void RingBufferShutdown(RingBuffer16 *rb) {
#endif
}
/** \brief get number of items in the ringbuffer */
uint16_t RingBufferSize(RingBuffer16 *rb) {
SCEnter();
uint16_t size = (uint16_t)(SC_ATOMIC_GET(rb->write) - SC_ATOMIC_GET(rb->read));
SCReturnUInt(size);
}
/** \brief check the ringbuffer is empty (no data in it)
*
* \param rb ringbuffer
*
* \retval 1 empty
* \retval 0 not empty
*/
int RingBufferIsEmpty(RingBuffer16 *rb) {
if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
return 1;
}
return 0;
}
/** \brief check the ringbuffer is full (no more data will fit)
*
* \param rb ringbuffer
*
* \retval 1 empty
* \retval 0 not empty
*/
int RingBufferIsFull(RingBuffer16 *rb) {
if ((unsigned short)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
return 1;
}
return 0;
}
/* Single Reader, Single Writer, 8 bits */
void *RingBufferSrSw8Get(RingBuffer8 *rb) {
void *ptr = NULL;
/* buffer is empty, wait... */
while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */
if (rb->shutdown != 0)
return NULL;
Ringbuffer8Wait(rb);
RingBuffer8DoWait(rb);
}
ptr = rb->array[SC_ATOMIC_GET(rb->read)];
@ -100,7 +167,6 @@ void *RingBufferSrSw8Get(RingBuffer8 *rb) {
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return ptr;
}
@ -111,7 +177,7 @@ int RingBufferSrSw8Put(RingBuffer8 *rb, void *ptr) {
if (rb->shutdown != 0)
return -1;
Ringbuffer8Wait(rb);
RingBuffer8DoWait(rb);
}
rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
@ -120,7 +186,6 @@ int RingBufferSrSw8Put(RingBuffer8 *rb, void *ptr) {
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return 0;
}
@ -130,12 +195,12 @@ void *RingBufferSrMw8Get(RingBuffer8 *rb) {
void *ptr = NULL;
/* buffer is empty, wait... */
while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */
if (rb->shutdown != 0)
return NULL;
Ringbuffer8Wait(rb);
RingBuffer8DoWait(rb);
}
ptr = rb->array[SC_ATOMIC_GET(rb->read)];
@ -144,7 +209,6 @@ void *RingBufferSrMw8Get(RingBuffer8 *rb) {
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return ptr;
}
@ -176,7 +240,7 @@ retry:
if (rb->shutdown != 0)
return -1;
Ringbuffer8Wait(rb);
RingBuffer8DoWait(rb);
}
/* get our lock */
@ -198,7 +262,6 @@ retry:
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return 0;
}
@ -221,12 +284,12 @@ void *RingBufferMrSw8Get(RingBuffer8 *rb) {
/* buffer is empty, wait... */
retry:
while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */
if (rb->shutdown != 0)
return NULL;
Ringbuffer8Wait(rb);
RingBuffer8DoWait(rb);
}
/* atomically update rb->read */
@ -234,7 +297,7 @@ retry:
do {
/* with multiple readers we can get in the situation that we exitted
* from the wait loop but the rb is empty again once we get here. */
if (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write))
if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read))
goto retry;
readp++;
@ -246,7 +309,6 @@ retry:
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return ptr;
}
@ -262,7 +324,7 @@ int RingBufferMrSw8Put(RingBuffer8 *rb, void *ptr) {
if (rb->shutdown != 0)
return -1;
Ringbuffer8Wait(rb);
RingBuffer8DoWait(rb);
}
rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
@ -271,7 +333,6 @@ int RingBufferMrSw8Put(RingBuffer8 *rb, void *ptr) {
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return 0;
}
@ -295,12 +356,12 @@ void *RingBufferMrSwGet(RingBuffer16 *rb) {
/* buffer is empty, wait... */
retry:
while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */
if (rb->shutdown != 0)
return NULL;
RingbufferWait(rb);
RingBufferDoWait(rb);
}
/* atomically update rb->read */
@ -308,7 +369,7 @@ retry:
do {
/* with multiple readers we can get in the situation that we exitted
* from the wait loop but the rb is empty again once we get here. */
if (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write))
if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read))
goto retry;
readp++;
@ -320,7 +381,6 @@ retry:
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return ptr;
}
@ -336,7 +396,7 @@ int RingBufferMrSwPut(RingBuffer16 *rb, void *ptr) {
if (rb->shutdown != 0)
return -1;
RingbufferWait(rb);
RingBufferDoWait(rb);
}
rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
@ -345,7 +405,6 @@ int RingBufferMrSwPut(RingBuffer16 *rb, void *ptr) {
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return 0;
}
@ -356,12 +415,12 @@ void *RingBufferSrSwGet(RingBuffer16 *rb) {
void *ptr = NULL;
/* buffer is empty, wait... */
while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */
if (rb->shutdown != 0)
return NULL;
RingbufferWait(rb);
RingBufferDoWait(rb);
}
ptr = rb->array[SC_ATOMIC_GET(rb->read)];
@ -370,7 +429,6 @@ void *RingBufferSrSwGet(RingBuffer16 *rb) {
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return ptr;
}
@ -381,7 +439,7 @@ int RingBufferSrSwPut(RingBuffer16 *rb, void *ptr) {
if (rb->shutdown != 0)
return -1;
RingbufferWait(rb);
RingBufferDoWait(rb);
}
rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
@ -390,7 +448,6 @@ int RingBufferSrSwPut(RingBuffer16 *rb, void *ptr) {
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return 0;
}
@ -447,12 +504,12 @@ void *RingBufferMrMw8Get(RingBuffer8 *rb) {
/* buffer is empty, wait... */
retry:
while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */
if (rb->shutdown != 0)
return NULL;
Ringbuffer8Wait(rb);
RingBuffer8DoWait(rb);
}
/* atomically update rb->read */
@ -460,7 +517,7 @@ retry:
do {
/* with multiple readers we can get in the situation that we exitted
* from the wait loop but the rb is empty again once we get here. */
if (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write))
if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read))
goto retry;
readp++;
@ -471,7 +528,6 @@ retry:
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return ptr;
}
@ -503,7 +559,7 @@ retry:
if (rb->shutdown != 0)
return -1;
Ringbuffer8Wait(rb);
RingBuffer8DoWait(rb);
}
/* get our lock */
@ -525,7 +581,6 @@ retry:
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return 0;
}
@ -583,12 +638,12 @@ void *RingBufferMrMwGet(RingBuffer16 *rb) {
/* buffer is empty, wait... */
retry:
while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) {
while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
/* break out if the engine wants to shutdown */
if (rb->shutdown != 0)
return NULL;
RingbufferWait(rb);
RingBufferDoWait(rb);
}
/* atomically update rb->read */
@ -596,7 +651,7 @@ retry:
do {
/* with multiple readers we can get in the situation that we exitted
* from the wait loop but the rb is empty again once we get here. */
if (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write))
if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read))
goto retry;
readp++;
@ -608,7 +663,6 @@ retry:
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return ptr;
}
@ -640,7 +694,7 @@ retry:
if (rb->shutdown != 0)
return -1;
RingbufferWait(rb);
RingBufferDoWait(rb);
}
/* get our lock */
@ -662,7 +716,6 @@ retry:
#ifdef RINGBUFFER_MUTEX_WAIT
SCCondSignal(&rb->wait_cond);
#endif
sleepytime = 5;
return 0;
}

@ -66,10 +66,18 @@ typedef struct RingBuffer16_ {
RingBuffer8 *RingBuffer8Init(void);
void RingBuffer8Destroy(RingBuffer8 *);
RingBuffer16 *RingBufferInit(void);
void RingBuffer16Destroy(RingBuffer16 *);
int RingBufferIsEmpty(RingBuffer16 *);
int RingBufferIsFull(RingBuffer16 *);
uint16_t RingBufferSize(RingBuffer16 *);
void RingBuffer8Shutdown(RingBuffer8 *);
void RingBufferShutdown(RingBuffer16 *);
void RingBufferWait(RingBuffer16 *rb);
/** Single Reader, Single Writer ring buffer, fixed at
* 256 items so we can use unsigned char's that just
* wrap around */
@ -106,5 +114,8 @@ int RingBufferMrMw8Put(RingBuffer8 *, void *);
void *RingBufferMrMwGet(RingBuffer16 *);
int RingBufferMrMwPut(RingBuffer16 *, void *);
void *RingBufferSrMw8Get(RingBuffer8 *);
int RingBufferSrMw8Put(RingBuffer8 *, void *);
#endif /* __UTIL_RINGBUFFER_H__ */

@ -104,7 +104,7 @@ threading:
# thread being created. Regardless of the setting at a minimum 1 detect
# thread will always be created.
#
detect_thread_ratio: 1.0
detect_thread_ratio: 1.5
# Select the multi pattern algorithm you want to run for scan/search the
# in the engine. The supported algorithms are b2g, b3g and wumanber.

Loading…
Cancel
Save