Implement single, autofp and workers run modes for DAG interfaces. Includes multiple interface support.

Remove auto mode due to bad performance.
remotes/origin/HEAD
Jason Ish 14 years ago committed by Victor Julien
parent 8e064001c3
commit 105173939b

@ -34,9 +34,20 @@
#include "util-time.h"
#include "util-cpu.h"
#include "util-affinity.h"
#include "util-runmodes.h"
static const char *default_mode;
static int DagConfigGetThreadCount(void *conf)
{
return 1;
}
static void *ParseDagConfig(const char *iface)
{
return (void *)iface;
}
const char *RunModeErfDagGetDefaultMode(void)
{
return default_mode;
@ -44,184 +55,105 @@ const char *RunModeErfDagGetDefaultMode(void)
void RunModeErfDagRegister(void)
{
default_mode = "auto";
RunModeRegisterNewRunMode(RUNMODE_DAG, "auto",
"Multi threaded Erf dag mode",
RunModeErfDagAuto);
default_mode = "autofp";
RunModeRegisterNewRunMode(RUNMODE_DAG, "autofp",
"Multi threaded DAG mode. Packets from "
"each flow are assigned to a single detect "
"thread, unlike \"dag_auto\" where packets "
"from the same flow can be processed by any "
"detect thread",
RunModeIdsErfDagAutoFp);
RunModeRegisterNewRunMode(RUNMODE_DAG, "single",
"Singled threaded DAG mode",
RunModeIdsErfDagSingle);
RunModeRegisterNewRunMode(RUNMODE_DAG, "workers",
"Workers DAG mode, each thread does all "
" tasks from acquisition to logging",
RunModeIdsErfDagWorkers);
return;
}
/**
*
* \brief Sets up support for reading from a DAG card.
*
* \param de_ctx
* \param file
* \notes Currently only supports a single interface.
*/
int RunModeErfDagAuto(DetectEngineCtx *de_ctx)
int RunModeIdsErfDagSingle(DetectEngineCtx *de_ctx)
{
SCEnter();
char tname[12];
uint16_t cpu = 0;
int ret;
/* Available cpus */
uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
SCEnter();
RunModeInitialize();
char *iface = NULL;
if (ConfGet("erf-dag.iface", &iface) == 0) {
SCLogError(SC_ERR_RUNMODE, "Failed retrieving erf-dag.iface from Conf");
TimeModeSetLive();
ret = RunModeSetLiveCaptureSingle(de_ctx,
ParseDagConfig,
DagConfigGetThreadCount,
"ReceiveErfDag",
"DecodeErfDag",
"RxDAG",
NULL);
if (ret != 0) {
SCLogError(SC_ERR_RUNMODE, "DAG single runmode failed to start");
exit(EXIT_FAILURE);
}
SCLogDebug("iface %s", iface);
TimeModeSetOffline();
SCLogInfo("RunModeIdsDagSingle initialised");
/* @TODO/JNM: We need to create a separate processing pipeliine for each
* interface supported by the
*/
ThreadVars *tv_receiveerf =
TmThreadCreatePacketHandler("ReceiveErfDag",
"packetpool","packetpool",
"pickup-queue","simple",
"1slot");
if (tv_receiveerf == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
TmModule *tm_module = TmModuleGetByName("ReceiveErfDag");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed for ReceiveErfDag\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_receiveerf, tm_module, iface);
SCReturnInt(0);
}
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_receiveerf, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_receiveerf, PRIO_MEDIUM);
}
int RunModeIdsErfDagAutoFp(DetectEngineCtx *de_ctx)
{
int ret;
if (TmThreadSpawn(tv_receiveerf) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
SCEnter();
ThreadVars *tv_decode1 =
TmThreadCreatePacketHandler("Decode & Stream",
"pickup-queue","simple",
"stream-queue1","simple",
"varslot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("DecodeErfDag");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodeErfDag failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
RunModeInitialize();
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
TimeModeSetLive();
ret = RunModeSetLiveCaptureAutoFp(de_ctx,
ParseDagConfig,
DagConfigGetThreadCount,
"ReceiveErfDag",
"DecodeErfDag",
"RxDAG",
NULL);
if (ret != 0) {
SCLogError(SC_ERR_RUNMODE, "DAG autofp runmode failed to start");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_decode1, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_decode1, PRIO_MEDIUM);
}
SCLogInfo("RunModeIdsDagAutoFp initialised");
if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
/* start with cpu 1 so that if we're creating an odd number of detect
* threads we're not creating the most on CPU0. */
if (ncpus > 0)
cpu = 1;
/* always create at least one thread */
int thread_max = TmThreadGetNbThreads(DETECT_CPU_SET);
if (thread_max == 0)
thread_max = ncpus * threading_detect_ratio;
if (thread_max < 1)
thread_max = 1;
int thread;
for (thread = 0; thread < thread_max; thread++) {
snprintf(tname, sizeof(tname), "Detect%"PRIu16, thread+1);
char *thread_name = SCStrdup(tname);
SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
ThreadVars *tv_detect_ncpu =
TmThreadCreatePacketHandler(thread_name,
"stream-queue1","simple",
"alert-queue1","simple",
"1slot");
if (tv_detect_ncpu == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("Detect");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName Detect failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, (void *)de_ctx);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_detect_ncpu, (int)cpu);
/* If we have more than one core/cpu, the first Detect thread
* (at cpu 0) will have less priority (higher 'nice' value)
* In this case we will set the thread priority to +10 (default is 0)
*/
if (cpu == 0 && ncpus > 1) {
TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_LOW);
} else if (ncpus > 1) {
TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_MEDIUM);
}
}
if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
}
SCReturnInt(0);
}
ThreadVars *tv_outputs =
TmThreadCreatePacketHandler("Outputs",
"alert-queue1", "simple",
"packetpool", "packetpool",
"varslot");
if (tv_outputs == NULL) {
printf("ERROR: TmThreadCreatePacketHandler for Outputs failed\n");
exit(EXIT_FAILURE);
}
int RunModeIdsErfDagWorkers(DetectEngineCtx *de_ctx)
{
int ret;
SetupOutputs(tv_outputs);
SCEnter();
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_outputs, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM);
}
RunModeInitialize();
if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
TimeModeSetLive();
ret = RunModeSetLiveCaptureWorkers(de_ctx,
ParseDagConfig,
DagConfigGetThreadCount,
"ReceiveErfDag",
"DecodeErfDag",
"RxDAG",
NULL);
if (ret != 0) {
SCLogError(SC_ERR_RUNMODE, "DAG workers runmode failed to start");
exit(EXIT_FAILURE);
}
return 0;
SCLogInfo("RunModeIdsErfDagWorkers initialised");
SCReturnInt(0);
}

