From 1375e90030781d668f641089740390254a0501fb Mon Sep 17 00:00:00 2001 From: Eric Leblond Date: Mon, 24 Jan 2011 13:27:00 +0100 Subject: [PATCH] Prepare multi queue support in NFQ This patch prepare support for multiqueue in the source file. The NFQ vars contained in Packet structure has a new member. It is a reference to the NFQ thread var it comes from. The behaviour is modified as a single verdict thread treat packet for all Netfilter queues. Locking is done in the verdict function to ensure that simultaneous modifications of counters can not occur. Signed-off-by: Eric Leblond --- src/source-nfq.c | 243 +++++++++++++++++++++++++++++++---------------- src/source-nfq.h | 20 +++- 2 files changed, 181 insertions(+), 82 deletions(-) diff --git a/src/source-nfq.c b/src/source-nfq.c index 04ecd0468b..a953ce8e7e 100644 --- a/src/source-nfq.c +++ b/src/source-nfq.c @@ -107,8 +107,8 @@ extern int max_pending_packets; static NFQGlobalVars nfq_g; static NFQThreadVars nfq_t[NFQ_MAX_QUEUE]; +static NFQQueueVars nfq_q[NFQ_MAX_QUEUE]; static uint16_t receive_queue_num = 0; -static uint16_t verdict_queue_num = 0; static SCMutex nfq_init_lock; TmEcode ReceiveNFQ(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); @@ -117,7 +117,6 @@ void ReceiveNFQThreadExitStats(ThreadVars *, void *); TmEcode VerdictNFQ(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode VerdictNFQThreadInit(ThreadVars *, void *, void **); -void VerdictNFQThreadExitStats(ThreadVars *, void *); TmEcode VerdictNFQThreadDeinit(ThreadVars *, void *); TmEcode DecodeNFQ(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); @@ -126,7 +125,6 @@ TmEcode DecodeNFQThreadInit(ThreadVars *, void *, void **); void TmModuleReceiveNFQRegister (void) { /* XXX create a general NFQ setup function */ memset(&nfq_g, 0, sizeof(nfq_g)); - memset(&nfq_t, 0, sizeof(nfq_t)); SCMutexInit(&nfq_init_lock, NULL); tmm_modules[TMM_RECEIVENFQ].name = "ReceiveNFQ"; @@ -141,7 +139,7 @@ void TmModuleVerdictNFQRegister (void) { tmm_modules[TMM_VERDICTNFQ].name = "VerdictNFQ"; tmm_modules[TMM_VERDICTNFQ].ThreadInit = VerdictNFQThreadInit; tmm_modules[TMM_VERDICTNFQ].Func = VerdictNFQ; - tmm_modules[TMM_VERDICTNFQ].ThreadExitPrintStats = VerdictNFQThreadExitStats; + tmm_modules[TMM_VERDICTNFQ].ThreadExitPrintStats = NULL; tmm_modules[TMM_VERDICTNFQ].ThreadDeinit = VerdictNFQThreadDeinit; tmm_modules[TMM_VERDICTNFQ].RegisterTests = NULL; } @@ -214,11 +212,13 @@ static int NFQCallBack(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, return -1; } + p->nfq_v.nfq_index = ntv->nfq_index; NFQSetupPkt(p, (void *)nfa); #ifdef COUNTERS - nfq_t->pkts++; - nfq_t->bytes += GET_PKT_LEN(p); + NFQQueueVars *nfq_q = NFQGetQueue(ntv->nfq_index); + nfq_q->pkts++; + nfq_q->bytes += GET_PKT_LEN(p); #endif /* COUNTERS */ /* pass on... */ @@ -227,17 +227,20 @@ static int NFQCallBack(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, return 0; } -TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_maxlen) +TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint32_t queue_maxlen) { #ifndef OS_WIN32 struct timeval tv; int opt; #endif - nfq_t->queue_num = queue_num; - + NFQQueueVars *nfq_q = NFQGetQueue(nfq_t->nfq_index); + if (nfq_q == NULL) { + SCLogError(SC_ERR_NFQ_OPEN, "no queue for given index"); + return TM_ECODE_FAILED; + } SCLogDebug("opening library handle"); - nfq_t->h = nfq_open(); - if (!nfq_t->h) { + nfq_q->h = nfq_open(); + if (!nfq_q->h) { SCLogError(SC_ERR_NFQ_OPEN, "nfq_open() failed"); return TM_ECODE_FAILED; } @@ -247,11 +250,11 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m /* VJ: on my Ubuntu Hardy system this fails the first time it's * run. Ignoring the error seems to have no bad effects. */ SCLogDebug("unbinding existing nf_queue handler for AF_INET (if any)"); - if (nfq_unbind_pf(nfq_t->h, AF_INET) < 0) { + if (nfq_unbind_pf(nfq_q->h, AF_INET) < 0) { SCLogError(SC_ERR_NFQ_UNBIND, "nfq_unbind_pf() for AF_INET failed"); exit(EXIT_FAILURE); } - if (nfq_unbind_pf(nfq_t->h, AF_INET6) < 0) { + if (nfq_unbind_pf(nfq_q->h, AF_INET6) < 0) { SCLogError(SC_ERR_NFQ_UNBIND, "nfq_unbind_pf() for AF_INET6 failed"); exit(EXIT_FAILURE); } @@ -259,22 +262,22 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m SCLogDebug("binding nfnetlink_queue as nf_queue handler for AF_INET and AF_INET6"); - if (nfq_bind_pf(nfq_t->h, AF_INET) < 0) { + if (nfq_bind_pf(nfq_q->h, AF_INET) < 0) { SCLogError(SC_ERR_NFQ_BIND, "nfq_bind_pf() for AF_INET failed"); exit(EXIT_FAILURE); } - if (nfq_bind_pf(nfq_t->h, AF_INET6) < 0) { + if (nfq_bind_pf(nfq_q->h, AF_INET6) < 0) { SCLogError(SC_ERR_NFQ_BIND, "nfq_bind_pf() for AF_INET6 failed"); exit(EXIT_FAILURE); } } - SCLogInfo("binding this thread to queue '%" PRIu32 "'", nfq_t->queue_num); + SCLogInfo("binding this thread %d to queue '%" PRIu32 "'", nfq_t->nfq_index, nfq_q->queue_num); /* pass the thread memory as a void ptr so the * callback function has access to it. */ - nfq_t->qh = nfq_create_queue(nfq_t->h, nfq_t->queue_num, &NFQCallBack, (void *)nfq_t); - if (nfq_t->qh == NULL) + nfq_q->qh = nfq_create_queue(nfq_q->h, nfq_q->queue_num, &NFQCallBack, (void *)nfq_t); + if (nfq_q->qh == NULL) { SCLogError(SC_ERR_NFQ_CREATE_QUEUE, "nfq_create_queue failed"); return TM_ECODE_FAILED; @@ -284,7 +287,7 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m /* 05DC = 1500 */ //if (nfq_set_mode(nfq_t->qh, NFQNL_COPY_PACKET, 0x05DC) < 0) { - if (nfq_set_mode(nfq_t->qh, NFQNL_COPY_PACKET, 0xFFFF) < 0) { + if (nfq_set_mode(nfq_q->qh, NFQNL_COPY_PACKET, 0xFFFF) < 0) { SCLogError(SC_ERR_NFQ_SET_MODE, "can't set packet_copy mode"); return TM_ECODE_FAILED; } @@ -296,7 +299,7 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m SCLogInfo("setting queue length to %" PRId32 "", queue_maxlen); /* non-fatal if it fails */ - if (nfq_set_queue_maxlen(nfq_t->qh, queue_maxlen) < 0) { + if (nfq_set_queue_maxlen(nfq_q->qh, queue_maxlen) < 0) { SCLogWarning(SC_ERR_NFQ_MAXLEN, "can't set queue maxlen: your kernel probably " "doesn't support setting the queue length"); } @@ -305,23 +308,24 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m #ifndef OS_WIN32 /* set netlink buffer size to a decent value */ - nfnl_rcvbufsiz(nfq_nfnlh(nfq_t->h), queue_maxlen * 1500); + nfnl_rcvbufsiz(nfq_nfnlh(nfq_q->h), queue_maxlen * 1500); SCLogInfo("setting nfnl bufsize to %" PRId32 "", queue_maxlen * 1500); - nfq_t->nh = nfq_nfnlh(nfq_t->h); - nfq_t->fd = nfnl_fd(nfq_t->nh); + nfq_q->nh = nfq_nfnlh(nfq_q->h); + nfq_q->fd = nfnl_fd(nfq_q->nh); + SCMutexInit(&nfq_q->mutex_qh, NULL); /* Set some netlink specific option on the socket to increase performance */ opt = 1; #ifdef NETLINK_BROADCAST_SEND_ERROR - setsockopt(nfq_t->fd, SOL_NETLINK, - NETLINK_BROADCAST_SEND_ERROR, &opt, sizeof(int)); + setsockopt(nfq_q->fd, SOL_NETLINK, + NETLINK_BROADCAST_SEND_ERROR, &opt, sizeof(int)); #endif /* Don't send error about no buffer space available but drop the packets instead */ #ifdef NETLINK_NO_ENOBUFS - setsockopt(nfq_t->fd, SOL_NETLINK, NETLINK_NO_ENOBUFS, &opt, sizeof(int)); + setsockopt(nfq_q->fd, SOL_NETLINK, NETLINK_NO_ENOBUFS, &opt, sizeof(int)); #endif /* set a timeout to the socket so we can check for a signal @@ -329,17 +333,17 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m tv.tv_sec = 1; tv.tv_usec = 0; - if(setsockopt(nfq_t->fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) { + if(setsockopt(nfq_q->fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) { SCLogWarning(SC_ERR_NFQ_SETSOCKOPT, "can't set socket timeout: %s", strerror(errno)); } - SCLogDebug("nfq_t->h %p, nfq_t->nh %p, nfq_t->qh %p, nfq_t->fd %" PRId32 "", - nfq_t->h, nfq_t->nh, nfq_t->qh, nfq_t->fd); + SCLogDebug("nfq_q->h %p, nfq_q->nh %p, nfq_q->qh %p, nfq_q->fd %" PRId32 "", + nfq_q->h, nfq_q->nh, nfq_q->qh, nfq_q->fd); #else /* OS_WIN32 */ - SCMutexInit(&nfq_t->mutex_qh, NULL); - nfq_t->ovr.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - nfq_t->fd = nfq_fd(nfq_t->h); - SCLogDebug("nfq_t->h %p, nfq_t->qh %p, nfq_t->fd %p", nfq_t->h, nfq_t->qh, nfq_t->fd); + SCMutexInit(&nfq_q->mutex_qh, NULL); + nfq_q->ovr.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); + nfq_q->fd = nfq_fd(nfq_q->h); + SCLogDebug("nfq_q->h %p, nfq_q->qh %p, nfq_q->fd %p", nfq_q->h, nfq_q->qh, nfq_q->fd); #endif /* OS_WIN32 */ return TM_ECODE_OK; @@ -347,7 +351,6 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m TmEcode ReceiveNFQThreadInit(ThreadVars *tv, void *initdata, void **data) { SCMutexLock(&nfq_init_lock); - SCLogDebug("starting... will bind to queuenum %" PRIu32 "", receive_queue_num); #ifndef OS_WIN32 sigset_t sigs; @@ -355,23 +358,12 @@ TmEcode ReceiveNFQThreadInit(ThreadVars *tv, void *initdata, void **data) { pthread_sigmask(SIG_BLOCK, &sigs, NULL); #endif /* OS_WIN32 */ - NFQThreadVars *ntv = &nfq_t[receive_queue_num]; - + NFQThreadVars *ntv = (NFQThreadVars *) initdata; /* store the ThreadVars pointer in our NFQ thread context * as we will need it in our callback function */ ntv->tv = tv; - /* Extract the queue number from the specified command line argument */ - uint16_t queue_num = 0; - if ((ByteExtractStringUint16(&queue_num, 10, strlen((char *)initdata), - (char *)initdata)) < 0) - { - SCLogError(SC_ERR_INVALID_ARGUMENT, "specified queue number %s is not " - "valid", (char *)initdata); - exit(EXIT_FAILURE); - } - - int r = NFQInitThread(ntv, queue_num, (max_pending_packets * NFQ_BURST_FACTOR)); + int r = NFQInitThread(ntv, (max_pending_packets * NFQ_BURST_FACTOR)); if (r < 0) { SCLogError(SC_ERR_NFQ_THREAD_INIT, "nfq thread failed to initialize"); @@ -380,41 +372,134 @@ TmEcode ReceiveNFQThreadInit(ThreadVars *tv, void *initdata, void **data) { } *data = (void *)ntv; - receive_queue_num++; SCMutexUnlock(&nfq_init_lock); return TM_ECODE_OK; } TmEcode VerdictNFQThreadInit(ThreadVars *tv, void *initdata, void **data) { - SCMutexLock(&nfq_init_lock); - SCLogDebug("starting... will bind to queuenum %" PRIu32 "", verdict_queue_num); - /* no initialization, ReceiveNFQ takes care of that */ - NFQThreadVars *ntv = &nfq_t[verdict_queue_num]; - - *data = (void *)ntv; - verdict_queue_num++; + *data = (void *)initdata; - SCMutexUnlock(&nfq_init_lock); return TM_ECODE_OK; } TmEcode VerdictNFQThreadDeinit(ThreadVars *tv, void *data) { NFQThreadVars *ntv = (NFQThreadVars *)data; + NFQQueueVars *nq = NFQGetQueue(ntv->nfq_index); - SCLogDebug("starting... will close queuenum %" PRIu32 "", ntv->queue_num); - nfq_destroy_queue(ntv->qh); + SCLogDebug("starting... will close queuenum %" PRIu32 "", nq->queue_num); + nfq_destroy_queue(nq->qh); return TM_ECODE_OK; } +/** + * \brief Add a Netfilter queue + * + * \param string with the queue name + * + * \retval 0 on success. + * \retval -1 on failure. + */ +int NFQRegisterQueue(char *queue) +{ + NFQThreadVars *ntv = NULL; + NFQQueueVars *nq = NULL; + /* Extract the queue number from the specified command line argument */ + uint16_t queue_num = 0; + if ((ByteExtractStringUint16(&queue_num, 10, strlen(queue), queue)) < 0) + { + SCLogError(SC_ERR_INVALID_ARGUMENT, "specified queue number %s is not " + "valid", queue); + return -1; + } + + SCMutexLock(&nfq_init_lock); + if (receive_queue_num >= NFQ_MAX_QUEUE) { + SCLogError(SC_ERR_INVALID_ARGUMENT, + "too much Netfilter queue registered (%d)", + receive_queue_num); + SCMutexUnlock(&nfq_init_lock); + return -1; + } + if (receive_queue_num == 0) { + memset(&nfq_t, 0, sizeof(nfq_t)); + memset(&nfq_q, 0, sizeof(nfq_q)); + } + + ntv = &nfq_t[receive_queue_num]; + ntv->nfq_index = receive_queue_num; + + nq = &nfq_q[receive_queue_num]; + nq->queue_num = queue_num; + receive_queue_num++; + SCMutexUnlock(&nfq_init_lock); + + SCLogDebug("Queue \"%s\" registered.", queue); + return 0; +} + +/** + * \brief Get the number of registered queues + * + * \retval cnt the number of registered queues + */ +int NFQGetQueueCount(void) { + return receive_queue_num; +} + +/** + * \brief Get a pointer to the NFQ queue at index + * + * \param number idx of the queue in our array + * + * \retval ptr pointer to the NFQThreadVars at index + * \retval NULL on error + */ +void *NFQGetQueue(int number) { + if (number > receive_queue_num) + return NULL; + + return (void *)&nfq_q[number]; +} + +/** + * \brief Get queue number to the NFQ at index + * + * \param number idx of the queue in our array + * + * \retval ptr pointer to the NFQThreadVars at index + * \retval -1 on error + */ +int NFQGetQueueNum(int number) { + if (number > receive_queue_num) + return -1; + + return nfq_q[number].queue_num; +} + +/** + * \brief Get a pointer to the NFQ thread at index + * + * \param number idx of the queue in our array + * + * \retval ptr pointer to the NFQThreadVars at index + * \retval NULL on error + */ +void *NFQGetThread(int number) { + if (number > receive_queue_num) + return NULL; + + return (void *)&nfq_t[number]; +} + /** * \brief NFQ function to get a packet from the kernel * * \note separate functions for Linux and Win32 for readability. */ #ifndef OS_WIN32 -void NFQRecvPkt(NFQThreadVars *t) { +void NFQRecvPkt(NFQQueueVars *t) { int rv, ret; char buf[70000]; @@ -426,7 +511,9 @@ void NFQRecvPkt(NFQThreadVars *t) { /* no error on timeout */ } else { #ifdef COUNTERS + SCMutexLock(&t->mutex_qh); t->errs++; + SCMutexUnlock(&t->mutex_qh); #endif /* COUNTERS */ } } else if(rv == 0) { @@ -449,7 +536,7 @@ void NFQRecvPkt(NFQThreadVars *t) { } } #else /* WIN32 version of NFQRecvPkt */ -void NFQRecvPkt(NFQThreadVars *t) { +void NFQRecvPkt(NFQQueueVars *t) { int rv, ret; char buf[70000]; static int timeouted = 0; @@ -519,6 +606,12 @@ process_rv: TmEcode ReceiveNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { NFQThreadVars *ntv = (NFQThreadVars *)data; + NFQQueueVars *nq = NFQGetQueue(ntv->nfq_index); + if (nq == NULL) { + SCLogWarning(SC_ERR_INVALID_ARGUMENT, + "can't get queue for %" PRId16 "", ntv->nfq_index); + return TM_ECODE_FAILED; + } /* make sure we have at least one packet in the packet pool, to prevent * us from 1) alloc'ing packets at line rate, 2) have a race condition @@ -528,7 +621,7 @@ TmEcode ReceiveNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe } /* do our nfq magic */ - NFQRecvPkt(ntv); + NFQRecvPkt(nq); return TM_ECODE_OK; } @@ -538,32 +631,25 @@ TmEcode ReceiveNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe */ void ReceiveNFQThreadExitStats(ThreadVars *tv, void *data) { NFQThreadVars *ntv = (NFQThreadVars *)data; + NFQQueueVars *nq = NFQGetQueue(ntv->nfq_index); #ifdef COUNTERS SCLogInfo("(%s) Pkts %" PRIu32 ", Bytes %" PRIu64 ", Errors %" PRIu32 "", - tv->name, ntv->pkts, ntv->bytes, ntv->errs); -#endif -} - -/** - * \brief NFQ verdict module stats printing function - */ -void VerdictNFQThreadExitStats(ThreadVars *tv, void *data) { - NFQThreadVars *ntv = (NFQThreadVars *)data; -#ifdef COUNTERS - SCLogInfo("(%s) Pkts accepted %" PRIu32 ", dropped %" PRIu32 - ", replaced %" PRIu32, tv->name, ntv->accepted, - ntv->dropped, ntv->replaced); + tv->name, nq->pkts, nq->bytes, nq->errs); + SCLogInfo("Pkts accepted %"PRIu32", dropped %"PRIu32", replaced %"PRIu32, + nq->accepted, nq->dropped, nq->replaced); #endif } /** * \brief NFQ verdict function */ -void NFQSetVerdict(NFQThreadVars *t, Packet *p) { +void NFQSetVerdict(Packet *p) { int ret; uint32_t verdict; + NFQQueueVars *t = nfq_q + p->nfq_v.nfq_index; //printf("%p verdicting on queue %" PRIu32 "\n", t, t->queue_num); + SCMutexLock(&t->mutex_qh); if (p->action & ACTION_REJECT || p->action & ACTION_REJECT_BOTH || p->action & ACTION_REJECT_DST || p->action & ACTION_DROP) { @@ -598,9 +684,6 @@ void NFQSetVerdict(NFQThreadVars *t, Packet *p) { * \brief NFQ verdict module packet entry function */ TmEcode VerdictNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { - - NFQThreadVars *ntv = (NFQThreadVars *)data; - /* can't verdict a "fake" packet */ if (p->flags & PKT_PSEUDO_STREAM_END) { SCReturnInt(TM_ECODE_OK); @@ -625,13 +708,13 @@ TmEcode VerdictNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe /* don't verdict if we are not ready */ if (verdict == 1) { //printf("VerdictNFQ: setting verdict\n"); - NFQSetVerdict(ntv, p->root ? p->root : p); + NFQSetVerdict(p->root ? p->root : p); } else { TUNNEL_INCR_PKT_RTV(p); } } else { /* no tunnel, verdict normally */ - NFQSetVerdict(ntv, p); + NFQSetVerdict(p); } return TM_ECODE_OK; } diff --git a/src/source-nfq.h b/src/source-nfq.h index ed1bdc76c6..f4918338bd 100644 --- a/src/source-nfq.h +++ b/src/source-nfq.h @@ -42,6 +42,7 @@ typedef struct NFQPacketVars_ { int id; /* this nfq packets id */ + uint16_t nfq_index; /* index in NFQ array */ uint32_t mark; uint32_t ifi; @@ -50,6 +51,13 @@ typedef struct NFQPacketVars_ } NFQPacketVars; typedef struct NFQThreadVars_ +{ + uint16_t nfq_index; + ThreadVars *tv; + +} NFQThreadVars; + +typedef struct NFQQueueVars_ { struct nfq_handle *h; #ifndef OS_WIN32 @@ -64,6 +72,8 @@ typedef struct NFQThreadVars_ SCMutex mutex_qh; /* this one should be not changing after init */ uint16_t queue_num; + /* position into the NFQ queue var array */ + uint16_t nfq_index; #ifdef DBG_PERF int dbg_maxreadsize; @@ -77,14 +87,20 @@ typedef struct NFQThreadVars_ uint32_t dropped; uint32_t replaced; - ThreadVars *tv; -} NFQThreadVars; +} NFQQueueVars; + + typedef struct NFQGlobalVars_ { char unbind; } NFQGlobalVars; +int NFQRegisterQueue(char *queue); +int NFQGetQueueCount(void); +void *NFQGetQueue(int number); +int NFQGetQueueNum(int number); +void *NFQGetThread(int number); #endif /* NFQ */ #endif /* __SOURCE_NFQ_H__ */