|
|
|
@ -40,7 +40,7 @@
|
|
|
|
|
#include "util-unittest.h"
|
|
|
|
|
|
|
|
|
|
Packet *TmqhInputFlow(ThreadVars *t);
|
|
|
|
|
void TmqhOutputFlowActiveFlows(ThreadVars *t, Packet *p);
|
|
|
|
|
void TmqhOutputFlowHash(ThreadVars *t, Packet *p);
|
|
|
|
|
void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p);
|
|
|
|
|
void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p);
|
|
|
|
|
void *TmqhOutputFlowSetupCtx(char *queue_str);
|
|
|
|
@ -59,12 +59,15 @@ void TmqhFlowRegister(void)
|
|
|
|
|
|
|
|
|
|
char *scheduler = NULL;
|
|
|
|
|
if (ConfGet("autofp-scheduler", &scheduler) == 1) {
|
|
|
|
|
if (strcmp(scheduler, "round_robin") == 0) {
|
|
|
|
|
if (strcasecmp(scheduler, "round_robin") == 0) {
|
|
|
|
|
SCLogInfo("AutoFP mode using \"Round Robin\" Q Handler");
|
|
|
|
|
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
|
|
|
|
|
} else if (strcmp(scheduler, "active_packets") == 0) {
|
|
|
|
|
} else if (strcasecmp(scheduler, "active_packets") == 0) {
|
|
|
|
|
SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler");
|
|
|
|
|
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
|
|
|
|
|
} else if (strcasecmp(scheduler, "hash") == 0) {
|
|
|
|
|
SCLogInfo("AutoFP mode using \"Hash\" Q Handler");
|
|
|
|
|
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
|
|
|
|
|
} else {
|
|
|
|
|
SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" "
|
|
|
|
|
"for autofp-scheduler in conf. Killing engine.",
|
|
|
|
@ -308,6 +311,53 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* \brief select the queue to output based on address hash.
|
|
|
|
|
*
|
|
|
|
|
* \param tv thread vars.
|
|
|
|
|
* \param p packet.
|
|
|
|
|
*/
|
|
|
|
|
void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
|
|
|
|
|
{
|
|
|
|
|
int32_t qid = 0;
|
|
|
|
|
|
|
|
|
|
TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
|
|
|
|
|
|
|
|
|
|
/* if no flow we use the first queue,
|
|
|
|
|
* should be rare */
|
|
|
|
|
if (p->flow != NULL) {
|
|
|
|
|
qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
|
|
|
|
|
if (qid == -1) {
|
|
|
|
|
#if __WORDSIZE == 64
|
|
|
|
|
uint64_t addr = (uint64_t)p->flow;
|
|
|
|
|
#else
|
|
|
|
|
uint32_t addr = (uint32_t)p->flow;
|
|
|
|
|
#endif
|
|
|
|
|
addr >>= 7;
|
|
|
|
|
|
|
|
|
|
/* we don't have to worry about possible overflow, since
|
|
|
|
|
* ctx->size will be lesser than 2 ** 31 for sure */
|
|
|
|
|
qid = addr % ctx->size;
|
|
|
|
|
SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
|
|
|
|
|
SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
qid = ctx->last++;
|
|
|
|
|
|
|
|
|
|
if (ctx->last == ctx->size)
|
|
|
|
|
ctx->last = 0;
|
|
|
|
|
}
|
|
|
|
|
SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
|
|
|
|
|
|
|
|
|
|
PacketQueue *q = ctx->queues[qid].q;
|
|
|
|
|
SCMutexLock(&q->mutex_q);
|
|
|
|
|
PacketEnqueue(q, p);
|
|
|
|
|
SCCondSignal(&q->cond_q);
|
|
|
|
|
SCMutexUnlock(&q->mutex_q);
|
|
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef UNITTESTS
|
|
|
|
|
|
|
|
|
|
static int TmqhOutputFlowSetupCtxTest01(void)
|
|
|
|
|