From ff6365dd33616ebfd18ce318f3e34229a2512a1a Mon Sep 17 00:00:00 2001 From: Eric Leblond Date: Thu, 8 Sep 2011 09:29:19 +0200 Subject: [PATCH] af-packet: switch to pcktacqloop API. This patch gets rid of the old API and brings some optimisation by reordering structure and optimisinf an error test. --- src/runmode-af-packet.c | 8 +- src/source-af-packet.c | 205 +++------------------------------------- 2 files changed, 19 insertions(+), 194 deletions(-) diff --git a/src/runmode-af-packet.c b/src/runmode-af-packet.c index 84e30e9df3..dc88ac62dc 100644 --- a/src/runmode-af-packet.c +++ b/src/runmode-af-packet.c @@ -249,7 +249,7 @@ int RunModeIdsAFPAuto(DetectEngineCtx *de_ctx) TmThreadCreatePacketHandler("ReceiveAFP", "packetpool", "packetpool", "pickup-queue", "simple", - "1slot"); + "pktacqloop"); if (tv_receiveafp == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); @@ -295,7 +295,7 @@ int RunModeIdsAFPAuto(DetectEngineCtx *de_ctx) TmThreadCreatePacketHandler(tnamec, "packetpool", "packetpool", "pickup-queue", "simple", - "1slot"); + "pktacqloop"); if (tv_receiveafp == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); @@ -614,7 +614,7 @@ int RunModeIdsAFPAutoFp(DetectEngineCtx *de_ctx) ThreadVars *tv_receive = TmThreadCreatePacketHandler(thread_name, "packetpool", "packetpool", - queues, "flow", "varslot"); + queues, "flow", "pktacqloop"); if (tv_receive == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); @@ -667,7 +667,7 @@ int RunModeIdsAFPAutoFp(DetectEngineCtx *de_ctx) ThreadVars *tv_receive = TmThreadCreatePacketHandler(thread_name, "packetpool", "packetpool", - queues, "flow", "varslot"); + queues, "flow", "pktacqloop"); if (tv_receive == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); diff --git a/src/source-af-packet.c b/src/source-af-packet.c index 9a6211eb61..e69c1cbaae 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -74,9 +74,6 @@ extern uint8_t suricata_ctl_flags; extern int max_pending_packets; -#define AFP_WORKER_MODE 0 -#define AFP_LOOP_MODE 1 - #ifndef HAVE_AF_PACKET TmEcode NoAFPSupportExit(ThreadVars *, void *, void **); @@ -118,10 +115,6 @@ TmEcode NoAFPSupportExit(ThreadVars *tv, void *initdata, void **data) #else /* We have AF_PACKET support */ -/** control how many packets we may read in one go */ -static int afp_max_read_packets = 0; -/** max packets < 65536 */ -#define AFP_FILE_MAX_PKTS 256 #define AFP_IFACE_NAME_LENGTH 48 #define AFP_STATE_DOWN 0 @@ -140,7 +133,6 @@ typedef struct AFPThreadVars_ int socket; /* handle state */ unsigned char afp_state; - char iface[AFP_IFACE_NAME_LENGTH]; /* data link type for the thread */ int datalink; @@ -151,6 +143,13 @@ typedef struct AFPThreadVars_ uint64_t bytes; uint32_t errs; + ThreadVars *tv; + TmSlot *slot; + + uint8_t *data; /** Per function and thread data */ + int datalen; /** Length of per function and thread data */ + + char iface[AFP_IFACE_NAME_LENGTH]; /* socket buffer size */ int buffer_size; int promisc; @@ -160,16 +159,7 @@ typedef struct AFPThreadVars_ int threads; - ThreadVars *tv; - TmSlot *slot; - - Packet *in_p; - Packet *array[AFP_FILE_MAX_PKTS]; - uint16_t array_idx; - - uint8_t *data; /** Per function and thread data */ - int datalen; /** Length of per function and thread data */ } AFPThreadVars; TmEcode ReceiveAFP(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); @@ -188,7 +178,7 @@ TmEcode DecodeAFP(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); void TmModuleReceiveAFPRegister (void) { tmm_modules[TMM_RECEIVEAFP].name = "ReceiveAFP"; tmm_modules[TMM_RECEIVEAFP].ThreadInit = ReceiveAFPThreadInit; - tmm_modules[TMM_RECEIVEAFP].Func = ReceiveAFP; + tmm_modules[TMM_RECEIVEAFP].Func = NULL; tmm_modules[TMM_RECEIVEAFP].PktAcqLoop = ReceiveAFPLoop; tmm_modules[TMM_RECEIVEAFP].ThreadExitPrintStats = ReceiveAFPThreadExitStats; tmm_modules[TMM_RECEIVEAFP].ThreadDeinit = NULL; @@ -222,7 +212,7 @@ static int AFPCreateSocket(AFPThreadVars *ptv, char *devname, int verbose); * \param user pointer to AFPThreadVars * \retval TM_ECODE_FAILED on failure and TM_ECODE_OK on success */ -TmEcode AFPRead(AFPThreadVars *ptv, int mode) +TmEcode AFPRead(AFPThreadVars *ptv) { Packet *p = NULL; /* XXX should try to use read that get directly to packet */ @@ -263,20 +253,8 @@ TmEcode AFPRead(AFPThreadVars *ptv, int mode) errno); SCReturnInt(TM_ECODE_FAILED); } - switch(mode) { - case AFP_WORKER_MODE: - if (ptv->array_idx == 0) { - p = ptv->in_p; - } else { - p = PacketGetFromQueueOrAlloc(); - } - break; - case AFP_LOOP_MODE: - p = PacketGetFromQueueOrAlloc(); - break; - default: - SCLogError(SC_ERR_INVALID_VALUE, "AFPRread does not support this mode"); - } + + p = PacketGetFromQueueOrAlloc(); if (p == NULL) { SCReturnInt(TM_ECODE_FAILED); } @@ -308,19 +286,7 @@ TmEcode AFPRead(AFPThreadVars *ptv, int mode) SCLogDebug("pktlen: %" PRIu32 " (pkt %p, pkt data %p)", GET_PKT_LEN(p), p, GET_PKT_DATA(p)); - switch(mode) { - case AFP_WORKER_MODE: - /* store the packet in our array */ - ptv->array[ptv->array_idx] = p; - ptv->array_idx++; - break; - case AFP_LOOP_MODE: - TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p); - break; - default: - SCLogError(SC_ERR_INVALID_VALUE, "AFPRread does not support this mode"); - TmqhOutputPacketpool(ptv->tv, p); - } + TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p); SCReturnInt(TM_ECODE_OK); } @@ -394,8 +360,7 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) close(ptv->socket); ptv->afp_state = AFP_STATE_DOWN; continue; - } - if (fds.revents & POLLERR) { + } else if (fds.revents & POLLERR) { char c; /* Do a recv to get errno */ if (recv(ptv->socket, &c, sizeof c, MSG_PEEK) != -1) @@ -405,8 +370,7 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) close(ptv->socket); ptv->afp_state = AFP_STATE_DOWN; continue; - } - if (fds.revents & POLLNVAL) { + } else if (fds.revents & POLLNVAL) { SCLogError(SC_ERR_AFP_READ, "Invalid polling request"); close(ptv->socket); ptv->afp_state = AFP_STATE_DOWN; @@ -414,7 +378,7 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) } } else if (r > 0) { /* AFPRead will call TmThreadsSlotProcessPkt on read packets */ - r = AFPRead(ptv, AFP_LOOP_MODE); + r = AFPRead(ptv); if (r != TM_ECODE_OK) { SCReturnInt(TM_ECODE_FAILED); } @@ -435,140 +399,6 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) SCReturnInt(TM_ECODE_OK); } -/** - * \brief Recieves packets from an interface via AF_PACKET socket - * - * This function recieves packets from an interface and passes - * the packet on to the AFP callback function. - * - * \param tv pointer to ThreadVars - * \param data pointer that gets cast into AFPThreadVars for ptv - * \param pq pointer to the PacketQueue (not used here but part of the api) - * \retval TM_ECODE_FAILED on failure and TM_ECODE_OK on success - */ -TmEcode ReceiveAFP(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { - SCEnter(); - uint16_t packet_q_len = 0; - struct pollfd fds; - TmEcode ret; - uint16_t cnt = 0; - - AFPThreadVars *ptv = (AFPThreadVars *)data; - - /* test AFP handle */ - if (ptv->afp_state == AFP_STATE_DOWN) { - int r; - do { - usleep(AFP_RECONNECT_TIMEOUT); - if (suricata_ctl_flags != 0) { - break; - } - r = AFPTryReopen(ptv); - } while (r < 0); - } - - /* make sure we have at least one packet in the packet pool, to prevent - * us from alloc'ing packets at line rate */ - while (packet_q_len == 0) { - packet_q_len = PacketPoolSize(); - if (packet_q_len == 0) { - PacketPoolWait(); - } - } - - if (postpq == NULL) - afp_max_read_packets = 1; - - ptv->array_idx = 0; - ptv->in_p = p; - - fds.fd = ptv->socket; - fds.events = POLLIN; - - int r = 0; - while (r >= 0) { - r = poll(&fds, 1, POLL_TIMEOUT); - - if (suricata_ctl_flags != 0) { - break; - } - - if (r > 0 && - (fds.revents & (POLLHUP|POLLRDHUP|POLLERR|POLLNVAL))) { - if (fds.revents & (POLLHUP | POLLRDHUP)) { - close(ptv->socket); - ptv->afp_state = AFP_STATE_DOWN; - break; - } - if (fds.revents & POLLERR) { - char c; - /* Do a recv to get errno */ - if (recv(ptv->socket, &c, sizeof c, MSG_PEEK) != -1) - continue; /* what, no error? */ - SCLogError(SC_ERR_AFP_READ, "Error reading data from socket: (%d" PRIu32 ") %s", - errno, strerror(errno)); - close(ptv->socket); - ptv->afp_state = AFP_STATE_DOWN; - break; - } - if (fds.revents & POLLNVAL) { - SCLogError(SC_ERR_AFP_READ, "Invalid polling request"); - close(ptv->socket); - ptv->afp_state = AFP_STATE_DOWN; - break; - } - } else if (r > 0) { - ret = AFPRead(ptv, AFP_WORKER_MODE); - if (ret != TM_ECODE_OK) { - SCReturnInt(TM_ECODE_FAILED); - } - if (cnt++ >= afp_max_read_packets) - break; - } else if ((r < 0) && (errno != EINTR)) { - int dbreak = 0; - SCLogError(SC_ERR_AFP_READ, "Error reading data from socket: (%d" PRIu32 ") %s", - errno, strerror(errno)); - do { - usleep(AFP_RECONNECT_TIMEOUT); - if (suricata_ctl_flags != 0) { - dbreak = 1; - break; - } - r = AFPTryReopen(ptv); - } while (r < 0); - if (dbreak) { - r = 0; - break; - } - } else if (r == 0) { - /* timeout condition */ - if (cnt > 0) - break; - } - } - - for (cnt = 0; cnt < ptv->array_idx; cnt++) { - Packet *pp = ptv->array[cnt]; - - /* enqueue all but the first in the postpq, the first - * pkt is handled by the tv "out handler" */ - if (cnt > 0) { - PacketEnqueue(postpq, pp); - } - } - - if (r < 0) { - SCLogError(SC_ERR_AFP_DISPATCH, "error code %" PRId32, r); - SCReturnInt(TM_ECODE_OK); - } - - if (suricata_ctl_flags != 0) { - SCReturnInt(TM_ECODE_FAILED); - } - - SCReturnInt(TM_ECODE_OK); -} - static int AFPGetIfnumByDev(int fd, const char *ifname, int verbose) { struct ifreq ifr; @@ -719,11 +549,6 @@ TmEcode ReceiveAFPThreadInit(ThreadVars *tv, void *initdata, void **data) { int r; AFPIfaceConfig *afpconfig = initdata; - /* use max_pending_packets as AFP read size unless it's bigger than - * our size limit */ - afp_max_read_packets = (AFP_FILE_MAX_PKTS < max_pending_packets) ? - AFP_FILE_MAX_PKTS : max_pending_packets; - if (initdata == NULL) { SCLogError(SC_ERR_INVALID_ARGUMENT, "initdata == NULL"); SCReturnInt(TM_ECODE_FAILED);