Adapt flow tmqh counters to be atomic vars. Remove support for active flows q handler. Introduce SC_ATOMIC_SET

remotes/origin/master
Anoop Saldanha 13 years ago committed by Victor Julien
parent 3faed5fe79
commit 5ffb050ada

@ -62,7 +62,7 @@
(f)->hprev = NULL; \
(f)->lnext = NULL; \
(f)->lprev = NULL; \
(f)->autofp_tmqh_flow_qid = -1; \
SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1); \
RESET_COUNTERS((f)); \
} while (0)
@ -93,9 +93,8 @@
(f)->tag_list = NULL; \
GenericVarFree((f)->flowvar); \
(f)->flowvar = NULL; \
if ((f)->autofp_tmqh_flow_qid != -1) { \
TmqhFlowUpdateActiveFlows((f)); \
(f)->autofp_tmqh_flow_qid = -1; \
if (SC_ATOMIC_GET((f)->autofp_tmqh_flow_qid) != -1) { \
SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1); \
} \
RESET_COUNTERS((f)); \
} while(0)
@ -111,8 +110,8 @@
DetectTagDataListFree((f)->tag_list); \
GenericVarFree((f)->flowvar); \
SCMutexDestroy(&(f)->de_state_m); \
if ((f)->autofp_tmqh_flow_qid != -1) { \
TmqhFlowUpdateActiveFlows((f)); \
if (SC_ATOMIC_GET((f)->autofp_tmqh_flow_qid) != -1) { \
SC_ATOMIC_DESTROY((f)->autofp_tmqh_flow_qid); \
} \
(f)->tag_list = NULL; \
} while(0)

@ -312,7 +312,7 @@ typedef struct Flow_
uint64_t bytecnt;
#endif
int32_t autofp_tmqh_flow_qid;
SC_ATOMIC_DECLARE(int, autofp_tmqh_flow_qid);
} Flow;
enum {

@ -1902,8 +1902,6 @@ int main(int argc, char **argv)
#endif /* OS_WIN32 */
SC_ATOMIC_DESTROY(engine_stage);
if (strcasecmp(RunmodeGetActive(), "autofp") == 0)
TmqhFlowPrintStatistics();
exit(engine_retval);
}

@ -73,6 +73,7 @@ typedef struct ThreadVars_ {
Tmq *inq;
Tmq *outq;
void *outctx;
char *outqh_name;
/** queue handlers */
struct Packet_ * (*tmqh_in)(struct ThreadVars_ *);

@ -1192,6 +1192,7 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
goto error;
tv->tmqh_out = tmqh->OutHandler;
tv->outqh_name = tmqh->name;
if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
SCLogDebug("outq_name \"%s\"", outq_name);
@ -1430,6 +1431,16 @@ void TmThreadKillThread(ThreadVars *tv)
SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
}
if (tv->outctx != NULL) {
Tmqh *tmqh = TmqhGetQueueHandlerByName(tv->outqh_name);
if (tmqh == NULL)
BUG_ON(1);
if (tmqh->OutHandlerCtxFree != NULL) {
tmqh->OutHandlerCtxFree(tv->outctx);
}
}
if (tv->cond != NULL ) {
int cnt = 0;
while (1) {

@ -62,9 +62,6 @@ void TmqhFlowRegister(void)
if (strcmp(scheduler, "round_robin") == 0) {
SCLogInfo("AutoFP mode using \"Round Robin\" Q Handler");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
} else if (strcmp(scheduler, "active_flows") == 0) {
SCLogInfo("AutoFP mode using \"Active Flows\" Q Handler");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActiveFlows;
} else if (strcmp(scheduler, "active_packets") == 0) {
SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
@ -75,8 +72,8 @@ void TmqhFlowRegister(void)
exit(EXIT_FAILURE);
}
} else {
SCLogInfo("AutoFP mode using default \"Round Robin\" Q Handler");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
SCLogInfo("AutoFP mode using default \"Active Packets\" Q Handler");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
}
return;
@ -134,7 +131,8 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
}
ctx->queues[ctx->size - 1].q = &trans_q[id];
SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].active_flows);
SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_packets);
SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_flows);
return 0;
}
@ -185,6 +183,8 @@ void *TmqhOutputFlowSetupCtx(char *queue_str)
tmqh_flow_outctx = ctx;
SC_ATOMIC_INIT(ctx->round_robin_idx);
SCFree(str);
return (void *)ctx;
@ -200,8 +200,15 @@ void TmqhOutputFlowFreeCtx(void *ctx)
int i;
TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
tmqh_flow_outctx->size);
for (i = 0; i < fctx->size; i++) {
SC_ATOMIC_DESTROY(fctx->queues[i].active_flows);
SCLogInfo("AutoFP - Total Packets - Queue %"PRIu32 " - %"PRIu64 , i,
SC_ATOMIC_GET(fctx->queues[i].total_packets));
SCLogInfo("AutoFP - Total Flows - Queue %"PRIu32 " - %"PRIu64 , i,
SC_ATOMIC_GET(fctx->queues[i].total_flows));
SC_ATOMIC_DESTROY(fctx->queues[i].total_packets);
SC_ATOMIC_DESTROY(fctx->queues[i].total_flows);
}
SCFree(fctx->queues);
@ -226,20 +233,23 @@ void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p)
/* if no flow we use the first queue,
* should be rare */
if (p->flow != NULL) {
qid = p->flow->autofp_tmqh_flow_qid;
qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
if (qid == -1) {
p->flow->autofp_tmqh_flow_qid = qid = ctx->round_robin_idx++;
ctx->round_robin_idx = ctx->round_robin_idx % ctx->size;
SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
ctx->queues[qid].total_flows++;
qid = SC_ATOMIC_ADD(ctx->round_robin_idx, 1);
if (qid >= ctx->size) {
SC_ATOMIC_RESET(ctx->round_robin_idx);
qid = 0;
}
SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
}
ctx->queues[qid].total_packets++;
} else {
qid = ctx->last++;
if (ctx->last == ctx->size)
ctx->last = 0;
}
SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
PacketQueue *q = ctx->queues[qid].q;
SCMutexLock(&q->mutex_q);
@ -265,7 +275,7 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
/* if no flow we use the first queue,
* should be rare */
if (p->flow != NULL) {
qid = p->flow->autofp_tmqh_flow_qid;
qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
if (qid == -1) {
uint16_t i = 0;
int lowest_id = 0;
@ -277,17 +287,17 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
lowest_id = i;
}
}
p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
queues[qid].total_flows++;
qid = lowest_id;
SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, lowest_id);
SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
}
ctx->queues[qid].total_packets++;
} else {
qid = ctx->last++;
if (ctx->last == ctx->size)
ctx->last = 0;
}
SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
PacketQueue *q = ctx->queues[qid].q;
SCMutexLock(&q->mutex_q);
@ -298,76 +308,6 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
return;
}
/**
* \brief select the queue to output to based on active flows.
*
* \param tv thread vars
* \param p packet
*/
void TmqhOutputFlowActiveFlows(ThreadVars *tv, Packet *p)
{
int32_t qid = 0;
TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
/* if no flow we use the first queue,
* should be rare */
if (p->flow != NULL) {
qid = p->flow->autofp_tmqh_flow_qid;
if (qid == -1) {
uint32_t i = 0;
int lowest_id = 0;
uint32_t lowest = ctx->queues[i].active_flows_sc_atomic__;
for (i = 1; i < ctx->size; i++) {
if (ctx->queues[i].active_flows_sc_atomic__ < lowest) {
lowest = ctx->queues[i].active_flows_sc_atomic__;
lowest_id = i;
}
}
p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
ctx->queues[qid].total_flows++;
}
ctx->queues[qid].total_packets++;
} else {
qid = ctx->last++;
if (ctx->last == ctx->size)
ctx->last = 0;
}
PacketQueue *q = ctx->queues[qid].q;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
return;
}
/**
* \brief Prints flow q handler statistics.
*
* Requires engine to be dead when this function's called.
*/
void TmqhFlowPrintStatistics(void)
{
uint32_t i;
SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
tmqh_flow_outctx->size);
for (i = 0; i < tmqh_flow_outctx->size; i++) {
SCLogInfo("AutoFP - Total Packets - Queue %"PRIu32 " - %"PRIu64 , i,
tmqh_flow_outctx->queues[i].total_packets);
}
for (i = 0; i < tmqh_flow_outctx->size; i++) {
SCLogInfo("AutoFP - Total Flows - Queue %"PRIu32 " - %"PRIu64 , i,
tmqh_flow_outctx->queues[i].total_flows);
}
return;
}
#ifdef UNITTESTS
static int TmqhOutputFlowSetupCtxTest01(void)

