Prepare multi queue support in NFQ

This patch prepare support for multiqueue in the
source file. The NFQ vars contained in Packet structure
has a new member. It is a reference to the NFQ thread var
it comes from. The behaviour is modified as a single verdict
thread treat packet for all Netfilter queues.

Locking is done in the verdict function to ensure that
simultaneous modifications of counters can not occur.

Signed-off-by: Eric Leblond <eric@regit.org>
remotes/origin/master-1.1.x
Eric Leblond 15 years ago committed by Victor Julien
parent d0faa6c96e
commit 1375e90030

@ -107,8 +107,8 @@ extern int max_pending_packets;
static NFQGlobalVars nfq_g;
static NFQThreadVars nfq_t[NFQ_MAX_QUEUE];
static NFQQueueVars nfq_q[NFQ_MAX_QUEUE];
static uint16_t receive_queue_num = 0;
static uint16_t verdict_queue_num = 0;
static SCMutex nfq_init_lock;
TmEcode ReceiveNFQ(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
@ -117,7 +117,6 @@ void ReceiveNFQThreadExitStats(ThreadVars *, void *);
TmEcode VerdictNFQ(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode VerdictNFQThreadInit(ThreadVars *, void *, void **);
void VerdictNFQThreadExitStats(ThreadVars *, void *);
TmEcode VerdictNFQThreadDeinit(ThreadVars *, void *);
TmEcode DecodeNFQ(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
@ -126,7 +125,6 @@ TmEcode DecodeNFQThreadInit(ThreadVars *, void *, void **);
void TmModuleReceiveNFQRegister (void) {
/* XXX create a general NFQ setup function */
memset(&nfq_g, 0, sizeof(nfq_g));
memset(&nfq_t, 0, sizeof(nfq_t));
SCMutexInit(&nfq_init_lock, NULL);
tmm_modules[TMM_RECEIVENFQ].name = "ReceiveNFQ";
@ -141,7 +139,7 @@ void TmModuleVerdictNFQRegister (void) {
tmm_modules[TMM_VERDICTNFQ].name = "VerdictNFQ";
tmm_modules[TMM_VERDICTNFQ].ThreadInit = VerdictNFQThreadInit;
tmm_modules[TMM_VERDICTNFQ].Func = VerdictNFQ;
tmm_modules[TMM_VERDICTNFQ].ThreadExitPrintStats = VerdictNFQThreadExitStats;
tmm_modules[TMM_VERDICTNFQ].ThreadExitPrintStats = NULL;
tmm_modules[TMM_VERDICTNFQ].ThreadDeinit = VerdictNFQThreadDeinit;
tmm_modules[TMM_VERDICTNFQ].RegisterTests = NULL;
}
@ -214,11 +212,13 @@ static int NFQCallBack(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg,
return -1;
}
p->nfq_v.nfq_index = ntv->nfq_index;
NFQSetupPkt(p, (void *)nfa);
#ifdef COUNTERS
nfq_t->pkts++;
nfq_t->bytes += GET_PKT_LEN(p);
NFQQueueVars *nfq_q = NFQGetQueue(ntv->nfq_index);
nfq_q->pkts++;
nfq_q->bytes += GET_PKT_LEN(p);
#endif /* COUNTERS */
/* pass on... */
@ -227,17 +227,20 @@ static int NFQCallBack(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg,
return 0;
}
TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_maxlen)
TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint32_t queue_maxlen)
{
#ifndef OS_WIN32
struct timeval tv;
int opt;
#endif
nfq_t->queue_num = queue_num;
NFQQueueVars *nfq_q = NFQGetQueue(nfq_t->nfq_index);
if (nfq_q == NULL) {
SCLogError(SC_ERR_NFQ_OPEN, "no queue for given index");
return TM_ECODE_FAILED;
}
SCLogDebug("opening library handle");
nfq_t->h = nfq_open();
if (!nfq_t->h) {
nfq_q->h = nfq_open();
if (!nfq_q->h) {
SCLogError(SC_ERR_NFQ_OPEN, "nfq_open() failed");
return TM_ECODE_FAILED;
}
@ -247,11 +250,11 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m
/* VJ: on my Ubuntu Hardy system this fails the first time it's
* run. Ignoring the error seems to have no bad effects. */
SCLogDebug("unbinding existing nf_queue handler for AF_INET (if any)");
if (nfq_unbind_pf(nfq_t->h, AF_INET) < 0) {
if (nfq_unbind_pf(nfq_q->h, AF_INET) < 0) {
SCLogError(SC_ERR_NFQ_UNBIND, "nfq_unbind_pf() for AF_INET failed");
exit(EXIT_FAILURE);
}
if (nfq_unbind_pf(nfq_t->h, AF_INET6) < 0) {
if (nfq_unbind_pf(nfq_q->h, AF_INET6) < 0) {
SCLogError(SC_ERR_NFQ_UNBIND, "nfq_unbind_pf() for AF_INET6 failed");
exit(EXIT_FAILURE);
}
@ -259,22 +262,22 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m
SCLogDebug("binding nfnetlink_queue as nf_queue handler for AF_INET and AF_INET6");
if (nfq_bind_pf(nfq_t->h, AF_INET) < 0) {
if (nfq_bind_pf(nfq_q->h, AF_INET) < 0) {
SCLogError(SC_ERR_NFQ_BIND, "nfq_bind_pf() for AF_INET failed");
exit(EXIT_FAILURE);
}
if (nfq_bind_pf(nfq_t->h, AF_INET6) < 0) {
if (nfq_bind_pf(nfq_q->h, AF_INET6) < 0) {
SCLogError(SC_ERR_NFQ_BIND, "nfq_bind_pf() for AF_INET6 failed");
exit(EXIT_FAILURE);
}
}
SCLogInfo("binding this thread to queue '%" PRIu32 "'", nfq_t->queue_num);
SCLogInfo("binding this thread %d to queue '%" PRIu32 "'", nfq_t->nfq_index, nfq_q->queue_num);
/* pass the thread memory as a void ptr so the
* callback function has access to it. */
nfq_t->qh = nfq_create_queue(nfq_t->h, nfq_t->queue_num, &NFQCallBack, (void *)nfq_t);
if (nfq_t->qh == NULL)
nfq_q->qh = nfq_create_queue(nfq_q->h, nfq_q->queue_num, &NFQCallBack, (void *)nfq_t);
if (nfq_q->qh == NULL)
{
SCLogError(SC_ERR_NFQ_CREATE_QUEUE, "nfq_create_queue failed");
return TM_ECODE_FAILED;
@ -284,7 +287,7 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m
/* 05DC = 1500 */
//if (nfq_set_mode(nfq_t->qh, NFQNL_COPY_PACKET, 0x05DC) < 0) {
if (nfq_set_mode(nfq_t->qh, NFQNL_COPY_PACKET, 0xFFFF) < 0) {
if (nfq_set_mode(nfq_q->qh, NFQNL_COPY_PACKET, 0xFFFF) < 0) {
SCLogError(SC_ERR_NFQ_SET_MODE, "can't set packet_copy mode");
return TM_ECODE_FAILED;
}
@ -296,7 +299,7 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m
SCLogInfo("setting queue length to %" PRId32 "", queue_maxlen);
/* non-fatal if it fails */
if (nfq_set_queue_maxlen(nfq_t->qh, queue_maxlen) < 0) {
if (nfq_set_queue_maxlen(nfq_q->qh, queue_maxlen) < 0) {
SCLogWarning(SC_ERR_NFQ_MAXLEN, "can't set queue maxlen: your kernel probably "
"doesn't support setting the queue length");
}
@ -305,23 +308,24 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m
#ifndef OS_WIN32
/* set netlink buffer size to a decent value */
nfnl_rcvbufsiz(nfq_nfnlh(nfq_t->h), queue_maxlen * 1500);
nfnl_rcvbufsiz(nfq_nfnlh(nfq_q->h), queue_maxlen * 1500);
SCLogInfo("setting nfnl bufsize to %" PRId32 "", queue_maxlen * 1500);
nfq_t->nh = nfq_nfnlh(nfq_t->h);
nfq_t->fd = nfnl_fd(nfq_t->nh);
nfq_q->nh = nfq_nfnlh(nfq_q->h);
nfq_q->fd = nfnl_fd(nfq_q->nh);
SCMutexInit(&nfq_q->mutex_qh, NULL);
/* Set some netlink specific option on the socket to increase
performance */
opt = 1;
#ifdef NETLINK_BROADCAST_SEND_ERROR
setsockopt(nfq_t->fd, SOL_NETLINK,
NETLINK_BROADCAST_SEND_ERROR, &opt, sizeof(int));
setsockopt(nfq_q->fd, SOL_NETLINK,
NETLINK_BROADCAST_SEND_ERROR, &opt, sizeof(int));
#endif
/* Don't send error about no buffer space available but drop the
packets instead */
#ifdef NETLINK_NO_ENOBUFS
setsockopt(nfq_t->fd, SOL_NETLINK, NETLINK_NO_ENOBUFS, &opt, sizeof(int));
setsockopt(nfq_q->fd, SOL_NETLINK, NETLINK_NO_ENOBUFS, &opt, sizeof(int));
#endif
/* set a timeout to the socket so we can check for a signal
@ -329,17 +333,17 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m
tv.tv_sec = 1;
tv.tv_usec = 0;
if(setsockopt(nfq_t->fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
if(setsockopt(nfq_q->fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
SCLogWarning(SC_ERR_NFQ_SETSOCKOPT, "can't set socket timeout: %s", strerror(errno));
}
SCLogDebug("nfq_t->h %p, nfq_t->nh %p, nfq_t->qh %p, nfq_t->fd %" PRId32 "",
nfq_t->h, nfq_t->nh, nfq_t->qh, nfq_t->fd);
SCLogDebug("nfq_q->h %p, nfq_q->nh %p, nfq_q->qh %p, nfq_q->fd %" PRId32 "",
nfq_q->h, nfq_q->nh, nfq_q->qh, nfq_q->fd);
#else /* OS_WIN32 */
SCMutexInit(&nfq_t->mutex_qh, NULL);
nfq_t->ovr.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
nfq_t->fd = nfq_fd(nfq_t->h);
SCLogDebug("nfq_t->h %p, nfq_t->qh %p, nfq_t->fd %p", nfq_t->h, nfq_t->qh, nfq_t->fd);
SCMutexInit(&nfq_q->mutex_qh, NULL);
nfq_q->ovr.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
nfq_q->fd = nfq_fd(nfq_q->h);
SCLogDebug("nfq_q->h %p, nfq_q->qh %p, nfq_q->fd %p", nfq_q->h, nfq_q->qh, nfq_q->fd);
#endif /* OS_WIN32 */
return TM_ECODE_OK;
@ -347,7 +351,6 @@ TmEcode NFQInitThread(NFQThreadVars *nfq_t, uint16_t queue_num, uint32_t queue_m
TmEcode ReceiveNFQThreadInit(ThreadVars *tv, void *initdata, void **data) {
SCMutexLock(&nfq_init_lock);
SCLogDebug("starting... will bind to queuenum %" PRIu32 "", receive_queue_num);
#ifndef OS_WIN32
sigset_t sigs;
@ -355,23 +358,12 @@ TmEcode ReceiveNFQThreadInit(ThreadVars *tv, void *initdata, void **data) {
pthread_sigmask(SIG_BLOCK, &sigs, NULL);
#endif /* OS_WIN32 */
NFQThreadVars *ntv = &nfq_t[receive_queue_num];
NFQThreadVars *ntv = (NFQThreadVars *) initdata;
/* store the ThreadVars pointer in our NFQ thread context
* as we will need it in our callback function */
ntv->tv = tv;
/* Extract the queue number from the specified command line argument */
uint16_t queue_num = 0;
if ((ByteExtractStringUint16(&queue_num, 10, strlen((char *)initdata),
(char *)initdata)) < 0)
{
SCLogError(SC_ERR_INVALID_ARGUMENT, "specified queue number %s is not "
"valid", (char *)initdata);
exit(EXIT_FAILURE);
}
int r = NFQInitThread(ntv, queue_num, (max_pending_packets * NFQ_BURST_FACTOR));
int r = NFQInitThread(ntv, (max_pending_packets * NFQ_BURST_FACTOR));
if (r < 0) {
SCLogError(SC_ERR_NFQ_THREAD_INIT, "nfq thread failed to initialize");
@ -380,41 +372,134 @@ TmEcode ReceiveNFQThreadInit(ThreadVars *tv, void *initdata, void **data) {
}
*data = (void *)ntv;
receive_queue_num++;
SCMutexUnlock(&nfq_init_lock);
return TM_ECODE_OK;
}
TmEcode VerdictNFQThreadInit(ThreadVars *tv, void *initdata, void **data) {
SCMutexLock(&nfq_init_lock);
SCLogDebug("starting... will bind to queuenum %" PRIu32 "", verdict_queue_num);
/* no initialization, ReceiveNFQ takes care of that */
NFQThreadVars *ntv = &nfq_t[verdict_queue_num];
*data = (void *)ntv;
verdict_queue_num++;
*data = (void *)initdata;
SCMutexUnlock(&nfq_init_lock);
return TM_ECODE_OK;
}
TmEcode VerdictNFQThreadDeinit(ThreadVars *tv, void *data) {
NFQThreadVars *ntv = (NFQThreadVars *)data;
NFQQueueVars *nq = NFQGetQueue(ntv->nfq_index);
SCLogDebug("starting... will close queuenum %" PRIu32 "", ntv->queue_num);
nfq_destroy_queue(ntv->qh);
SCLogDebug("starting... will close queuenum %" PRIu32 "", nq->queue_num);
nfq_destroy_queue(nq->qh);
return TM_ECODE_OK;
}
/**
* \brief Add a Netfilter queue
*
* \param string with the queue name
*
* \retval 0 on success.
* \retval -1 on failure.
*/
int NFQRegisterQueue(char *queue)
{
NFQThreadVars *ntv = NULL;
NFQQueueVars *nq = NULL;
/* Extract the queue number from the specified command line argument */
uint16_t queue_num = 0;
if ((ByteExtractStringUint16(&queue_num, 10, strlen(queue), queue)) < 0)
{
SCLogError(SC_ERR_INVALID_ARGUMENT, "specified queue number %s is not "
"valid", queue);
return -1;
}
SCMutexLock(&nfq_init_lock);
if (receive_queue_num >= NFQ_MAX_QUEUE) {
SCLogError(SC_ERR_INVALID_ARGUMENT,
"too much Netfilter queue registered (%d)",
receive_queue_num);
SCMutexUnlock(&nfq_init_lock);
return -1;
}
if (receive_queue_num == 0) {
memset(&nfq_t, 0, sizeof(nfq_t));
memset(&nfq_q, 0, sizeof(nfq_q));
}
ntv = &nfq_t[receive_queue_num];
ntv->nfq_index = receive_queue_num;
nq = &nfq_q[receive_queue_num];
nq->queue_num = queue_num;
receive_queue_num++;
SCMutexUnlock(&nfq_init_lock);
SCLogDebug("Queue \"%s\" registered.", queue);
return 0;
}
/**
* \brief Get the number of registered queues
*
* \retval cnt the number of registered queues
*/
int NFQGetQueueCount(void) {
return receive_queue_num;
}
/**
* \brief Get a pointer to the NFQ queue at index
*
* \param number idx of the queue in our array
*
* \retval ptr pointer to the NFQThreadVars at index
* \retval NULL on error
*/
void *NFQGetQueue(int number) {
if (number > receive_queue_num)
return NULL;
return (void *)&nfq_q[number];
}
/**
* \brief Get queue number to the NFQ at index
*
* \param number idx of the queue in our array
*
* \retval ptr pointer to the NFQThreadVars at index
* \retval -1 on error
*/
int NFQGetQueueNum(int number) {
if (number > receive_queue_num)
return -1;
return nfq_q[number].queue_num;
}
/**
* \brief Get a pointer to the NFQ thread at index
*
* \param number idx of the queue in our array
*
* \retval ptr pointer to the NFQThreadVars at index
* \retval NULL on error
*/
void *NFQGetThread(int number) {
if (number > receive_queue_num)
return NULL;
return (void *)&nfq_t[number];
}
/**
* \brief NFQ function to get a packet from the kernel
*
* \note separate functions for Linux and Win32 for readability.
*/
#ifndef OS_WIN32
void NFQRecvPkt(NFQThreadVars *t) {
void NFQRecvPkt(NFQQueueVars *t) {
int rv, ret;
char buf[70000];
@ -426,7 +511,9 @@ void NFQRecvPkt(NFQThreadVars *t) {
/* no error on timeout */
} else {
#ifdef COUNTERS
SCMutexLock(&t->mutex_qh);
t->errs++;
SCMutexUnlock(&t->mutex_qh);
#endif /* COUNTERS */
}
} else if(rv == 0) {
@ -449,7 +536,7 @@ void NFQRecvPkt(NFQThreadVars *t) {
}
}
#else /* WIN32 version of NFQRecvPkt */
void NFQRecvPkt(NFQThreadVars *t) {
void NFQRecvPkt(NFQQueueVars *t) {
int rv, ret;
char buf[70000];
static int timeouted = 0;
@ -519,6 +606,12 @@ process_rv:
TmEcode ReceiveNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
NFQThreadVars *ntv = (NFQThreadVars *)data;
NFQQueueVars *nq = NFQGetQueue(ntv->nfq_index);
if (nq == NULL) {
SCLogWarning(SC_ERR_INVALID_ARGUMENT,
"can't get queue for %" PRId16 "", ntv->nfq_index);
return TM_ECODE_FAILED;
}
/* make sure we have at least one packet in the packet pool, to prevent
* us from 1) alloc'ing packets at line rate, 2) have a race condition
@ -528,7 +621,7 @@ TmEcode ReceiveNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe
}
/* do our nfq magic */
NFQRecvPkt(ntv);
NFQRecvPkt(nq);
return TM_ECODE_OK;
}
@ -538,32 +631,25 @@ TmEcode ReceiveNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe
*/
void ReceiveNFQThreadExitStats(ThreadVars *tv, void *data) {
NFQThreadVars *ntv = (NFQThreadVars *)data;
NFQQueueVars *nq = NFQGetQueue(ntv->nfq_index);
#ifdef COUNTERS
SCLogInfo("(%s) Pkts %" PRIu32 ", Bytes %" PRIu64 ", Errors %" PRIu32 "",
tv->name, ntv->pkts, ntv->bytes, ntv->errs);
#endif
}
/**
* \brief NFQ verdict module stats printing function
*/
void VerdictNFQThreadExitStats(ThreadVars *tv, void *data) {
NFQThreadVars *ntv = (NFQThreadVars *)data;
#ifdef COUNTERS
SCLogInfo("(%s) Pkts accepted %" PRIu32 ", dropped %" PRIu32
", replaced %" PRIu32, tv->name, ntv->accepted,
ntv->dropped, ntv->replaced);
tv->name, nq->pkts, nq->bytes, nq->errs);
SCLogInfo("Pkts accepted %"PRIu32", dropped %"PRIu32", replaced %"PRIu32,
nq->accepted, nq->dropped, nq->replaced);
#endif
}
/**
* \brief NFQ verdict function
*/
void NFQSetVerdict(NFQThreadVars *t, Packet *p) {
void NFQSetVerdict(Packet *p) {
int ret;
uint32_t verdict;
NFQQueueVars *t = nfq_q + p->nfq_v.nfq_index;
//printf("%p verdicting on queue %" PRIu32 "\n", t, t->queue_num);
SCMutexLock(&t->mutex_qh);
if (p->action & ACTION_REJECT || p->action & ACTION_REJECT_BOTH ||
p->action & ACTION_REJECT_DST || p->action & ACTION_DROP) {
@ -598,9 +684,6 @@ void NFQSetVerdict(NFQThreadVars *t, Packet *p) {
* \brief NFQ verdict module packet entry function
*/
TmEcode VerdictNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
NFQThreadVars *ntv = (NFQThreadVars *)data;
/* can't verdict a "fake" packet */
if (p->flags & PKT_PSEUDO_STREAM_END) {
SCReturnInt(TM_ECODE_OK);
@ -625,13 +708,13 @@ TmEcode VerdictNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe
/* don't verdict if we are not ready */
if (verdict == 1) {
//printf("VerdictNFQ: setting verdict\n");
NFQSetVerdict(ntv, p->root ? p->root : p);
NFQSetVerdict(p->root ? p->root : p);
} else {
TUNNEL_INCR_PKT_RTV(p);
}
} else {
/* no tunnel, verdict normally */
NFQSetVerdict(ntv, p);
NFQSetVerdict(p);
}
return TM_ECODE_OK;
}

@ -42,6 +42,7 @@
typedef struct NFQPacketVars_
{
int id; /* this nfq packets id */
uint16_t nfq_index; /* index in NFQ array */
uint32_t mark;
uint32_t ifi;
@ -50,6 +51,13 @@ typedef struct NFQPacketVars_
} NFQPacketVars;
typedef struct NFQThreadVars_
{
uint16_t nfq_index;
ThreadVars *tv;
} NFQThreadVars;
typedef struct NFQQueueVars_
{
struct nfq_handle *h;
#ifndef OS_WIN32
@ -64,6 +72,8 @@ typedef struct NFQThreadVars_
SCMutex mutex_qh;
/* this one should be not changing after init */
uint16_t queue_num;
/* position into the NFQ queue var array */
uint16_t nfq_index;
#ifdef DBG_PERF
int dbg_maxreadsize;
@ -77,14 +87,20 @@ typedef struct NFQThreadVars_
uint32_t dropped;
uint32_t replaced;
ThreadVars *tv;
} NFQThreadVars;
} NFQQueueVars;
typedef struct NFQGlobalVars_
{
char unbind;
} NFQGlobalVars;
int NFQRegisterQueue(char *queue);
int NFQGetQueueCount(void);
void *NFQGetQueue(int number);
int NFQGetQueueNum(int number);
void *NFQGetThread(int number);
#endif /* NFQ */
#endif /* __SOURCE_NFQ_H__ */

Loading…
Cancel
Save