@ -23,7 +23,9 @@
#ifndef __RUNMODE_ERF_DAG_H__
#define __RUNMODE_ERF_DAG_H__
int RunModeErfDagAuto(DetectEngineCtx *);
int RunModeIdsErfDagAutoFp(DetectEngineCtx *);
int RunModeIdsErfDagSingle(DetectEngineCtx *);
int RunModeIdsErfDagWorkers(DetectEngineCtx *);
void RunModeErfDagRegister(void);
const char *RunModeErfDagGetDefaultMode(void);

@ -19,7 +19,7 @@
* \file
*
* \author Endace Technology Limited.
* \author Jason MacLulich <jason.maclulich@eendace.com>
* \author Jason MacLulich <jason.maclulich@endace.com>
*
* Support for reading ERF records from a DAG card.
*
@ -78,6 +78,8 @@ extern uint8_t suricata_ctl_flags;
typedef struct ErfDagThreadVars_ {
ThreadVars *tv;
TmSlot *slot;
int dagfd;
int dagstream;
char dagname[DAGNAME_BUFSIZE];
@ -100,12 +102,12 @@ typedef struct ErfDagThreadVars_ {
} ErfDagThreadVars;
TmEcode ReceiveErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode ReceiveErfDagLoop(ThreadVars *, void *data, void *slot);
TmEcode ReceiveErfDagThreadInit(ThreadVars *, void *, void **);
void ReceiveErfDagThreadExitStats(ThreadVars *, void *);
TmEcode ReceiveErfDagThreadDeinit(ThreadVars *, void *);
TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, Packet *p, uint8_t* top,
PacketQueue *postpq, uint32_t *pkts_read);
TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t* top,
uint32_t *pkts_read);
TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p);
TmEcode DecodeErfDagThreadInit(ThreadVars *, void *, void **);
@ -120,7 +122,8 @@ TmModuleReceiveErfDagRegister(void)
{
tmm_modules[TMM_RECEIVEERFDAG].name = "ReceiveErfDag";
tmm_modules[TMM_RECEIVEERFDAG].ThreadInit = ReceiveErfDagThreadInit;
tmm_modules[TMM_RECEIVEERFDAG].Func = ReceiveErfDag;
tmm_modules[TMM_RECEIVEERFDAG].Func = NULL;
tmm_modules[TMM_RECEIVEERFDAG].PktAcqLoop = ReceiveErfDagLoop;
tmm_modules[TMM_RECEIVEERFDAG].ThreadExitPrintStats =
ReceiveErfDagThreadExitStats;
tmm_modules[TMM_RECEIVEERFDAG].ThreadDeinit = NULL;
@ -300,37 +303,21 @@ ReceiveErfDagThreadInit(ThreadVars *tv, void *initdata, void **data)
}
/**
* \brief Thread entry function for reading ERF records from a DAG card.
*
* Reads a new ERF record the DAG input buffer and copies it to
* an internal Suricata packet buffer -- similar to the way the
* pcap packet handler works.
* \brief Receives packets from a DAG interface.
*
* We create new packet structures using PacketGetFromQueueOrAlloc
* for each packet between the top and btm pointers except for
* the first packet for which a Packet buffer is provided
* from the packetpool.
* \param tv pointer to ThreadVars
* \param data pointer to ErfDagThreadVars
* \param slot slot containing task information
*
* We always read up to dag_max_read_packets ERF packets from the
* DAG buffer, but we might read less. This differs from the
* ReceivePcap handler -- it will only read pkts up to a maximum
* of either the packetpool count or the pcap_max_read_packets.
*
* \param tv pointer to ThreadVars
* \param p data pointer
* \param data
* \param pq pointer to the PacketQueue (not used here)
* \param postpq
* \retval TM_ECODE_FAILED on failure and TM_ECODE_OK on success.
* \note We also use the packetpool hack first used in the source-pcap
* handler so we don't keep producing packets without any dying.
* This implies that if we are in this situation we run the risk
* of dropping packets at the interface.
* \retval TM_ECODE_OK on success
* \retval TM_ECODE_FAILED on failure
*/
TmEcode
ReceiveErfDag(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
PacketQueue *postpq)
TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
{
ErfDagThreadVars *dtv = (ErfDagThreadVars *)data;
TmSlot *s = (TmSlot *)slot;
dtv->slot = s->slot_next;
SCEnter();
uint16_t packet_q_len = 0;
@ -339,97 +326,90 @@ ReceiveErfDag(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
uint8_t *top = NULL;
uint32_t pkts_read = 0;
assert(p);
assert(pq);
assert(postpq);
ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data;
/* NOTE/JNM: Hack copied from source-pcap.c
*
* Make sure we have at least one packet in the packet pool, to
* prevent us from alloc'ing packets at line rate
*/
while (packet_q_len == 0) {
packet_q_len = PacketPoolSize();
if (packet_q_len == 0) {
PacketPoolWait();
}
}
if (postpq == NULL) {
ewtn->dag_max_read_packets = 1;
}
while(pkts_read == 0)
while (1)
{
if (suricata_ctl_flags != 0) {
break;
if (suricata_ctl_flags & SURICATA_STOP ||
suricata_ctl_flags & SURICATA_KILL) {
SCReturnInt(TM_ECODE_FAILED);
}
/* Make sure we have at least one packet in the packet pool,
* to prevent us from alloc'ing packets at line rate. */
do {
packet_q_len = PacketPoolSize();
if (unlikely(packet_q_len == 0)) {
PacketPoolWait();
}
} while (packet_q_len == 0);
/* NOTE/JNM: This might not work well if we start restricting the
* number of ERF records processed per call to a small number as
* the over head required here could exceed the time it takes to
* process a small number of ERF records.
*
* XXX/JNM: Possibly process the DAG stream buffer first if there
* are ERF packets or else call dag_advance_stream and then process
* the DAG stream buffer.
*/
top = dag_advance_stream(ewtn->dagfd, ewtn->dagstream, &(ewtn->btm));
if (NULL == top)
{
if((ewtn->dagstream & 0x1) && (errno == EAGAIN)) {
usleep(10 * 1000);
ewtn->btm = ewtn->top;
* number of ERF records processed per call to a small number as
* the over head required here could exceed the time it takes to
* process a small number of ERF records.
*
* XXX/JNM: Possibly process the DAG stream buffer first if there
* are ERF packets or else call dag_advance_stream and then process
* the DAG stream buffer.
*/
top = dag_advance_stream(dtv->dagfd, dtv->dagstream, &(dtv->btm));
if (NULL == top)
{
if ((dtv->dagstream & 0x1) && (errno == EAGAIN)) {
usleep(10 * 1000);
dtv->btm = dtv->top;
continue;
}
else {
SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
"Failed to read from stream: %d, DAG: %s when using dag_advance_stream",
ewtn->dagstream, ewtn->dagname);
SCReturnInt(TM_ECODE_FAILED);
}
}
}
else {
SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
"Failed to read from stream: %d, DAG: %s when using dag_advance_stream",
dtv->dagstream, dtv->dagname);
SCReturnInt(TM_ECODE_FAILED);
}
}
diff = top - ewtn->btm;
if (diff == 0)
{
continue;
}
diff = top - dtv->btm;
if (diff == 0)
{
continue;
}
assert(diff >= dag_record_size);
assert(diff >= dag_record_size);
err = ProcessErfDagRecords(ewtn, p, top, postpq, &pkts_read);
err = ProcessErfDagRecords(dtv, top, &pkts_read);
if (err == TM_ECODE_FAILED) {
SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
"Failed to read from stream: %d, DAG: %s",
ewtn->dagstream, ewtn->dagname);
ReceiveErfDagCloseStream(ewtn->dagfd, ewtn->dagstream);
SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
"Failed to read from stream: %d, DAG: %s",
dtv->dagstream, dtv->dagname);
ReceiveErfDagCloseStream(dtv->dagfd, dtv->dagstream);
SCReturnInt(err);
}
}
SCLogDebug("Read %d records from stream: %d, DAG: %s",
pkts_read, ewtn->dagstream, ewtn->dagname);
pkts_read, dtv->dagstream, dtv->dagname);
if (suricata_ctl_flags != 0) {
SCReturnInt(TM_ECODE_FAILED);
}
SCReturnInt(err);
SCReturnInt(TM_ECODE_OK);
}
/**
* \brief Process a chunk of records read from a DAG interface.
*
* This function takes a pointer to buffer read from the DAG interface
* and processes it individual records.
*/
TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn,
Packet *p,
uint8_t* top,
PacketQueue *postpq,
uint32_t *pkts_read)
{
SCEnter();
Packet *p;
int err = 0;
dag_record_t* dr = NULL;
char *prec = NULL;
@ -458,8 +438,7 @@ TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn,
if ((top-(ewtn->btm)) < rlen)
SCReturnInt(TM_ECODE_OK);
p = p ? p : PacketGetFromQueueOrAlloc();
p = PacketGetFromQueueOrAlloc();
if (p == NULL) {
SCLogError(SC_ERR_MEM_ALLOC,
"Failed to allocate a Packet on stream: %d, DAG: %s",
@ -469,22 +448,19 @@ TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn,
err = ProcessErfDagRecord(ewtn, prec, p);
if (err != TM_ECODE_OK)
if (err != TM_ECODE_OK) {
TmqhOutputPacketpool(ewtn->tv, p);
SCReturnInt(err);
}
ewtn->btm += rlen;
/* XXX/JNM: Hack to get around the fact that the first Packet from
* Suricata is added explicitly by the Slot code and shouldn't go
* onto the post queue -- else it is added twice to the next queue.
*/
if (*pkts_read) {
PacketEnqueue(postpq, p);
err = TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p);
if (err != TM_ECODE_OK) {
return err;
}
(*pkts_read)++;
p = NULL;
}
SCReturnInt(TM_ECODE_OK);
@ -530,9 +506,6 @@ TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p)
*/
PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p));
SCLogDebug("pktlen: %" PRIu32 " (pkt %02x, pkt data %02x)",
GET_PKT_LEN(p), *p, *GET_PKT_DATA(p));
/* Convert ERF time to timeval - from libpcap. */
uint64_t ts = dr->ts;
p->ts.tv_sec = ts >> 32;

