diff --git a/src/flow-util.h b/src/flow-util.h
index e90d9c44db..57bcbf7e94 100644
--- a/src/flow-util.h
+++ b/src/flow-util.h
@@ -62,7 +62,7 @@
         (f)->hprev = NULL; \
         (f)->lnext = NULL; \
         (f)->lprev = NULL; \
-        (f)->autofp_tmqh_flow_qid = -1; \
+        SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1); \
         RESET_COUNTERS((f)); \
     } while (0)
@@ -93,9 +93,8 @@
         (f)->tag_list = NULL; \
         GenericVarFree((f)->flowvar); \
         (f)->flowvar = NULL; \
-        if ((f)->autofp_tmqh_flow_qid != -1) { \
-            TmqhFlowUpdateActiveFlows((f)); \
-            (f)->autofp_tmqh_flow_qid = -1; \
+        if (SC_ATOMIC_GET((f)->autofp_tmqh_flow_qid) != -1) { \
+            SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1); \
         } \
         RESET_COUNTERS((f)); \
     } while(0)
@@ -111,8 +110,8 @@
         DetectTagDataListFree((f)->tag_list); \
         GenericVarFree((f)->flowvar); \
         SCMutexDestroy(&(f)->de_state_m); \
-        if ((f)->autofp_tmqh_flow_qid != -1) { \
-            TmqhFlowUpdateActiveFlows((f)); \
+        if (SC_ATOMIC_GET((f)->autofp_tmqh_flow_qid) != -1) { \
+            SC_ATOMIC_DESTROY((f)->autofp_tmqh_flow_qid); \
         } \
         (f)->tag_list = NULL; \
     } while(0)
diff --git a/src/flow.h b/src/flow.h
index fe95cd6e28..c5a2af8993 100644
--- a/src/flow.h
+++ b/src/flow.h
@@ -312,7 +312,7 @@ typedef struct Flow_
     uint64_t bytecnt;
 #endif
 
-    int32_t autofp_tmqh_flow_qid;
+    SC_ATOMIC_DECLARE(int, autofp_tmqh_flow_qid);
 } Flow;
 
 enum {
diff --git a/src/suricata.c b/src/suricata.c
index cf1636d286..766951ce15 100644
--- a/src/suricata.c
+++ b/src/suricata.c
@@ -1902,8 +1902,6 @@ int main(int argc, char **argv)
 #endif /* OS_WIN32 */
 
     SC_ATOMIC_DESTROY(engine_stage);
-    if (strcasecmp(RunmodeGetActive(), "autofp") == 0)
-        TmqhFlowPrintStatistics();
 
     exit(engine_retval);
 }
diff --git a/src/threadvars.h b/src/threadvars.h
index 9f81c347ce..c8f760e558 100644
--- a/src/threadvars.h
+++ b/src/threadvars.h
@@ -73,6 +73,7 @@ typedef struct ThreadVars_ {
     Tmq *inq;
     Tmq *outq;
     void *outctx;
+    char *outqh_name;
 
     /** queue handlers */
     struct Packet_ * (*tmqh_in)(struct ThreadVars_ *);
diff --git a/src/tm-threads.c b/src/tm-threads.c
index c5eb4c9664..9625f1a78b 100644
--- a/src/tm-threads.c
+++ b/src/tm-threads.c
@@ -1192,6 +1192,7 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
         goto error;
 
     tv->tmqh_out = tmqh->OutHandler;
+    tv->outqh_name = tmqh->name;
 
     if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
         SCLogDebug("outq_name \"%s\"", outq_name);
@@ -1430,6 +1431,16 @@ void TmThreadKillThread(ThreadVars *tv)
         SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
     }
 
+    if (tv->outctx != NULL) {
+        Tmqh *tmqh = TmqhGetQueueHandlerByName(tv->outqh_name);
+        if (tmqh == NULL)
+            BUG_ON(1);
+
+        if (tmqh->OutHandlerCtxFree != NULL) {
+            tmqh->OutHandlerCtxFree(tv->outctx);
+        }
+    }
+
     if (tv->cond != NULL ) {
         int cnt = 0;
         while (1) {
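Because a queue handler can now hang per-queue state off the thread's output context, the kill path has to return that context to the handler that allocated it: TmThreadCreate records the handler's name in tv->outqh_name, and TmThreadKillThread looks the handler up again by that name and invokes its OutHandlerCtxFree callback. A minimal standalone sketch of this lookup-and-free pattern follows; the Handler type, table and names are illustrative stand-ins, not the real Tmqh API.

    #include <stdlib.h>
    #include <string.h>

    /* Illustrative stand-ins for the Tmqh table; only the shape of the
     * cleanup is sketched here, not the real API. */
    typedef struct Handler_ {
        const char *name;
        void (*CtxFree)(void *);    /* NULL if the handler keeps no ctx */
    } Handler;

    static void FlowCtxFree(void *ctx) { free(ctx); }

    static Handler handler_table[] = {
        { "flow",   FlowCtxFree },
        { "simple", NULL },
    };

    static Handler *GetHandlerByName(const char *name)
    {
        size_t i;
        for (i = 0; i < sizeof(handler_table) / sizeof(handler_table[0]); i++) {
            if (strcmp(handler_table[i].name, name) == 0)
                return &handler_table[i];
        }
        return NULL;
    }

    int main(void)
    {
        void *outctx = malloc(16);          /* stands in for tv->outctx */
        const char *outqh_name = "flow";    /* recorded at thread-create time */

        if (outctx != NULL) {
            Handler *h = GetHandlerByName(outqh_name);
            if (h != NULL && h->CtxFree != NULL)
                h->CtxFree(outctx);         /* the handler frees its own ctx */
        }
        return 0;
    }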
diff --git a/src/tmqh-flow.c b/src/tmqh-flow.c
index 038a8fd8d3..ec2a634d44 100644
--- a/src/tmqh-flow.c
+++ b/src/tmqh-flow.c
@@ -62,9 +62,6 @@ void TmqhFlowRegister(void)
         if (strcmp(scheduler, "round_robin") == 0) {
             SCLogInfo("AutoFP mode using \"Round Robin\" Q Handler");
             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
-        } else if (strcmp(scheduler, "active_flows") == 0) {
-            SCLogInfo("AutoFP mode using \"Active Flows\" Q Handler");
-            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActiveFlows;
         } else if (strcmp(scheduler, "active_packets") == 0) {
             SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler");
             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
@@ -75,8 +72,8 @@ void TmqhFlowRegister(void)
             exit(EXIT_FAILURE);
         }
     } else {
-        SCLogInfo("AutoFP mode using default \"Round Robin\" Q Handler");
-        tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
+        SCLogInfo("AutoFP mode using default \"Active Packets\" Q Handler");
+        tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
     }
 
     return;
@@ -134,7 +131,8 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
         memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
     }
     ctx->queues[ctx->size - 1].q = &trans_q[id];
-    SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].active_flows);
+    SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_packets);
+    SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_flows);
 
     return 0;
 }
@@ -185,6 +183,8 @@ void *TmqhOutputFlowSetupCtx(char *queue_str)
 
     tmqh_flow_outctx = ctx;
 
+    SC_ATOMIC_INIT(ctx->round_robin_idx);
+
     SCFree(str);
 
     return (void *)ctx;
@@ -200,8 +200,15 @@ void TmqhOutputFlowFreeCtx(void *ctx)
     int i;
     TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
 
+    SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
+              tmqh_flow_outctx->size);
     for (i = 0; i < fctx->size; i++) {
-        SC_ATOMIC_DESTROY(fctx->queues[i].active_flows);
+        SCLogInfo("AutoFP - Total Packets - Queue %"PRIu32 " - %"PRIu64 , i,
+                  SC_ATOMIC_GET(fctx->queues[i].total_packets));
+        SCLogInfo("AutoFP - Total Flows - Queue %"PRIu32 " - %"PRIu64 , i,
+                  SC_ATOMIC_GET(fctx->queues[i].total_flows));
+        SC_ATOMIC_DESTROY(fctx->queues[i].total_packets);
+        SC_ATOMIC_DESTROY(fctx->queues[i].total_flows);
     }
 
     SCFree(fctx->queues);
@@ -226,20 +233,23 @@ void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p)
     /* if no flow we use the first queue,
      * should be rare */
     if (p->flow != NULL) {
-        qid = p->flow->autofp_tmqh_flow_qid;
+        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
         if (qid == -1) {
-            p->flow->autofp_tmqh_flow_qid = qid = ctx->round_robin_idx++;
-            ctx->round_robin_idx = ctx->round_robin_idx % ctx->size;
-            SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
-            ctx->queues[qid].total_flows++;
+            qid = SC_ATOMIC_ADD(ctx->round_robin_idx, 1);
+            if (qid >= ctx->size) {
+                SC_ATOMIC_RESET(ctx->round_robin_idx);
+                qid = 0;
+            }
+            SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
+            SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
         }
-        ctx->queues[qid].total_packets++;
     } else {
         qid = ctx->last++;
 
         if (ctx->last == ctx->size)
             ctx->last = 0;
     }
+    SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
 
     PacketQueue *q = ctx->queues[qid].q;
     SCMutexLock(&q->mutex_q);
@@ -265,7 +275,7 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
     /* if no flow we use the first queue,
      * should be rare */
     if (p->flow != NULL) {
-        qid = p->flow->autofp_tmqh_flow_qid;
+        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
         if (qid == -1) {
             uint16_t i = 0;
             int lowest_id = 0;
@@ -277,17 +287,17 @@
                     lowest_id = i;
                 }
             }
-            p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
-            SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
-            queues[qid].total_flows++;
+            qid = lowest_id;
+            SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, lowest_id);
+            SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
         }
-        ctx->queues[qid].total_packets++;
     } else {
         qid = ctx->last++;
 
         if (ctx->last == ctx->size)
             ctx->last = 0;
     }
+    SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
 
     PacketQueue *q = ctx->queues[qid].q;
     SCMutexLock(&q->mutex_q);
@@ -298,76 +308,6 @@
     return;
 }
 
-/**
- * \brief select the queue to output to based on active flows.
- *
- * \param tv thread vars
- * \param p packet
- */
-void TmqhOutputFlowActiveFlows(ThreadVars *tv, Packet *p)
-{
-    int32_t qid = 0;
-
-    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
-
-    /* if no flow we use the first queue,
-     * should be rare */
-    if (p->flow != NULL) {
-        qid = p->flow->autofp_tmqh_flow_qid;
-        if (qid == -1) {
-            uint32_t i = 0;
-            int lowest_id = 0;
-            uint32_t lowest = ctx->queues[i].active_flows_sc_atomic__;
-            for (i = 1; i < ctx->size; i++) {
-                if (ctx->queues[i].active_flows_sc_atomic__ < lowest) {
-                    lowest = ctx->queues[i].active_flows_sc_atomic__;
-                    lowest_id = i;
-                }
-            }
-            p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
-            SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
-            ctx->queues[qid].total_flows++;
-        }
-        ctx->queues[qid].total_packets++;
-    } else {
-        qid = ctx->last++;
-
-        if (ctx->last == ctx->size)
-            ctx->last = 0;
-    }
-
-    PacketQueue *q = ctx->queues[qid].q;
-    SCMutexLock(&q->mutex_q);
-    PacketEnqueue(q, p);
-    SCCondSignal(&q->cond_q);
-    SCMutexUnlock(&q->mutex_q);
-
-    return;
-}
-
-/**
- * \brief Prints flow q handler statistics.
- *
- * Requires engine to be dead when this function's called.
- */
-void TmqhFlowPrintStatistics(void)
-{
-    uint32_t i;
-
-    SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
-              tmqh_flow_outctx->size);
-    for (i = 0; i < tmqh_flow_outctx->size; i++) {
-        SCLogInfo("AutoFP - Total Packets - Queue %"PRIu32 " - %"PRIu64 , i,
-                  tmqh_flow_outctx->queues[i].total_packets);
-    }
-    for (i = 0; i < tmqh_flow_outctx->size; i++) {
-        SCLogInfo("AutoFP - Total Flows - Queue %"PRIu32 " - %"PRIu64 , i,
-                  tmqh_flow_outctx->queues[i].total_flows);
-    }
-
-    return;
-}
-
 #ifdef UNITTESTS
 
 static int TmqhOutputFlowSetupCtxTest01(void)
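With the per-queue counters and the round-robin index turned into atomics, a flow can be pinned to a queue from any capture thread without taking a lock. A standalone sketch of the round-robin pick, using C11 atomics in place of the SC_ATOMIC_* wrappers (NQUEUES is a made-up constant):

    #include <stdatomic.h>
    #include <stdio.h>

    #define NQUEUES 4   /* made-up queue count */

    static atomic_uint round_robin_idx;

    static unsigned int PickQueue(void)
    {
        /* fetch-and-add returns the previous value, so each caller gets a
         * distinct slot until the index runs past the queue count */
        unsigned int qid = atomic_fetch_add(&round_robin_idx, 1);
        if (qid >= NQUEUES) {
            /* as in the patch, reset and fall back to queue 0; the reset is
             * not atomic with the add, so racing threads may briefly pile
             * onto queue 0, which is fine for approximate balancing */
            atomic_store(&round_robin_idx, 0);
            qid = 0;
        }
        return qid;
    }

    int main(void)
    {
        int i;
        for (i = 0; i < 10; i++)
            printf("flow %d -> queue %u\n", i, PickQueue());
        return 0;
    }

The wrap-around reset is deliberately not fused with the fetch-and-add; the distribution only needs to be approximately even, so the occasional race at the wrap point costs nothing.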
diff --git a/src/tmqh-flow.h b/src/tmqh-flow.h
index 84ea3bcdf7..adcc70eb84 100644
--- a/src/tmqh-flow.h
+++ b/src/tmqh-flow.h
@@ -27,9 +27,8 @@
 typedef struct TmqhFlowMode_ {
     PacketQueue *q;
 
-    SC_ATOMIC_DECLARE(uint64_t, active_flows);
-    uint64_t total_packets;
-    uint64_t total_flows;
+    SC_ATOMIC_DECLARE(uint64_t, total_packets);
+    SC_ATOMIC_DECLARE(uint64_t, total_flows);
 } TmqhFlowMode;
 
 /** \brief Ctx for the flow queue handler
@@ -41,20 +40,10 @@
 
     TmqhFlowMode *queues;
 
-    uint16_t round_robin_idx;
+    SC_ATOMIC_DECLARE(uint16_t, round_robin_idx);
 } TmqhFlowCtx;
 
-extern TmqhFlowCtx *tmqh_flow_outctx;
-
-static inline void TmqhFlowUpdateActiveFlows(Flow *f)
-{
-    SC_ATOMIC_SUB(tmqh_flow_outctx->queues[f->autofp_tmqh_flow_qid].active_flows, 1);
-
-    return;
-}
-
 void TmqhFlowRegister (void);
 void TmqhFlowRegisterTests(void);
-void TmqhFlowPrintStatistics(void);
 
 #endif /* __TMQH_FLOW_H__ */
diff --git a/src/util-atomic.h b/src/util-atomic.h
index d48ce6c36d..adf4194189 100644
--- a/src/util-atomic.h
+++ b/src/util-atomic.h
@@ -198,6 +198,21 @@
     var; \
 })
 
+/**
+ * \brief Set the value for the atomic variable.
+ *
+ * \retval var value
+ */
+#define SC_ATOMIC_SET(name, val) ({ \
+    typeof(name ## _sc_atomic__) var; \
+    do { \
+        SCSpinLock(&(name ## _sc_lock__)); \
+        var = (name ## _sc_atomic__) = val; \
+        SCSpinUnlock(&(name ## _sc_lock__)); \
+    } while (0); \
+    var; \
+})
+
 /**
  * \brief atomic Compare and Switch
  *
@@ -441,6 +456,16 @@
 #define SC_ATOMIC_GET(name) \
     (name ## _sc_atomic__)
 
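The second SC_ATOMIC_SET definition covers the GCC-builtin path, where the __sync family offers no plain atomic store, so the macro retries a compare-and-swap until the write from the freshly read value succeeds. A standalone sketch of the same retry loop written directly against the builtin (AtomicSet is an illustrative helper, not a Suricata function):

    #include <stdio.h>

    /* Illustrative retry loop; the macro does the same thing through
     * SC_ATOMIC_CAS, which maps to this builtin on the GCC path. */
    static void AtomicSet(volatile int *addr, int val)
    {
        int old;
        do {
            old = *addr;
        } while (__sync_bool_compare_and_swap(addr, old, val) == 0);
    }

    static volatile int qid = 7;

    int main(void)
    {
        AtomicSet(&qid, -1);    /* e.g. marking a flow's queue id unassigned */
        printf("%d\n", qid);
        return 0;
    }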
diff --git a/suricata.yaml.in b/suricata.yaml.in
index cde848a957..b9a9e91a45 100644
--- a/suricata.yaml.in
+++ b/suricata.yaml.in
@@ -24,8 +24,7 @@
 # round_robin - Flow alloted to queue in a round robin fashion.
 # active-packets - Flow alloted to queue that has the least no of
 #                  unprocessed packets.
-# active-flows - Flow alloted to queue that has least no of active flows.
-autofp-scheduler: round-robin
+#autofp-scheduler: active_packets
 
 # Default pid file.
 # Will use this file if no --pidfile in command options.
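With the active-flows handler gone, the shipped configuration leaves autofp-scheduler commented out, and TmqhFlowRegister falls back to the Active Packets handler by default. A standalone sketch of that dispatch; PickHandler and the two stubs are illustrative, but the accepted strings match the register function above:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    static void RoundRobin(void)    { /* TmqhOutputFlowRoundRobin stand-in */ }
    static void ActivePackets(void) { /* TmqhOutputFlowActivePackets stand-in */ }

    static void (*PickHandler(const char *scheduler))(void)
    {
        if (scheduler == NULL)          /* option absent or commented out */
            return ActivePackets;       /* the new default */
        if (strcmp(scheduler, "round_robin") == 0)
            return RoundRobin;
        if (strcmp(scheduler, "active_packets") == 0)
            return ActivePackets;
        fprintf(stderr, "invalid autofp-scheduler option \"%s\"\n", scheduler);
        exit(EXIT_FAILURE);
    }

    int main(void)
    {
        printf("default maps to %s\n",
               PickHandler(NULL) == ActivePackets ? "active_packets"
                                                  : "round_robin");
        return 0;
    }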