|
|
|
|
@ -39,24 +39,34 @@
|
|
|
|
|
|
|
|
|
|
#include "util-unittest.h"
|
|
|
|
|
|
|
|
|
|
typedef struct TmqhFlowMode_ {
|
|
|
|
|
PacketQueue *q;
|
|
|
|
|
|
|
|
|
|
SC_ATOMIC_DECLARE(uint64_t, active_flows);
|
|
|
|
|
SC_ATOMIC_DECLARE(uint64_t, total_packets);
|
|
|
|
|
} TmqhFlowMode;
|
|
|
|
|
|
|
|
|
|
/** \brief Ctx for the flow queue handler
|
|
|
|
|
* \param size number of queues to output to
|
|
|
|
|
* \param queues array of queue id's this flow handler outputs to */
|
|
|
|
|
typedef struct TmqhFlowCtx_ {
|
|
|
|
|
uint16_t size;
|
|
|
|
|
uint16_t *queues;
|
|
|
|
|
uint16_t last;
|
|
|
|
|
|
|
|
|
|
TmqhFlowMode *queues;
|
|
|
|
|
|
|
|
|
|
uint16_t round_robin_idx;
|
|
|
|
|
} TmqhFlowCtx;
|
|
|
|
|
|
|
|
|
|
Packet *TmqhInputFlow(ThreadVars *t);
|
|
|
|
|
void TmqhOutputFlow(ThreadVars *t, Packet *p);
|
|
|
|
|
void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p);
|
|
|
|
|
void *TmqhOutputFlowSetupCtx(char *queue_str);
|
|
|
|
|
void TmqhFlowRegisterTests(void);
|
|
|
|
|
|
|
|
|
|
void TmqhFlowRegister (void) {
|
|
|
|
|
tmqh_table[TMQH_FLOW].name = "flow";
|
|
|
|
|
tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow;
|
|
|
|
|
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlow;
|
|
|
|
|
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
|
|
|
|
|
tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx;
|
|
|
|
|
tmqh_table[TMQH_FLOW].OutHandlerCtxFree = NULL;
|
|
|
|
|
tmqh_table[TMQH_FLOW].RegisterTests = TmqhFlowRegisterTests;
|
|
|
|
|
@ -96,23 +106,24 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) {
|
|
|
|
|
tmq->writer_cnt++;
|
|
|
|
|
|
|
|
|
|
uint16_t id = tmq->id;
|
|
|
|
|
//printf("StoreQueueId: id %u\n", id);
|
|
|
|
|
|
|
|
|
|
if (ctx->queues == NULL) {
|
|
|
|
|
ctx->size = 1;
|
|
|
|
|
ctx->queues = SCMalloc(ctx->size * sizeof(uint16_t));
|
|
|
|
|
ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode));
|
|
|
|
|
memset(ctx->queues, 0, ctx->size * sizeof(sizeof(TmqhFlowMode)));
|
|
|
|
|
if (ctx->queues == NULL) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
ctx->size++;
|
|
|
|
|
ctx->queues = SCRealloc(ctx->queues, ctx->size * sizeof(uint16_t));
|
|
|
|
|
ctx->queues = SCRealloc(ctx->queues, ctx->size * sizeof(TmqhFlowMode));
|
|
|
|
|
if (ctx->queues == NULL) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
memset(ctx->queues + (ctx->size - 1), 0, sizeof(sizeof(TmqhFlowMode)));
|
|
|
|
|
}
|
|
|
|
|
if (ctx->queues == NULL) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
ctx->queues[ctx->size - 1].q = &trans_q[id];
|
|
|
|
|
|
|
|
|
|
ctx->queues[ctx->size - 1] = id;
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -170,37 +181,28 @@ error:
|
|
|
|
|
* \param tv thread vars
|
|
|
|
|
* \param p packet
|
|
|
|
|
*/
|
|
|
|
|
void TmqhOutputFlow(ThreadVars *tv, Packet *p)
|
|
|
|
|
void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p)
|
|
|
|
|
{
|
|
|
|
|
uint16_t qid = 0;
|
|
|
|
|
int32_t qid = 0;
|
|
|
|
|
|
|
|
|
|
TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
|
|
|
|
|
if (ctx == NULL) {
|
|
|
|
|
abort();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* if no flow we use the first queue,
|
|
|
|
|
* should be rare */
|
|
|
|
|
if (p->flow != NULL) {
|
|
|
|
|
#if __WORDSIZE == 64
|
|
|
|
|
uint64_t addr = (uint64_t)p->flow;
|
|
|
|
|
#else
|
|
|
|
|
uint32_t addr = (uint32_t)p->flow;
|
|
|
|
|
#endif
|
|
|
|
|
addr >>= 7;
|
|
|
|
|
|
|
|
|
|
uint16_t idx = addr % ctx->size;
|
|
|
|
|
qid = ctx->queues[idx];
|
|
|
|
|
qid = p->flow->autofp_tmqh_flow_qid;
|
|
|
|
|
if (qid == -1) {
|
|
|
|
|
p->flow->autofp_tmqh_flow_qid = qid = ctx->round_robin_idx++;
|
|
|
|
|
ctx->round_robin_idx = ctx->round_robin_idx % ctx->size;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
ctx->last++;
|
|
|
|
|
qid = ctx->last++;
|
|
|
|
|
|
|
|
|
|
if (ctx->last == ctx->size)
|
|
|
|
|
ctx->last = 0;
|
|
|
|
|
|
|
|
|
|
qid = ctx->queues[ctx->last];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
PacketQueue *q = &trans_q[qid];
|
|
|
|
|
PacketQueue *q = ctx->queues[qid].q;
|
|
|
|
|
SCMutexLock(&q->mutex_q);
|
|
|
|
|
PacketEnqueue(q, p);
|
|
|
|
|
SCCondSignal(&q->cond_q);
|
|
|
|
|
@ -208,6 +210,8 @@ void TmqhOutputFlow(ThreadVars *tv, Packet *p)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef UNITTESTS
|
|
|
|
|
|
|
|
|
|
#if 0
|
|
|
|
|
static int TmqhOutputFlowSetupCtxTest01(void) {
|
|
|
|
|
int retval = 0;
|
|
|
|
|
Tmq *tmq = NULL;
|
|
|
|
|
@ -331,14 +335,17 @@ end:
|
|
|
|
|
TmqResetQueues();
|
|
|
|
|
return retval;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
#endif /* UNITTESTS */
|
|
|
|
|
|
|
|
|
|
void TmqhFlowRegisterTests(void) {
|
|
|
|
|
#ifdef UNITTESTS
|
|
|
|
|
#if 0
|
|
|
|
|
UtRegisterTest("TmqhOutputFlowSetupCtxTest01", TmqhOutputFlowSetupCtxTest01, 1);
|
|
|
|
|
UtRegisterTest("TmqhOutputFlowSetupCtxTest02", TmqhOutputFlowSetupCtxTest02, 1);
|
|
|
|
|
UtRegisterTest("TmqhOutputFlowSetupCtxTest03", TmqhOutputFlowSetupCtxTest03, 1);
|
|
|
|
|
#endif
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|