@ -481,7 +481,7 @@ void usage(const char *progname)
#endif /* HAVE_LIBCAP_NG */
printf("\t--erf-in <path> : process an ERF file\n");
#ifdef HAVE_DAG
printf("\t--dag <dag0,dag1,...> : process ERF records from 0,1,...,n DAG input streams\n");
printf("\t--dag <dagX:Y> : process ERF records from DAG interface X, stream Y\n");
#endif
#ifdef HAVE_NAPATECH
printf("\t--napatech <adapter> : run Napatech feeds using <adapter>\n");
@ -944,17 +944,22 @@ int main(int argc, char **argv)
exit(EXIT_FAILURE);
}
}
else if (strcmp((long_opts[option_index]).name, "dag") == 0) {
else if (strcmp((long_opts[option_index]).name, "dag") == 0) {
#ifdef HAVE_DAG
run_mode = RUNMODE_DAG;
if (ConfSet("erf-dag.iface", optarg, 0) != 1) {
fprintf(stderr, "ERROR: Failed to set erf_dag.iface\n");
if (run_mode == RUNMODE_UNKNOWN) {
run_mode = RUNMODE_DAG;
}
else if (run_mode != RUNMODE_DAG) {
SCLogError(SC_ERR_MULTIPLE_RUN_MODE,
"more than one run mode has been specified");
usage(argv[0]);
exit(EXIT_FAILURE);
}
LiveRegisterDevice(optarg);
#else
SCLogError(SC_ERR_DAG_REQUIRED, "libdag and a DAG card are required"
SCLogError(SC_ERR_DAG_REQUIRED, "libdag and a DAG card are required"
" to receieve packets using --dag.");
exit(EXIT_FAILURE);
exit(EXIT_FAILURE);
#endif /* HAVE_DAG */
}
else if (strcmp((long_opts[option_index]).name, "napatech") == 0) {

Loading…
Cancel
Save