@ -27,9 +27,8 @@
typedef struct TmqhFlowMode_ {
PacketQueue *q;
SC_ATOMIC_DECLARE(uint64_t, active_flows);
uint64_t total_packets;
uint64_t total_flows;
SC_ATOMIC_DECLARE(uint64_t, total_packets);
SC_ATOMIC_DECLARE(uint64_t, total_flows);
} TmqhFlowMode;
/** \brief Ctx for the flow queue handler
@ -41,20 +40,10 @@ typedef struct TmqhFlowCtx_ {
TmqhFlowMode *queues;
uint16_t round_robin_idx;
SC_ATOMIC_DECLARE(uint16_t, round_robin_idx);
} TmqhFlowCtx;
extern TmqhFlowCtx *tmqh_flow_outctx;
static inline void TmqhFlowUpdateActiveFlows(Flow *f)
{
SC_ATOMIC_SUB(tmqh_flow_outctx->queues[f->autofp_tmqh_flow_qid].active_flows, 1);
return;
}
void TmqhFlowRegister (void);
void TmqhFlowRegisterTests(void);
void TmqhFlowPrintStatistics(void);
#endif /* __TMQH_FLOW_H__ */

@ -198,6 +198,21 @@
var; \
})
/**
* \brief Set the value for the atomic variable.
*
* \retval var value
*/
#define SC_ATOMIC_SET(name, val) ({ \
typeof(name ## _sc_atomic__) var; \
do { \
SCSpinLock(&(name ## _sc_lock__)); \
var = (name ## _sc_atomic__) = val; \
SCSpinUnlock(&(name ## _sc_lock__)); \
} while (0); \
var; \
})
/**
* \brief atomic Compare and Switch
*
@ -441,6 +456,16 @@
#define SC_ATOMIC_GET(name) \
(name ## _sc_atomic__)
/**
* \brief Set the value for the atomic variable.
*
* \retval var value
*/
#define SC_ATOMIC_SET(name, val) ({ \
while (SC_ATOMIC_CAS(&name, SC_ATOMIC_GET(name), val) == 0) \
; \
})
#endif /* !no atomic operations */
#endif /* __UTIL_ATOMIC_H__ */

@ -24,8 +24,7 @@
# round_robin - Flow alloted to queue in a round robin fashion.
# active-packets - Flow alloted to queue that has the least no of
# unprocessed packets.
# active-flows - Flow alloted to queue that has least no of active flows.
autofp-scheduler: round-robin
#autofp-scheduler: active_packets
# Default pid file.
# Will use this file if no --pidfile in command options.

Loading…
Cancel
Save