diff --git a/src/tmqh-flow.c b/src/tmqh-flow.c
index ec2a634d44..7f25861adc 100644
--- a/src/tmqh-flow.c
+++ b/src/tmqh-flow.c
@@ -40,7 +40,7 @@
 #include "util-unittest.h"
 
 Packet *TmqhInputFlow(ThreadVars *t);
-void TmqhOutputFlowActiveFlows(ThreadVars *t, Packet *p);
+void TmqhOutputFlowHash(ThreadVars *t, Packet *p);
 void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p);
 void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p);
 void *TmqhOutputFlowSetupCtx(char *queue_str);
@@ -59,12 +59,15 @@ void TmqhFlowRegister(void)
 
     char *scheduler = NULL;
     if (ConfGet("autofp-scheduler", &scheduler) == 1) {
-        if (strcmp(scheduler, "round_robin") == 0) {
+        if (strcasecmp(scheduler, "round_robin") == 0) {
             SCLogInfo("AutoFP mode using \"Round Robin\" Q Handler");
             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
-        } else if (strcmp(scheduler, "active_packets") == 0) {
+        } else if (strcasecmp(scheduler, "active_packets") == 0) {
             SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler");
             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
+        } else if (strcasecmp(scheduler, "hash") == 0) {
+            SCLogInfo("AutoFP mode using \"Hash\" Q Handler");
+            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
         } else {
             SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" "
                        "for autofp-scheduler in conf. Killing engine.",
@@ -308,6 +311,53 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
     return;
 }
 
+/**
+ * \brief Select the output queue based on the flow address hash.
+ *
+ * \param tv thread vars.
+ * \param p packet.
+ */
+void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
+{
+    int32_t qid = 0;
+
+    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
+
+    /* packets without a flow (should be rare) are
+     * spread round robin over the queues */
+    if (p->flow != NULL) {
+        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
+        if (qid == -1) {
+#if __WORDSIZE == 64
+            uint64_t addr = (uint64_t)p->flow;
+#else
+            uint32_t addr = (uint32_t)p->flow;
+#endif
+            addr >>= 7;
+
+            /* no overflow to worry about here, since ctx->size
+             * is guaranteed to be less than 2 ** 31 */
+            qid = addr % ctx->size;
+            SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
+            SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
+        }
+    } else {
+        qid = ctx->last++;
+
+        if (ctx->last == ctx->size)
+            ctx->last = 0;
+    }
+    SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
+
+    PacketQueue *q = ctx->queues[qid].q;
+    SCMutexLock(&q->mutex_q);
+    PacketEnqueue(q, p);
+    SCCondSignal(&q->cond_q);
+    SCMutexUnlock(&q->mutex_q);
+
+    return;
+}
+
 #ifdef UNITTESTS
 
 static int TmqhOutputFlowSetupCtxTest01(void)
diff --git a/suricata.yaml.in b/suricata.yaml.in
index b9a9e91a45..7b0f71f695 100644
--- a/suricata.yaml.in
+++ b/suricata.yaml.in
@@ -24,6 +24,7 @@
 #                 round_robin - Flow alloted to queue in a round robin fashion.
 #                 active-packets - Flow alloted to queue that has the least no of
 #                                  unprocessed packets.
+#                 hash - Flow allotted using the address hash. Effectively a random distribution.
 #autofp-scheduler: active_packets
 
 # Default pid file.
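
For reviewers who want to see the selection math of `TmqhOutputFlowHash` in isolation: below is a minimal standalone sketch. The `>> 7` shift and the modulo are taken from the patch; the shift presumably discards the low bits that heap allocations of similar size tend to share, so the remaining bits spread better across queues. The `QueueIdFromAddr` helper, `NQUEUES`, and the `main()` driver are illustrative stand-ins (the real code hashes `p->flow` and divides by `ctx->size`), and the actual spread depends on the allocator's address layout.

```c
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>

#define NQUEUES 4      /* stand-in for ctx->size */
#define NFLOWS  10000

/* mirror of the patch's qid computation: shift out the low
 * allocation-alignment bits, then fold into the queue range */
static int32_t QueueIdFromAddr(const void *flow)
{
    uintptr_t addr = (uintptr_t)flow;  /* uintptr_t replaces the __WORDSIZE #if */
    addr >>= 7;
    return (int32_t)(addr % NQUEUES);
}

int main(void)
{
    static void *flows[NFLOWS];
    int hits[NQUEUES] = { 0 };

    /* simulate live flows as distinct heap allocations */
    for (int i = 0; i < NFLOWS; i++) {
        flows[i] = malloc(128);
        hits[QueueIdFromAddr(flows[i])]++;
    }

    for (int q = 0; q < NQUEUES; q++)
        printf("queue %d: %d flows\n", q, hits[q]);

    for (int i = 0; i < NFLOWS; i++)
        free(flows[i]);
    return 0;
}
```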
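The enqueue tail of the new handler (lock, enqueue, signal, unlock) is the standard mutex plus condition variable producer pattern. Here is a sketch against raw pthreads, on the assumption that Suricata's SCMutex/SCCond macros wrap them; `DemoQueue` is a hypothetical stand-in for `PacketQueue`, and the matching consumer side is included for context.

```c
#include <pthread.h>

typedef struct DemoQueue {
    void *items[64];
    int len;
    pthread_mutex_t mutex_q;  /* init with PTHREAD_MUTEX_INITIALIZER */
    pthread_cond_t cond_q;    /* init with PTHREAD_COND_INITIALIZER */
} DemoQueue;

/* producer side, as in TmqhOutputFlowHash: publish under the
 * lock, then wake one consumer blocked on the condvar */
static void DemoEnqueue(DemoQueue *q, void *item)
{
    pthread_mutex_lock(&q->mutex_q);
    if (q->len < 64)
        q->items[q->len++] = item;
    pthread_cond_signal(&q->cond_q);
    pthread_mutex_unlock(&q->mutex_q);
}

/* consumer side: sleep until an item is available */
static void *DemoDequeue(DemoQueue *q)
{
    pthread_mutex_lock(&q->mutex_q);
    while (q->len == 0)  /* loop guards against spurious wakeups */
        pthread_cond_wait(&q->cond_q, &q->mutex_q);
    void *item = q->items[--q->len];
    pthread_mutex_unlock(&q->mutex_q);
    return item;
}
```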
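Usage note: with the `strcmp` calls switched to `strcasecmp`, the config value is now matched case-insensitively. Once this lands, the new handler is enabled by uncommenting and editing one line in suricata.yaml:

```yaml
# "hash", "Hash", etc. all match after the strcasecmp change
autofp-scheduler: hash
```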