From 298d4be7bbe107d8504f0a95077295aa55f434af Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Fri, 31 Oct 2008 10:23:45 +0100 Subject: [PATCH] Threading update for tunneling and high load --- src/packet-queue.c | 8 ++- src/tm-queues.c | 12 ++++ src/tm-threads.c | 74 +++++++++++++++++++++- src/vips.c | 152 ++++++++++----------------------------------- 4 files changed, 123 insertions(+), 123 deletions(-) diff --git a/src/packet-queue.c b/src/packet-queue.c index 05111af0e8..492eb5e728 100644 --- a/src/packet-queue.c +++ b/src/packet-queue.c @@ -27,9 +27,11 @@ Packet *PacketDequeue (PacketQueue *q) { /* if the queue is empty there are no packets left. * In that case we sleep and try again. */ if (q->len == 0) { - printf("PacketDequeue: queue is empty, waiting...\n"); - usleep(100000); /* sleep 100ms */ - return PacketDequeue(q); +// printf("PacketDequeue: queue is empty, waiting...\n"); +// TmqDebugList(); +// usleep(100000); /* sleep 100ms */ +// return PacketDequeue(q); + return NULL; } /* pull the bottom packet from the queue */ diff --git a/src/tm-queues.c b/src/tm-queues.c index b51100b36a..8cb68256d3 100644 --- a/src/tm-queues.c +++ b/src/tm-queues.c @@ -1,4 +1,6 @@ #include "vips.h" +#include "threads.h" + #include "tm-queues.h" #define TMQ_MAX_QUEUES 256 @@ -42,3 +44,13 @@ Tmq* TmqGetQueueByName(char *name) { return NULL; } +void TmqDebugList(void) { + u_int16_t i = 0; + for (i = 0; i < tmq_id; i++) { + /* get a lock accessing the len */ + mutex_lock(&trans_q[tmqs[i].id].mutex_q); + printf("TmqDebugList: id %u, name \'%s\', len %u\n", tmqs[i].id, tmqs[i].name, trans_q[tmqs[i].id].len); + mutex_unlock(&trans_q[tmqs[i].id].mutex_q); + } +} + diff --git a/src/tm-threads.c b/src/tm-threads.c index afe2c431a0..5d2ebf1039 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -186,19 +186,23 @@ void *TmThreadsSlot1(void *td) { memset(&s1->slot1_pq, 0, sizeof(PacketQueue)); while(run) { + /* input a packet */ p = tv->tmqh_in(tv); + if (p == NULL) { //printf("%s: TmThreadsSlot1: p == NULL\n", tv->name); } else { r = s1->Slot1Func(tv, p, s1->slot1_data, &s1->slot1_pq); while (s1->slot1_pq.len > 0) { - Packet *extra = PacketDequeue(&s1->slot1_pq); - tv->tmqh_out(tv, extra); + /* handle new packets from this func */ + Packet *extra_p = PacketDequeue(&s1->slot1_pq); + tv->tmqh_out(tv, extra_p); } //printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r); /* XXX handle error */ + /* output the packet */ tv->tmqh_out(tv, p); } @@ -242,15 +246,36 @@ void *TmThreadsSlot2(void *td) { } while(run) { + /* input a packet */ p = tv->tmqh_in(tv); + if (p == NULL) { //printf("%s: TmThreadsSlot1: p == NULL\n", tv->name); } else { r = s2->Slot1Func(tv, p, s2->slot1_data, &s2->slot1_pq); + while (s2->slot1_pq.len > 0) { + /* handle new packets from this func */ + Packet *extra_p = PacketDequeue(&s2->slot1_pq); + + r = s2->Slot2Func(tv, extra_p, s2->slot2_data, &s2->slot2_pq); + while (s2->slot2_pq.len > 0) { + /* handle new packets from this func */ + Packet *extra_p2 = PacketDequeue(&s2->slot2_pq); + tv->tmqh_out(tv, extra_p2); + } + tv->tmqh_out(tv, extra_p); + } r = s2->Slot2Func(tv, p, s2->slot2_data, &s2->slot2_pq); + while (s2->slot2_pq.len > 0) { + /* handle new packets from this func */ + Packet *extra_p = PacketDequeue(&s2->slot2_pq); + tv->tmqh_out(tv, extra_p); + } + //printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r); /* XXX handle error */ + /* output the packet */ tv->tmqh_out(tv, p); } @@ -306,16 +331,61 @@ void *TmThreadsSlot3(void *td) { } while(run) { + /* input a packet */ p = tv->tmqh_in(tv); + if (p == NULL) { //printf("%s: TmThreadsSlot1: p == NULL\n", tv->name); } else { + /* slot 1 */ r = s3->Slot1Func(tv, p, s3->slot1_data, &s3->slot1_pq); + while (s3->slot1_pq.len > 0) { + /* handle new packets from this func */ + Packet *extra_p = PacketDequeue(&s3->slot1_pq); + + r = s3->Slot2Func(tv, extra_p, s3->slot2_data, &s3->slot2_pq); + while (s3->slot2_pq.len > 0) { + /* handle new packets from this func */ + Packet *extra_p2 = PacketDequeue(&s3->slot2_pq); + + r = s3->Slot3Func(tv, extra_p2, s3->slot3_data, &s3->slot3_pq); + while (s3->slot3_pq.len > 0) { + /* handle new packets from this func */ + Packet *extra_p3 = PacketDequeue(&s3->slot3_pq); + tv->tmqh_out(tv, extra_p3); + } + tv->tmqh_out(tv, extra_p2); + } + tv->tmqh_out(tv, extra_p); + } + + /* slot 2 */ r = s3->Slot2Func(tv, p, s3->slot2_data, &s3->slot2_pq); + while (s3->slot2_pq.len > 0) { + /* handle new packets from this func */ + Packet *extra_p = PacketDequeue(&s3->slot2_pq); + + r = s3->Slot3Func(tv, extra_p, s3->slot3_data, &s3->slot3_pq); + while (s3->slot3_pq.len > 0) { + /* handle new packets from this func */ + Packet *extra_p2 = PacketDequeue(&s3->slot3_pq); + tv->tmqh_out(tv, extra_p2); + } + tv->tmqh_out(tv, extra_p); + } + + /* slot 3 */ r = s3->Slot3Func(tv, p, s3->slot3_data, &s3->slot3_pq); + while (s3->slot3_pq.len > 0) { + /* handle new packets from this func */ + Packet *extra_p = PacketDequeue(&s3->slot3_pq); + tv->tmqh_out(tv, extra_p); + } + //printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r); /* XXX handle error */ + /* output the packet */ tv->tmqh_out(tv, p); } diff --git a/src/vips.c b/src/vips.c index 83fdc17246..dc99663c67 100644 --- a/src/vips.c +++ b/src/vips.c @@ -42,7 +42,10 @@ #endif /* NFQ */ #include "respond-reject.h" + #include "flow.h" +#include "flow-var.h" +#include "pkt-var.h" #include "util-cidr.h" #include "util-unittest.h" @@ -80,9 +83,20 @@ setup_signal_handler(int sig, void (*handler)()) Packet *SetupPkt (void) { - mutex_lock(&packet_q.mutex_q); - Packet *p = PacketDequeue(&packet_q); - mutex_unlock(&packet_q.mutex_q); + Packet *p = NULL; + do { + mutex_lock(&packet_q.mutex_q); + p = PacketDequeue(&packet_q); + mutex_unlock(&packet_q.mutex_q); + + if (p == NULL) { + //TmqDebugList(); + usleep(1000); /* sleep 1ms */ + + /* XXX check for recv'd signals, so + * we can exit on signals received */ + } + } while (p == NULL); CLEAR_PACKET(p); return p; @@ -93,9 +107,20 @@ Packet *TunnelPktSetup(ThreadVars *t, Packet *parent, u_int8_t *pkt, u_int16_t l //printf("TunnelPktSetup: pkt %p, len %u, proto %u\n", pkt, len, proto); /* get us a packet */ - mutex_lock(&packet_q.mutex_q); - Packet *p = PacketDequeue(&packet_q); - mutex_unlock(&packet_q.mutex_q); + Packet *p = NULL; + do { + mutex_lock(&packet_q.mutex_q); + p = PacketDequeue(&packet_q); + mutex_unlock(&packet_q.mutex_q); + + if (p == NULL) { + //TmqDebugList(); + usleep(1000); /* sleep 1ms */ + + /* XXX check for recv'd signals, so + * we can exit on signals received */ + } + } while (p == NULL); mutex_lock(&mutex_pending); pending++; @@ -124,111 +149,6 @@ Packet *TunnelPktSetup(ThreadVars *t, Packet *parent, u_int8_t *pkt, u_int16_t l return p; } -/* this function should only be called for tunnel packets - * ( I could also add a check for that here, but better do - * that at the caller, it saves us a functioncall for all - * non-tunnel packets) - * - * the problem we have is this: we reinject a pseudo packet - * into the pickup queue when we encounter a tunnel. This way - * we can independently inspect both the raw packet and any - * tunneled packet. We can however, reinject only one, and - * we can only do it when all are inspected. This is why - * all packets that are done set the RTV (Ready To Verdict) - * flag. Each time a packet is done, it checks if it is the - * last one. If not, we do nothing except return it to the - * memory pool. If we have handled everything, verdict this - * one. - * - */ -#if 0 -static Packet * VerdictTunnelPacket(Packet *p) { - char verdict = 1; - Packet *vp = NULL; - - INCR_PKT_RTV(p); - - pthread_mutex_t *m = p->root ? &p->root->mutex_rtv_cnt : &p->mutex_rtv_cnt; - - mutex_lock(m); - /* if there are more tunnel packets than ready to verdict packets, - * we won't verdict this one */ - if ((PKT_TPR(p)+1) > PKT_RTV(p)) { - verdict = 0; - } - mutex_unlock(m); - - /* don't set a verdict, we are not done yet with all packets */ - if (verdict == 0) { - /* if this is not the root, we don't need it any longer */ - if (!(IS_TUNNEL_ROOT_PKT(p))) { - mutex_lock(&packet_q.mutex_q); - PacketEnqueue(&packet_q, p); - mutex_unlock(&packet_q.mutex_q); - } - return NULL; - } - - /* okay, we are going to set a verdict */ - - /* just verdict this one if it is the root */ - if (IS_TUNNEL_ROOT_PKT(p)) { - return p; - } - - /* not a tunnel root, so verdict p->root and get p - * into the packet_q */ - vp = p->root; - - mutex_lock(&packet_q.mutex_q); - PacketEnqueue(&packet_q, p); - mutex_unlock(&packet_q.mutex_q); - return vp; -} -#endif -#if 0 -void *DetectThread(void *td) { - ThreadVars *th_v = (ThreadVars *)td; - int run = 1; - u_int32_t cnt = 0; - - printf("DetectThread[%d] started... th_v %p\n", th_v->tid, th_v); - - while(run) { - Packet *p = th_v->tmqh_in(th_v); - if (p == NULL) { - if (threadflags & VIPS_KILLDETECT) - run = 0; - } else { -#ifdef COUNTERS - cnt++; -#endif /* COUNTERS */ - - SigMatchSignatures(th_v, p); - - /* handle normal packets and packets containing tunnels - * differently. Normal packets are just forwarded to the - * next queue. Tunnel packets need more care. */ - if (!(IS_TUNNEL_PKT(p))) { - th_v->tmqh_out(th_v, p); - } else { - /* verdict the packet VerdictTunnelPacket returns. The - * function handles the rest */ - Packet *vp = VerdictTunnelPacket(p); - if (vp != NULL) { - th_v->tmqh_out(th_v, p); - } - } - } - } - - printf("DetectThread[%d] cnt %u\n", th_v->tid, cnt); - printf("DetectThread[%d] ended...\n", th_v->tid); - pthread_exit((void *) 0); -} -#endif - - int main(int argc, char **argv) { int rc; @@ -288,12 +208,14 @@ int main(int argc, char **argv) printf("Preallocating packets... packet size %u\n", sizeof(Packet)); int i = 0; for (i = 0; i < MAX_PENDING; i++) { + /* XXX pkt alloc function */ Packet *p = malloc(sizeof(Packet)); if (p == NULL) { printf("ERROR: malloc failed: %s\n", strerror(errno)); exit(1); } + p->pktvar = NULL; CLEAR_TCP_PACKET(p); CLEAR_PACKET(p); @@ -373,9 +295,6 @@ int main(int argc, char **argv) } Tm1SlotSetFunc(tv_detect1,tm_module); - /* XXX this needs an api way of doing this */ - //PatternMatcherThreadInit(tv_detect1); - if (TmThreadSpawn(tv_detect1) != 0) { printf("ERROR: TmThreadSpawn failed\n"); exit(1); @@ -393,9 +312,6 @@ int main(int argc, char **argv) } Tm1SlotSetFunc(tv_detect2,tm_module); - /* XXX this needs an api way of doing this */ - //PatternMatcherThreadInit(tv_detect2); - if (TmThreadSpawn(tv_detect2) != 0) { printf("ERROR: TmThreadSpawn failed\n"); exit(1); @@ -430,7 +346,6 @@ int main(int argc, char **argv) } Tm1SlotSetFunc(tv_rreject,tm_module); - /* XXX this needs an api way of doing this */ if (TmThreadSpawn(tv_rreject) != 0) { printf("ERROR: TmThreadSpawn failed\n"); exit(1); @@ -484,6 +399,7 @@ int main(int argc, char **argv) printf("ERROR: TmThreadSpawn failed\n"); exit(1); } + /* ThreadVars *tv_unifiedalert = TmThreadCreate("AlertUnifiedAlert","alert-queue3","simple","packetpool","packetpool","1slot"); if (tv_unifiedalert == NULL) {