diff --git a/src/decode-icmpv4.h b/src/decode-icmpv4.h index 2bab207dcf..79eafd9f19 100644 --- a/src/decode-icmpv4.h +++ b/src/decode-icmpv4.h @@ -203,6 +203,27 @@ typedef struct ICMPV4Vars_ uint16_t emb_dport; } ICMPV4Vars; +#define CLEAR_ICMPV4_PACKET(p) do { \ + (p)->icmpv4vars.type = 0; \ + (p)->icmpv4vars.code = 0; \ + (p)->icmpv4vars.csum = -1; \ + (p)->icmpv4vars.id = 0; \ + (p)->icmpv4vars.seq = 0; \ + (p)->icmpv4vars.mtu = 0; \ + (p)->icmpv4vars.error_ptr = 0; \ + (p)->icmpv4vars.emb_ipv4h = NULL; \ + (p)->icmpv4vars.emb_tcph = NULL; \ + (p)->icmpv4vars.emb_udph = NULL; \ + (p)->icmpv4vars.emb_icmpv4h = NULL; \ + (p)->icmpv4vars.emb_ip4_src.s_addr = 0; \ + (p)->icmpv4vars.emb_ip4_dst.s_addr = 0; \ + (p)->icmpv4vars.emb_sport = 0; \ + (p)->icmpv4vars.emb_ip4_proto = 0; \ + (p)->icmpv4vars.emb_sport = 0; \ + (p)->icmpv4vars.emb_dport = 0; \ + (p)->icmpv4h = NULL; \ +} while(0) + #define ICMPV4_HEADER_PKT_OFFSET 8 /** macro for icmpv4 "type" access */ diff --git a/src/decode-icmpv6.h b/src/decode-icmpv6.h index b6a0c1b386..86d85ca966 100644 --- a/src/decode-icmpv6.h +++ b/src/decode-icmpv6.h @@ -160,6 +160,27 @@ typedef struct ICMPV6Vars_ { } ICMPV6Vars; +#define CLEAR_ICMPV6_PACKET(p) do { \ + (p)->icmpv6vars.type = 0; \ + (p)->icmpv6vars.code = 0; \ + (p)->icmpv6vars.csum = -1; \ + (p)->icmpv6vars.id = 0; \ + (p)->icmpv6vars.seq = 0; \ + (p)->icmpv6vars.mtu = 0; \ + (p)->icmpv6vars.error_ptr = 0; \ + (p)->icmpv6vars.emb_ipv6h = NULL; \ + (p)->icmpv6vars.emb_tcph = NULL; \ + (p)->icmpv6vars.emb_udph = NULL; \ + (p)->icmpv6vars.emb_icmpv6h = NULL; \ + (p)->icmpv6vars.emb_ip6_src[0] = 0; \ + (p)->icmpv6vars.emb_ip6_src[1] = 0; \ + (p)->icmpv6vars.emb_ip6_src[2] = 0; \ + (p)->icmpv6vars.emb_ip6_src[3] = 0; \ + (p)->icmpv6vars.emb_ip6_proto_next = 0; \ + (p)->icmpv6vars.emb_sport = 0; \ + (p)->icmpv6vars.emb_dport = 0; \ + (p)->icmpv6h = NULL; \ +} while(0) void DecodeICMPV6RegisterTests(void); /** -------- Inline functions --------- */ diff --git a/src/decode-ipv4.h b/src/decode-ipv4.h index 97c1e15e00..869b518ded 100644 --- a/src/decode-ipv4.h +++ b/src/decode-ipv4.h @@ -174,7 +174,6 @@ typedef struct IPV4Hdr_ typedef struct IPV4Cache_ { uint16_t flags; - uint8_t ver; uint8_t hl; uint8_t ip_tos; /* type of service */ @@ -194,6 +193,38 @@ typedef struct IPV4Cache_ } IPV4Cache; +#define CLEAR_IPV4_PACKET(p) do { \ + (p)->ip4h = NULL; \ + (p)->ip4vars.ip_opt_len = 0; \ + (p)->ip4vars.ip_opt_cnt = 0; \ + (p)->ip4vars.o_rr = NULL; \ + (p)->ip4vars.o_qs = NULL; \ + (p)->ip4vars.o_ts = NULL; \ + (p)->ip4vars.o_sec = NULL; \ + (p)->ip4vars.o_lsrr = NULL; \ + (p)->ip4vars.o_cipso = NULL; \ + (p)->ip4vars.o_sid = NULL; \ + (p)->ip4vars.o_ssrr = NULL; \ + (p)->ip4vars.o_rtralt = NULL; \ + (p)->ip4c.flags = 0; \ + (p)->ip4c.ver = 0; \ + (p)->ip4c.hl = 0; \ + (p)->ip4c.ip_tos = 0; \ + (p)->ip4c.ip_len = 0; \ + (p)->ip4c.ip_id = 0; \ + (p)->ip4c.ip_off = 0; \ + (p)->ip4c._ip_off = 0; \ + (p)->ip4c.rf = 0; \ + (p)->ip4c.df = 0; \ + (p)->ip4c.mf = 0; \ + (p)->ip4c.ip_ttl = 0; \ + (p)->ip4c.ip_proto = 0; \ + (p)->ip4c.ip_csum = 0; \ + (p)->ip4c.comp_csum = 0; \ + (p)->ip4c.ip_src_u32 = 0; \ + (p)->ip4c.ip_dst_u32 = 0; \ +} while (0) + /* helper structure with parsed ipv4 info */ typedef struct IPV4Vars_ { @@ -213,6 +244,7 @@ typedef struct IPV4Vars_ IPV4Opt *o_rtralt; } IPV4Vars; + void DecodeIPV4RegisterTests(void); /** ----- Inline functions ----- */ diff --git a/src/decode-ipv6.h b/src/decode-ipv6.h index 2641e963ec..28b6617d1d 100644 --- a/src/decode-ipv6.h +++ b/src/decode-ipv6.h @@ -118,6 +118,19 @@ typedef struct IPV6Vars_ * to loop through the exthdrs all the time */ } IPV6Vars; +#define CLEAR_IPV6_PACKET(p) do { \ + (p)->ip6h = NULL; \ + (p)->ip6c.flags = 0; \ + (p)->ip6c.ver = 0; \ + (p)->ip6c.cl = 0; \ + (p)->ip6c.flow = 0; \ + (p)->ip6c.nh = 0; \ + (p)->ip6c.plen = 0; \ + (p)->ip6c.hlim = 0; \ + (p)->ip6vars.ip_opts_len = 0; \ + (p)->ip6vars.l4proto = 0; \ +} while (0) + /* Fragment header */ typedef struct IPV6FragHdr_ { diff --git a/src/decode-tcp.h b/src/decode-tcp.h index 322320892a..7ffdc16894 100644 --- a/src/decode-tcp.h +++ b/src/decode-tcp.h @@ -155,6 +155,7 @@ typedef struct TCPCache_ { (p)->tcpvars.ts = NULL; \ (p)->tcpvars.ws = NULL; \ (p)->tcpvars.mss = NULL; \ + (p)->tcpc.comp_csum = -1; \ (p)->tcpc.ts1 = 0; \ (p)->tcpc.ts2 = 0; \ } diff --git a/src/decode-udp.h b/src/decode-udp.h index 6f9cb3187b..679c772ae3 100644 --- a/src/decode-udp.h +++ b/src/decode-udp.h @@ -54,6 +54,12 @@ typedef struct UDPCache_ { int32_t comp_csum; } UDPCache; +#define CLEAR_UDP_PACKET(p) do { \ + (p)->udpvars.hlen = 0; \ + (p)->udpc.comp_csum = -1; \ + (p)->udph = NULL; \ +} while (0) + void DecodeUDPV4RegisterTests(void); /** ------ Inline function ------ */ diff --git a/src/decode.c b/src/decode.c index 23ff0e5368..da932cee65 100644 --- a/src/decode.c +++ b/src/decode.c @@ -30,6 +30,7 @@ #include "app-layer-detect-proto.h" #include "tm-modules.h" #include "util-error.h" +#include "tmqh-packetpool.h" void DecodeTunnel(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, uint16_t len, PacketQueue *pq) { @@ -49,7 +50,7 @@ void DecodeTunnel(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt } /** - * \brief Get a packet. We try to get a packet from the packet_q first, but + * \brief Get a packet. We try to get a packet from the packetpool first, but * if that is empty we alloc a packet that is free'd again after * processing. * @@ -58,10 +59,10 @@ void DecodeTunnel(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt Packet *PacketGetFromQueueOrAlloc(void) { Packet *p = NULL; - /* try the queue first */ - SCMutexLock(&packet_q.mutex_q); - p = PacketDequeue(&packet_q); - SCMutexUnlock(&packet_q.mutex_q); + /* try the pool first */ + if (PacketPoolSize() > 0) { + p = PacketPoolGetPacket(); + } if (p == NULL) { /* non fatal, we're just not processing a packet then */ diff --git a/src/decode.h b/src/decode.h index 9b94703def..be22220655 100644 --- a/src/decode.h +++ b/src/decode.h @@ -210,6 +210,13 @@ typedef struct PacketAlerts_ { PacketAlert alerts[PACKET_ALERT_MAX]; } PacketAlerts; +#define PACKET_DECODER_EVENT_MAX 16 + +typedef struct PacketDecoderEvents_ { + uint8_t cnt; + uint8_t events[PACKET_DECODER_EVENT_MAX]; +} PacketDecoderEvents; + typedef struct PktVar_ { char *name; struct PktVar_ *next; /* right now just implement this as a list, @@ -258,7 +265,7 @@ typedef struct Packet_ * has the exact same tuple as the lower levels */ uint8_t recursion_level; - /*Pkt Flags*/ + /* Pkt Flags */ uint8_t flags; /* flow */ uint8_t flowflags; @@ -330,9 +337,6 @@ typedef struct Packet_ uint8_t pkt[IPV6_HEADER_LEN + 65536 + 28]; uint32_t pktlen; - /* decoder events: review how many events we have */ - uint8_t events[(DECODE_EVENT_MAX / 8) + 1]; - PacketAlerts alerts; /** packet number in the pcap file, matches wireshark */ @@ -353,6 +357,9 @@ typedef struct Packet_ char tunnel_pkt; char tunnel_verdicted; + /* decoder events */ + PacketDecoderEvents events; + /* tunnel/encapsulation handling */ struct Packet_ *root; /* in case of tunnel this is a ptr * to the 'real' packet, the one we @@ -464,12 +471,57 @@ typedef struct DecodeThreadVars_ * \todo the mutex destroy & init is necessary because of the memset, reconsider */ #define PACKET_RECYCLE(p) do { \ + (p)->recursion_level = 0; \ + (p)->flags = 0; \ + (p)->flowflags = 0; \ + (p)->flow = NULL; \ + (p)->ts.tv_sec = 0; \ + (p)->ts.tv_usec = 0; \ + (p)->datalink = 0; \ + (p)->action = 0; \ if ((p)->pktvar != NULL) { \ PktVarFree((p)->pktvar); \ + (p)->pktvar = NULL; \ } \ + (p)->ethh = NULL; \ + if ((p)->ip4h != NULL) { \ + CLEAR_IPV4_PACKET((p)); \ + } \ + if ((p)->ip6h != NULL) { \ + CLEAR_IPV6_PACKET((p)); \ + } \ + if ((p)->tcph != NULL) { \ + CLEAR_TCP_PACKET((p)); \ + } \ + if ((p)->udph != NULL) { \ + CLEAR_UDP_PACKET((p)); \ + } \ + if ((p)->icmpv4h != NULL) { \ + CLEAR_ICMPV4_PACKET((p)); \ + } \ + if ((p)->icmpv6h != NULL) { \ + CLEAR_ICMPV6_PACKET((p)); \ + } \ + (p)->ppph = NULL; \ + (p)->pppoesh = NULL; \ + (p)->pppoedh = NULL; \ + (p)->greh = NULL; \ + (p)->vlanh = NULL; \ + (p)->payload = NULL; \ + (p)->payload_len = 0; \ + (p)->pktlen = 0; \ + (p)->alerts.cnt = 0; \ + (p)->next = NULL; \ + (p)->prev = NULL; \ + (p)->rtv_cnt = 0; \ + (p)->tpr_cnt = 0; \ SCMutexDestroy(&(p)->mutex_rtv_cnt); \ - memset((p), 0x00, sizeof(Packet)); \ SCMutexInit(&(p)->mutex_rtv_cnt, NULL); \ + (p)->tunnel_proto = 0; \ + (p)->tunnel_pkt = 0; \ + (p)->tunnel_verdicted = 0; \ + (p)->events.cnt = 0; \ + (p)->root = NULL; \ PACKET_RESET_CHECKSUMS((p)); \ } while (0) @@ -565,9 +617,24 @@ void AddressDebugPrint(Address *); } while (0) -#define DECODER_SET_EVENT(p, e) ((p)->events[(e/8)] |= (1<<(e%8))) -#define DECODER_ISSET_EVENT(p, e) ((p)->events[(e/8)] & (1<<(e%8))) - +#define DECODER_SET_EVENT(p, e) do { \ + if ((p)->events.cnt < PACKET_DECODER_EVENT_MAX) { \ + (p)->events.events[(p)->events.cnt] = e; \ + (p)->events.cnt++; \ + } \ +} while(0) + +#define DECODER_ISSET_EVENT(p, e) ({ \ + int r = 0; \ + uint8_t u; \ + for (u = 0; u < (p)->events.cnt; u++) { \ + if ((p)->events.events[u] == (e)) { \ + r = 1; \ + break; \ + } \ + } \ + r; \ +}) /* older libcs don't contain a def for IPPROTO_DCCP * inside of diff --git a/src/runmodes.c b/src/runmodes.c index 845224a449..900faf7cf7 100644 --- a/src/runmodes.c +++ b/src/runmodes.c @@ -2078,7 +2078,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx, char *iface) { TimeModeSetLive(); /* create the threads */ - ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcap","packetpool","packetpool","pickup-queue","ringbuffer","1slot"); + ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcap","packetpool","packetpool","pickup-queue","ringbuffer_srsw","1slot"); if (tv_receivepcap == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); @@ -2101,7 +2101,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx, char *iface) { exit(EXIT_FAILURE); } - ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode1","pickup-queue","ringbuffer","decode-queue1","ringbuffer","1slot"); + ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode1","pickup-queue","ringbuffer_srsw","decode-queue1","ringbuffer_srsw","1slot"); if (tv_decode1 == NULL) { printf("ERROR: TmThreadsCreate failed for Decode1\n"); exit(EXIT_FAILURE); @@ -2124,7 +2124,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx, char *iface) { exit(EXIT_FAILURE); } - ThreadVars *tv_stream1 = TmThreadCreatePacketHandler("Stream1","decode-queue1","ringbuffer","stream-queue1","ringbuffer","1slot"); + ThreadVars *tv_stream1 = TmThreadCreatePacketHandler("Stream1","decode-queue1","ringbuffer_srsw","stream-queue1","ringbuffer_mrsw","1slot"); if (tv_stream1 == NULL) { printf("ERROR: TmThreadsCreate failed for Stream1\n"); exit(EXIT_FAILURE); @@ -2166,7 +2166,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx, char *iface) { char *thread_name = SCStrdup(tname); SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu); - ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","ringbuffer","verdict-queue","ringbuffer","1slot"); + ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","ringbuffer_mrsw","verdict-queue","ringbuffer_srmw","1slot"); if (tv_detect_ncpu == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); @@ -2209,7 +2209,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx, char *iface) { cpu++; } - ThreadVars *tv_rreject = TmThreadCreatePacketHandler("RespondReject","verdict-queue","ringbuffer","alert-queue","ringbuffer","1slot"); + ThreadVars *tv_rreject = TmThreadCreatePacketHandler("RespondReject","verdict-queue","ringbuffer_srmw","alert-queue","ringbuffer_srsw","1slot"); if (tv_rreject == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); @@ -2233,7 +2233,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx, char *iface) { } ThreadVars *tv_outputs = TmThreadCreatePacketHandler("Outputs", - "alert-queue", "ringbuffer", "packetpool", "packetpool", "varslot"); + "alert-queue", "ringbuffer_srsw", "packetpool", "packetpool", "varslot"); SetupOutputs(tv_outputs); if (threading_set_cpu_affinity) { @@ -2282,6 +2282,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) { TimeModeSetOffline(); /* create the threads */ + //ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcapFile","packetpool","packetpool","packetpool","packetpool","1slot"); ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcapFile","packetpool","packetpool","pickup-queue","ringbuffer_srsw","1slot"); if (tv_receivepcap == NULL) { printf("ERROR: TmThreadsCreate failed\n"); @@ -2304,7 +2305,8 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) { printf("ERROR: TmThreadSpawn failed\n"); exit(EXIT_FAILURE); } - +//#if 0 + //ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode & Stream","pickup-queue","ringbuffer_srsw","packetpool","packetpool","varslot"); ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode & Stream","pickup-queue","ringbuffer_srsw","stream-queue1","ringbuffer_mrsw","varslot"); if (tv_decode1 == NULL) { printf("ERROR: TmThreadsCreate failed for Decode1\n"); @@ -2335,6 +2337,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) { exit(EXIT_FAILURE); } +//#if 0 /* start with cpu 1 so that if we're creating an odd number of detect * threads we're not creating the most on CPU0. */ if (ncpus > 0) @@ -2411,7 +2414,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) { printf("ERROR: TmThreadSpawn failed\n"); exit(EXIT_FAILURE); } - +//#endif return 0; } diff --git a/src/source-erf-dag.c b/src/source-erf-dag.c index 70c7377ce8..4c39b1d888 100644 --- a/src/source-erf-dag.c +++ b/src/source-erf-dag.c @@ -31,6 +31,7 @@ #include "tm-modules.h" #include "util-privs.h" +#include "tmqh-packetpool.h" #ifndef HAVE_DAG @@ -347,13 +348,10 @@ ReceiveErfDag(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, * prevent us from alloc'ing packets at line rate */ while (packet_q_len == 0) { - SCMutexLock(&packet_q.mutex_q); - packet_q_len = packet_q.len; - if (packet_q.len == 0) { - SCCondWait(&packet_q.cond_q, &packet_q.mutex_q); + packet_q_len = PacketPoolSize(); + if (packet_q_len == 0) { + PacketPoolWait(); } - packet_q_len = packet_q.len; - SCMutexUnlock(&packet_q.mutex_q); } if (postpq == NULL) { diff --git a/src/source-nfq.c b/src/source-nfq.c index ff80f4c80a..4e90bb8468 100644 --- a/src/source-nfq.c +++ b/src/source-nfq.c @@ -43,6 +43,7 @@ #include "util-error.h" #include "util-byte.h" #include "util-privs.h" +#include "tmqh-packetpool.h" #ifndef NFQ /** Handle the case where no NFQ support is compiled in. @@ -504,11 +505,9 @@ TmEcode ReceiveNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe /* make sure we have at least one packet in the packet pool, to prevent * us from 1) alloc'ing packets at line rate, 2) have a race condition * for the nfq mutex lock with the verdict thread. */ - SCMutexLock(&packet_q.mutex_q); - if (packet_q.len == 0) { - SCCondWait(&packet_q.cond_q, &packet_q.mutex_q); + while (PacketPoolSize() == 0) { + PacketPoolWait(); } - SCMutexUnlock(&packet_q.mutex_q); /* do our nfq magic */ NFQRecvPkt(ntv); diff --git a/src/source-pcap-file.c b/src/source-pcap-file.c index 13a00aba03..481d3c9ba0 100644 --- a/src/source-pcap-file.c +++ b/src/source-pcap-file.c @@ -43,6 +43,7 @@ #include "conf.h" #include "util-error.h" #include "util-privs.h" +#include "tmqh-packetpool.h" extern uint8_t suricata_ctl_flags; extern int max_pending_packets; @@ -162,13 +163,10 @@ TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, /* make sure we have at least one packet in the packet pool, to prevent * us from alloc'ing packets at line rate */ while (packet_q_len == 0) { - SCMutexLock(&packet_q.mutex_q); - packet_q_len = packet_q.len; - if (packet_q.len == 0) { - SCCondWait(&packet_q.cond_q, &packet_q.mutex_q); + packet_q_len = PacketPoolSize(); + if (packet_q_len == 0) { + PacketPoolWait(); } - packet_q_len = packet_q.len; - SCMutexUnlock(&packet_q.mutex_q); } if (postpq == NULL) diff --git a/src/source-pcap.c b/src/source-pcap.c index 24a8d3fddc..bbf92d80c6 100644 --- a/src/source-pcap.c +++ b/src/source-pcap.c @@ -43,6 +43,7 @@ #include "util-debug.h" #include "util-error.h" #include "util-privs.h" +#include "tmqh-packetpool.h" extern uint8_t suricata_ctl_flags; extern int max_pending_packets; @@ -179,13 +180,10 @@ TmEcode ReceivePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pack /* make sure we have at least one packet in the packet pool, to prevent * us from alloc'ing packets at line rate */ while (packet_q_len == 0) { - SCMutexLock(&packet_q.mutex_q); - packet_q_len = packet_q.len; - if (packet_q.len == 0) { - SCCondWait(&packet_q.cond_q, &packet_q.mutex_q); + packet_q_len = PacketPoolSize(); + if (packet_q_len == 0) { + PacketPoolWait(); } - packet_q_len = packet_q.len; - SCMutexUnlock(&packet_q.mutex_q); } if (postpq == NULL) diff --git a/src/suricata.c b/src/suricata.c index c0678338b4..f0dfd17e02 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -134,6 +134,8 @@ #include "output.h" #include "util-privs.h" +#include "tmqh-packetpool.h" + /* * we put this here, because we only use it here in main. */ @@ -224,11 +226,6 @@ void GlobalInits() SCLogInfo("Trans_Q Mutex not initialized correctly"); exit(EXIT_FAILURE); } - - /* initialize packet queues Here! */ - memset(&packet_q,0,sizeof(packet_q)); - SCMutexInit(&packet_q.mutex_q, NULL); - SCCondInit(&packet_q.cond_q, NULL); } /* XXX hack: make sure threads can stop the engine by calling this @@ -965,7 +962,7 @@ int main(int argc, char **argv) } PACKET_INITIALIZE(p); - PacketEnqueue(&packet_q,p); + PacketPoolStorePacket(p); } SCLogInfo("preallocated %"PRIiMAX" packets. Total memory %"PRIuMAX"", max_pending_packets, (uintmax_t)(max_pending_packets*sizeof(Packet))); @@ -1099,10 +1096,10 @@ int main(int argc, char **argv) if (suricata_ctl_flags & SURICATA_KILL) break; - SCMutexLock(&packet_q.mutex_q); - if (packet_q.len == max_pending_packets) + /* if all packets are returned to the packetpool + * we are done */ + if (PacketPoolSize() == max_pending_packets) done = 1; - SCMutexUnlock(&packet_q.mutex_q); if (done == 0) { usleep(100); diff --git a/src/suricata.h b/src/suricata.h index 0774636cee..a4ba160bd9 100644 --- a/src/suricata.h +++ b/src/suricata.h @@ -31,19 +31,6 @@ #define PROG_NAME "Suricata" #define PROG_VER "0.9.2" -/* number of packets in processing right now - * This is the diff between recv'd and verdicted - * pkts - * XXX this should be turned into an api located - * in the packetpool code - */ -//intmax_t pending; -#ifdef DBG_PERF -//uint32_t dbg_maxpending; -#endif /* DBG_PERF */ -//SCMutex mutex_pending; -//SCCondT cond_pending; - /* runtime engine control flags */ #define SURICATA_STOP 0x01 /**< gracefully stop the engine: process all outstanding packets first */ @@ -63,10 +50,6 @@ enum { MODE_DAG, }; -/* preallocated packet structures here - * XXX move to the packetpool queue handler code - */ -PacketQueue packet_q; /* queue's between various other threads * XXX move to the TmQueue structure later */ diff --git a/src/tmqh-packetpool.c b/src/tmqh-packetpool.c index eaa6c4ac07..5bb0423d03 100644 --- a/src/tmqh-packetpool.c +++ b/src/tmqh-packetpool.c @@ -20,9 +20,12 @@ * * \author Victor Julien * - * Packetpool queue handlers. Packet pool is a simple locking FIFO queue. - * - * \todo see if we can replace this by a lockless queue + * Packetpool queue handlers. Packet pool is implemented as a ringbuffer. + * We're using a multi reader / multi writer version of the ringbuffer, + * that is relatively expensive due to the CAS function. But it is necessary + * because every thread can return packets to the pool and multiple parts + * of the code retrieve packets (Decode, Defrag) and these can run in their + * own threads as well. */ #include "suricata.h" @@ -40,24 +43,58 @@ #include "tmqh-packetpool.h" +#include "util-ringbuffer.h" + +static RingBuffer16 *ringbuffer = NULL; + void TmqhPacketpoolRegister (void) { tmqh_table[TMQH_PACKETPOOL].name = "packetpool"; tmqh_table[TMQH_PACKETPOOL].InHandler = TmqhInputPacketpool; tmqh_table[TMQH_PACKETPOOL].OutHandler = TmqhOutputPacketpool; + + ringbuffer = RingBufferInit(); +} + +int PacketPoolIsEmpty(void) { + return RingBufferIsEmpty(ringbuffer); +} + +uint16_t PacketPoolSize(void) { + return RingBufferSize(ringbuffer); +} + +void PacketPoolWait(void) { + RingBufferWait(ringbuffer); +} + +/** \brief a initialized packet + * + * \warning Use *only* at init, not at packet runtime + */ +void PacketPoolStorePacket(Packet *p) { + if (RingBufferIsFull(ringbuffer)) { + exit(1); + } + + RingBufferMrMwPut(ringbuffer, (void *)p); + SCLogDebug("buffersize %u", RingBufferSize(ringbuffer)); +} + +Packet *PacketPoolGetPacket(void) { + if (RingBufferIsEmpty(ringbuffer)) + return NULL; + + Packet *p = RingBufferMrMwGet(ringbuffer); + return p; } Packet *TmqhInputPacketpool(ThreadVars *t) { Packet *p = NULL; - SCMutexLock(&packet_q.mutex_q); - while (p == NULL) { - p = PacketDequeue(&packet_q); - if (p == NULL) { - SCCondWait(&packet_q.cond_q, &packet_q.mutex_q); - } + while(p == NULL) { + p = RingBufferMrMwGet(ringbuffer); } - SCMutexUnlock(&packet_q.mutex_q); /* packet is clean */ @@ -70,7 +107,6 @@ void TmqhOutputPacketpool(ThreadVars *t, Packet *p) SCLogDebug("Packet %p, p->root %p, alloced %s", p, p->root, p->flags & PKT_ALLOC ? "true" : "false"); - PacketQueue *q = &packet_q; char proot = 0; if (IS_TUNNEL_PKT(p)) { @@ -144,11 +180,7 @@ void TmqhOutputPacketpool(ThreadVars *t, Packet *p) p->root = NULL; } else { PACKET_RECYCLE(p->root); - - SCMutexLock(&q->mutex_q); - PacketEnqueue(q, p->root); - SCCondSignal(&q->cond_q); - SCMutexUnlock(&q->mutex_q); + RingBufferMrMwPut(ringbuffer, (void *)p->root); } } @@ -158,11 +190,7 @@ void TmqhOutputPacketpool(ThreadVars *t, Packet *p) SCFree(p); } else { PACKET_RECYCLE(p); - - SCMutexLock(&q->mutex_q); - PacketEnqueue(q, p); - SCCondSignal(&q->cond_q); - SCMutexUnlock(&q->mutex_q); + RingBufferMrMwPut(ringbuffer, (void *)p); } SCReturn; diff --git a/src/tmqh-packetpool.h b/src/tmqh-packetpool.h index 0777b2479b..952bdb7ddb 100644 --- a/src/tmqh-packetpool.h +++ b/src/tmqh-packetpool.h @@ -28,5 +28,9 @@ Packet *TmqhInputPacketpool(ThreadVars *); void TmqhOutputPacketpool(ThreadVars *, Packet *); void TmqhReleasePacketsToPacketPool(PacketQueue *); void TmqhPacketpoolRegister (void); +Packet *PacketPoolGetPacket(void); +uint16_t PacketPoolSize(void); +void PacketPoolStorePacket(Packet *); +void PacketPoolWait(void); #endif /* __TMQH_PACKETPOOL_H__ */ diff --git a/src/util-ringbuffer.c b/src/util-ringbuffer.c index 3dbfdd76ef..0008607601 100644 --- a/src/util-ringbuffer.c +++ b/src/util-ringbuffer.c @@ -29,6 +29,7 @@ * * Implemented are: * Single reader, single writer (lockless) + * Single reader, multi writer (partly locked) * Multi reader, single writer (lockless) * Multi reader, multi writer (partly locked) */ @@ -39,24 +40,33 @@ #define USLEEP_TIME 5 -__thread uint32_t sleepytime = 5; - -static inline void Ringbuffer8Wait(RingBuffer8 *rb) { +/** \brief wait function for condition where ringbuffer is either + * full or empty. + * + * \param rb ringbuffer + * + * Based on RINGBUFFER_MUTEX_WAIT define, we either sleep and spin + * or use thread condition to wait. + */ +static inline void RingBuffer8DoWait(RingBuffer8 *rb) { #ifdef RINGBUFFER_MUTEX_WAIT - if (sleepytime <= 25) { - usleep(5); - sleepytime += 5; - } else { - SCMutexLock(&rb->wait_mutex); - SCCondWait(&rb->wait_cond, &rb->wait_mutex); - SCMutexUnlock(&rb->wait_mutex); - } + SCMutexLock(&rb->wait_mutex); + SCCondWait(&rb->wait_cond, &rb->wait_mutex); + SCMutexUnlock(&rb->wait_mutex); #else usleep(USLEEP_TIME); #endif } -static inline void RingbufferWait(RingBuffer16 *rb) { +/** \brief wait function for condition where ringbuffer is either + * full or empty. + * + * \param rb ringbuffer + * + * Based on RINGBUFFER_MUTEX_WAIT define, we either sleep and spin + * or use thread condition to wait. + */ +static inline void RingBufferDoWait(RingBuffer16 *rb) { #ifdef RINGBUFFER_MUTEX_WAIT SCMutexLock(&rb->wait_mutex); SCCondWait(&rb->wait_cond, &rb->wait_mutex); @@ -66,6 +76,22 @@ static inline void RingbufferWait(RingBuffer16 *rb) { #endif } +/** \brief wait function for condition where ringbuffer is either + * full or empty. + * + * \param rb ringbuffer + * + * Based on RINGBUFFER_MUTEX_WAIT define, we either sleep and spin + * or use thread condition to wait. + */ +void RingBufferWait(RingBuffer16 *rb) { + RingBufferDoWait(rb); +} + +/** \brief tell the ringbuffer to shut down + * + * \param rb ringbuffer + */ void RingBuffer8Shutdown(RingBuffer8 *rb) { rb->shutdown = 1; #ifdef RINGBUFFER_MUTEX_WAIT @@ -73,6 +99,10 @@ void RingBuffer8Shutdown(RingBuffer8 *rb) { #endif } +/** \brief tell the ringbuffer to shut down + * + * \param rb ringbuffer + */ void RingBufferShutdown(RingBuffer16 *rb) { rb->shutdown = 1; #ifdef RINGBUFFER_MUTEX_WAIT @@ -80,18 +110,55 @@ void RingBufferShutdown(RingBuffer16 *rb) { #endif } +/** \brief get number of items in the ringbuffer */ +uint16_t RingBufferSize(RingBuffer16 *rb) { + SCEnter(); + uint16_t size = (uint16_t)(SC_ATOMIC_GET(rb->write) - SC_ATOMIC_GET(rb->read)); + SCReturnUInt(size); +} + +/** \brief check the ringbuffer is empty (no data in it) + * + * \param rb ringbuffer + * + * \retval 1 empty + * \retval 0 not empty + */ +int RingBufferIsEmpty(RingBuffer16 *rb) { + if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) { + return 1; + } + + return 0; +} + +/** \brief check the ringbuffer is full (no more data will fit) + * + * \param rb ringbuffer + * + * \retval 1 empty + * \retval 0 not empty + */ +int RingBufferIsFull(RingBuffer16 *rb) { + if ((unsigned short)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) { + return 1; + } + + return 0; +} + /* Single Reader, Single Writer, 8 bits */ void *RingBufferSrSw8Get(RingBuffer8 *rb) { void *ptr = NULL; /* buffer is empty, wait... */ - while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) { + while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) { /* break out if the engine wants to shutdown */ if (rb->shutdown != 0) return NULL; - Ringbuffer8Wait(rb); + RingBuffer8DoWait(rb); } ptr = rb->array[SC_ATOMIC_GET(rb->read)]; @@ -100,7 +167,6 @@ void *RingBufferSrSw8Get(RingBuffer8 *rb) { #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return ptr; } @@ -111,7 +177,7 @@ int RingBufferSrSw8Put(RingBuffer8 *rb, void *ptr) { if (rb->shutdown != 0) return -1; - Ringbuffer8Wait(rb); + RingBuffer8DoWait(rb); } rb->array[SC_ATOMIC_GET(rb->write)] = ptr; @@ -120,7 +186,6 @@ int RingBufferSrSw8Put(RingBuffer8 *rb, void *ptr) { #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return 0; } @@ -130,12 +195,12 @@ void *RingBufferSrMw8Get(RingBuffer8 *rb) { void *ptr = NULL; /* buffer is empty, wait... */ - while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) { + while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) { /* break out if the engine wants to shutdown */ if (rb->shutdown != 0) return NULL; - Ringbuffer8Wait(rb); + RingBuffer8DoWait(rb); } ptr = rb->array[SC_ATOMIC_GET(rb->read)]; @@ -144,7 +209,6 @@ void *RingBufferSrMw8Get(RingBuffer8 *rb) { #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return ptr; } @@ -176,7 +240,7 @@ retry: if (rb->shutdown != 0) return -1; - Ringbuffer8Wait(rb); + RingBuffer8DoWait(rb); } /* get our lock */ @@ -198,7 +262,6 @@ retry: #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return 0; } @@ -221,12 +284,12 @@ void *RingBufferMrSw8Get(RingBuffer8 *rb) { /* buffer is empty, wait... */ retry: - while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) { + while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) { /* break out if the engine wants to shutdown */ if (rb->shutdown != 0) return NULL; - Ringbuffer8Wait(rb); + RingBuffer8DoWait(rb); } /* atomically update rb->read */ @@ -234,7 +297,7 @@ retry: do { /* with multiple readers we can get in the situation that we exitted * from the wait loop but the rb is empty again once we get here. */ - if (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) + if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) goto retry; readp++; @@ -246,7 +309,6 @@ retry: #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return ptr; } @@ -262,7 +324,7 @@ int RingBufferMrSw8Put(RingBuffer8 *rb, void *ptr) { if (rb->shutdown != 0) return -1; - Ringbuffer8Wait(rb); + RingBuffer8DoWait(rb); } rb->array[SC_ATOMIC_GET(rb->write)] = ptr; @@ -271,7 +333,6 @@ int RingBufferMrSw8Put(RingBuffer8 *rb, void *ptr) { #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return 0; } @@ -295,12 +356,12 @@ void *RingBufferMrSwGet(RingBuffer16 *rb) { /* buffer is empty, wait... */ retry: - while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) { + while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) { /* break out if the engine wants to shutdown */ if (rb->shutdown != 0) return NULL; - RingbufferWait(rb); + RingBufferDoWait(rb); } /* atomically update rb->read */ @@ -308,7 +369,7 @@ retry: do { /* with multiple readers we can get in the situation that we exitted * from the wait loop but the rb is empty again once we get here. */ - if (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) + if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) goto retry; readp++; @@ -320,7 +381,6 @@ retry: #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return ptr; } @@ -336,7 +396,7 @@ int RingBufferMrSwPut(RingBuffer16 *rb, void *ptr) { if (rb->shutdown != 0) return -1; - RingbufferWait(rb); + RingBufferDoWait(rb); } rb->array[SC_ATOMIC_GET(rb->write)] = ptr; @@ -345,7 +405,6 @@ int RingBufferMrSwPut(RingBuffer16 *rb, void *ptr) { #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return 0; } @@ -356,12 +415,12 @@ void *RingBufferSrSwGet(RingBuffer16 *rb) { void *ptr = NULL; /* buffer is empty, wait... */ - while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) { + while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) { /* break out if the engine wants to shutdown */ if (rb->shutdown != 0) return NULL; - RingbufferWait(rb); + RingBufferDoWait(rb); } ptr = rb->array[SC_ATOMIC_GET(rb->read)]; @@ -370,7 +429,6 @@ void *RingBufferSrSwGet(RingBuffer16 *rb) { #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return ptr; } @@ -381,7 +439,7 @@ int RingBufferSrSwPut(RingBuffer16 *rb, void *ptr) { if (rb->shutdown != 0) return -1; - RingbufferWait(rb); + RingBufferDoWait(rb); } rb->array[SC_ATOMIC_GET(rb->write)] = ptr; @@ -390,7 +448,6 @@ int RingBufferSrSwPut(RingBuffer16 *rb, void *ptr) { #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return 0; } @@ -447,12 +504,12 @@ void *RingBufferMrMw8Get(RingBuffer8 *rb) { /* buffer is empty, wait... */ retry: - while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) { + while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) { /* break out if the engine wants to shutdown */ if (rb->shutdown != 0) return NULL; - Ringbuffer8Wait(rb); + RingBuffer8DoWait(rb); } /* atomically update rb->read */ @@ -460,7 +517,7 @@ retry: do { /* with multiple readers we can get in the situation that we exitted * from the wait loop but the rb is empty again once we get here. */ - if (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) + if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) goto retry; readp++; @@ -471,7 +528,6 @@ retry: #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return ptr; } @@ -503,7 +559,7 @@ retry: if (rb->shutdown != 0) return -1; - Ringbuffer8Wait(rb); + RingBuffer8DoWait(rb); } /* get our lock */ @@ -525,7 +581,6 @@ retry: #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return 0; } @@ -583,12 +638,12 @@ void *RingBufferMrMwGet(RingBuffer16 *rb) { /* buffer is empty, wait... */ retry: - while (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) { + while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) { /* break out if the engine wants to shutdown */ if (rb->shutdown != 0) return NULL; - RingbufferWait(rb); + RingBufferDoWait(rb); } /* atomically update rb->read */ @@ -596,7 +651,7 @@ retry: do { /* with multiple readers we can get in the situation that we exitted * from the wait loop but the rb is empty again once we get here. */ - if (SC_ATOMIC_GET(rb->read) == SC_ATOMIC_GET(rb->write)) + if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) goto retry; readp++; @@ -608,7 +663,6 @@ retry: #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return ptr; } @@ -640,7 +694,7 @@ retry: if (rb->shutdown != 0) return -1; - RingbufferWait(rb); + RingBufferDoWait(rb); } /* get our lock */ @@ -662,7 +716,6 @@ retry: #ifdef RINGBUFFER_MUTEX_WAIT SCCondSignal(&rb->wait_cond); #endif - sleepytime = 5; return 0; } diff --git a/src/util-ringbuffer.h b/src/util-ringbuffer.h index 8848d260f8..b9a60a77af 100644 --- a/src/util-ringbuffer.h +++ b/src/util-ringbuffer.h @@ -66,10 +66,18 @@ typedef struct RingBuffer16_ { RingBuffer8 *RingBuffer8Init(void); void RingBuffer8Destroy(RingBuffer8 *); +RingBuffer16 *RingBufferInit(void); +void RingBuffer16Destroy(RingBuffer16 *); + +int RingBufferIsEmpty(RingBuffer16 *); +int RingBufferIsFull(RingBuffer16 *); +uint16_t RingBufferSize(RingBuffer16 *); void RingBuffer8Shutdown(RingBuffer8 *); void RingBufferShutdown(RingBuffer16 *); +void RingBufferWait(RingBuffer16 *rb); + /** Single Reader, Single Writer ring buffer, fixed at * 256 items so we can use unsigned char's that just * wrap around */ @@ -106,5 +114,8 @@ int RingBufferMrMw8Put(RingBuffer8 *, void *); void *RingBufferMrMwGet(RingBuffer16 *); int RingBufferMrMwPut(RingBuffer16 *, void *); +void *RingBufferSrMw8Get(RingBuffer8 *); +int RingBufferSrMw8Put(RingBuffer8 *, void *); + #endif /* __UTIL_RINGBUFFER_H__ */ diff --git a/suricata.yaml b/suricata.yaml index ecc2dfa2d6..247f4037cf 100644 --- a/suricata.yaml +++ b/suricata.yaml @@ -104,7 +104,7 @@ threading: # thread being created. Regardless of the setting at a minimum 1 detect # thread will always be created. # - detect_thread_ratio: 1.0 + detect_thread_ratio: 1.5 # Select the multi pattern algorithm you want to run for scan/search the # in the engine. The supported algorithms are b2g, b3g and wumanber.