Minor flowq updates.

remotes/origin/master
Victor Julien 13 years ago
parent 7115fa3e72
commit 40ed10ab38

@ -253,6 +253,9 @@ typedef struct Flow_
*/
SC_ATOMIC_DECLARE(unsigned short, use_cnt);
/** flow queue id, used with autofp */
SC_ATOMIC_DECLARE(int, autofp_tmqh_flow_qid);
uint32_t probing_parser_toserver_al_proto_masks;
uint32_t probing_parser_toclient_al_proto_masks;
@ -311,8 +314,6 @@ typedef struct Flow_
uint32_t tosrcpktcnt;
uint64_t bytecnt;
#endif
SC_ATOMIC_DECLARE(int, autofp_tmqh_flow_qid);
} Flow;
enum {

@ -59,14 +59,14 @@ void TmqhFlowRegister(void)
char *scheduler = NULL;
if (ConfGet("autofp-scheduler", &scheduler) == 1) {
if (strcasecmp(scheduler, "round_robin") == 0) {
SCLogInfo("AutoFP mode using \"Round Robin\" Q Handler");
if (strcasecmp(scheduler, "round-robin") == 0) {
SCLogInfo("AutoFP mode using \"Round Robin\" flow load balancer");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
} else if (strcasecmp(scheduler, "active_packets") == 0) {
SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler");
} else if (strcasecmp(scheduler, "active-packets") == 0) {
SCLogInfo("AutoFP mode using \"Active Packets\" flow load balancer");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
} else if (strcasecmp(scheduler, "hash") == 0) {
SCLogInfo("AutoFP mode using \"Hash\" Q Handler");
SCLogInfo("AutoFP mode using \"Hash\" flow load balancer");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
} else {
SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" "
@ -75,7 +75,7 @@ void TmqhFlowRegister(void)
exit(EXIT_FAILURE);
}
} else {
SCLogInfo("AutoFP mode using default \"Active Packets\" Q Handler");
SCLogInfo("AutoFP mode using default \"Active Packets\" flow load balancer");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
}
@ -87,14 +87,14 @@ Packet *TmqhInputFlow(ThreadVars *tv)
{
PacketQueue *q = &trans_q[tv->inq->id];
SCPerfSyncCountersIfSignalled(tv, 0);
SCMutexLock(&q->mutex_q);
if (q->len == 0) {
/* if we have no packets in queue, wait... */
SCCondWait(&q->cond_q, &q->mutex_q);
}
SCPerfSyncCountersIfSignalled(tv, 0);
if (q->len > 0) {
Packet *p = PacketDequeue(q);
SCMutexUnlock(&q->mutex_q);
@ -206,10 +206,9 @@ void TmqhOutputFlowFreeCtx(void *ctx)
SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
tmqh_flow_outctx->size);
for (i = 0; i < fctx->size; i++) {
SCLogInfo("AutoFP - Total Packets - Queue %"PRIu32 " - %"PRIu64 , i,
SC_ATOMIC_GET(fctx->queues[i].total_packets));
SCLogInfo("AutoFP - Total Flows - Queue %"PRIu32 " - %"PRIu64 , i,
SC_ATOMIC_GET(fctx->queues[i].total_flows));
SCLogInfo("AutoFP - Queue %-2"PRIu32 " - pkts: %-12"PRIu64" flows: %-12"PRIu64, i,
SC_ATOMIC_GET(fctx->queues[i].total_packets),
SC_ATOMIC_GET(fctx->queues[i].total_flows));
SC_ATOMIC_DESTROY(fctx->queues[i].total_packets);
SC_ATOMIC_DESTROY(fctx->queues[i].total_flows);
}

@ -46,6 +46,8 @@ Packet *TmqhInputSimple(ThreadVars *t)
{
PacketQueue *q = &trans_q[t->inq->id];
SCPerfSyncCountersIfSignalled(t, 0);
SCMutexLock(&q->mutex_q);
if (q->len == 0) {
@ -53,8 +55,6 @@ Packet *TmqhInputSimple(ThreadVars *t)
SCCondWait(&q->cond_q, &q->mutex_q);
}
SCPerfSyncCountersIfSignalled(t, 0);
if (q->len > 0) {
Packet *p = PacketDequeue(q);
SCMutexUnlock(&q->mutex_q);

@ -20,12 +20,16 @@
#runmode: auto
# Specifies the kind of q scheduler used by flow pinned autofp mode.
# Supported scheduler are :
# round_robin - Flow alloted to queue in a round robin fashion.
# active-packets - Flow alloted to queue that has the least no of
#
# Supported schedulers are:
#
# round-robin - Flows assigned to threads in a round robin fashion.
# active-packets - Flows assigned to threads that have the lowest number of
# unprocessed packets.
# hash - Flow alloted usihng the address hash. More of a random technique.
#autofp-scheduler: active_packets
# hash - Flow alloted usihng the address hash. More of a random technique. Was
# the default in 1.2.1 and older.
#
autofp-scheduler: active-packets
# Default pid file.
# Will use this file if no --pidfile in command options.

Loading…
Cancel
Save