@ -25,12 +25,13 @@
* Fanouts socket from David Miller :
* we need to support the split of flow in different socket
* option :
- packet_fanout type
- fanout ID ? ? seems it could be useful
- protocol is the IEEE 802.3 protocol number in network order ( filtering is great )
- runmode - > family of threads in parallel ( acccount )
- add a new ratio or threads number ( overwritten by cpu_affinity )
- add af_max_read_packets for batched reading
* - packet_fanout type
* - fanout ID ? ? seems it could be useful
* - protocol is the IEEE 802.3 protocol number in network order ( filtering
* is great )
* - runmode - > family of threads in parallel ( acccount )
* - add a new ratio or threads number ( overwritten by cpu_affinity )
* - add af_max_read_packets for batched reading
*
* architecture
* loop with read
@ -40,10 +41,12 @@
* bind
* must switch to promiscous mode - > use PACKET_MR_PROMISC socket option
*
* \ todo watch other interface event to detect suppression of the monitored interface
* \ todo watch other interface event to detect suppression of the monitored
* interface
*/
# include "suricata-common.h"
# include "config.h"
# include "suricata.h"
# include "decode.h"
# include "packet-queue.h"
@ -57,17 +60,64 @@
# include "util-debug.h"
# include "util-error.h"
# include "util-privs.h"
# include "util-optimize.h"
# include "tmqh-packetpool.h"
# include "source-af-packet.h"
# include <sys/ioctl.h>
# ifdef HAVE_AF_PACKET
# include <linux/if_ether.h>
# include <linux/if_packet.h>
# include <linux/if_arp.h>
# endif
extern uint8_t suricata_ctl_flags ;
extern int max_pending_packets ;
# define AFP_WORKER_MODE 0
# define AFP_LOOP_MODE 1
# ifndef HAVE_AF_PACKET
TmEcode NoAFPSupportExit ( ThreadVars * , void * , void * * ) ;
void TmModuleReceiveAFPRegister ( void ) {
tmm_modules [ TMM_RECEIVEAFP ] . name = " ReceiveAFP " ;
tmm_modules [ TMM_RECEIVEAFP ] . ThreadInit = NoAFPSupportExit ;
tmm_modules [ TMM_RECEIVEAFP ] . Func = NULL ;
tmm_modules [ TMM_RECEIVEAFP ] . ThreadExitPrintStats = NULL ;
tmm_modules [ TMM_RECEIVEAFP ] . ThreadDeinit = NULL ;
tmm_modules [ TMM_RECEIVEAFP ] . RegisterTests = NULL ;
tmm_modules [ TMM_RECEIVEAFP ] . cap_flags = 0 ;
}
/**
* \ brief Registration Function for DecodeAFP .
* \ todo Unit tests are needed for this module .
*/
void TmModuleDecodeAFPRegister ( void ) {
tmm_modules [ TMM_DECODEAFP ] . name = " DecodeAFP " ;
tmm_modules [ TMM_DECODEAFP ] . ThreadInit = NoAFPSupportExit ;
tmm_modules [ TMM_DECODEAFP ] . Func = NULL ;
tmm_modules [ TMM_DECODEAFP ] . ThreadExitPrintStats = NULL ;
tmm_modules [ TMM_DECODEAFP ] . ThreadDeinit = NULL ;
tmm_modules [ TMM_DECODEAFP ] . RegisterTests = NULL ;
tmm_modules [ TMM_DECODEAFP ] . cap_flags = 0 ;
}
/**
* \ brief this function prints an error message and exits .
*/
TmEcode NoAFPSupportExit ( ThreadVars * tv , void * initdata , void * * data )
{
SCLogError ( SC_ERR_NO_AF_PACKET , " Error creating thread %s: you do not have "
" support for AF_PACKET enabled, on Linux host please recompile "
" with --enable-af-packet " , tv - > name ) ;
exit ( EXIT_FAILURE ) ;
}
# else /* We have AF_PACKET support */
/** control how many packets we may read in one go */
static int afp_max_read_packets = 0 ;
/** max packets < 65536 */
@ -81,8 +131,6 @@ static int afp_max_read_packets = 0;
# define POLL_TIMEOUT 100
# define AFP_BUFSIZE 4096
/**
* \ brief Structure to hold thread specific variables .
*/
@ -103,19 +151,21 @@ typedef struct AFPThreadVars_
uint64_t bytes ;
uint32_t errs ;
/* AFP buffer size */
int AFP_buffer_size ;
/* socket buffer size */
int buffer_size ;
int cluster_id ;
int cluster_type ;
ThreadVars * tv ;
TmSlot * slot ;
Packet * in_p ;
Packet * array [ AFP_FILE_MAX_PKTS ] ;
uint16_t array_idx ;
int fanout ;
char * data ; /** Per function and thread data */
uint8_t * data ; /** Per function and thread data */
int datalen ; /** Length of per function and thread data */
} AFPThreadVars ;
@ -123,6 +173,7 @@ TmEcode ReceiveAFP(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *)
TmEcode ReceiveAFPThreadInit ( ThreadVars * , void * , void * * ) ;
void ReceiveAFPThreadExitStats ( ThreadVars * , void * ) ;
TmEcode ReceiveAFPThreadDeinit ( ThreadVars * , void * ) ;
TmEcode ReceiveAFPLoop ( ThreadVars * tv , void * data , void * slot ) ;
TmEcode DecodeAFPThreadInit ( ThreadVars * , void * , void * * ) ;
TmEcode DecodeAFP ( ThreadVars * , Packet * , void * , PacketQueue * , PacketQueue * ) ;
@ -135,6 +186,7 @@ void TmModuleReceiveAFPRegister (void) {
tmm_modules [ TMM_RECEIVEAFP ] . name = " ReceiveAFP " ;
tmm_modules [ TMM_RECEIVEAFP ] . ThreadInit = ReceiveAFPThreadInit ;
tmm_modules [ TMM_RECEIVEAFP ] . Func = ReceiveAFP ;
tmm_modules [ TMM_RECEIVEAFP ] . PktAcqLoop = ReceiveAFPLoop ;
tmm_modules [ TMM_RECEIVEAFP ] . ThreadExitPrintStats = ReceiveAFPThreadExitStats ;
tmm_modules [ TMM_RECEIVEAFP ] . ThreadDeinit = NULL ;
tmm_modules [ TMM_RECEIVEAFP ] . RegisterTests = NULL ;
@ -155,7 +207,26 @@ void TmModuleDecodeAFPRegister (void) {
tmm_modules [ TMM_DECODEAFP ] . cap_flags = 0 ;
}
static int createsocket ( AFPThreadVars * ptv , char * devname , int verbose ) ;
static int AFPCreateSocket ( AFPThreadVars * ptv , char * devname , int verbose ) ;
int AFPConfGetThreads ( )
{
int afp_threads = 1 ;
char * threadsstr = NULL ;
if ( ConfGet ( " af-packet.threads " , & threadsstr ) ! = 1 ) {
afp_threads = 1 ;
} else {
if ( threadsstr ! = NULL ) {
afp_threads = ( uint8_t ) atoi ( threadsstr ) ;
}
}
if ( afp_threads = = 0 ) {
afp_threads = 1 ;
}
return afp_threads ;
}
/**
* \ brief AF packet read function .
@ -166,11 +237,10 @@ static int createsocket(AFPThreadVars *ptv, char *devname, int verbose);
* \ param user pointer to AFPThreadVars
* \ retval TM_ECODE_FAILED on failure and TM_ECODE_OK on success
*/
TmEcode AFPRead ( AFPThreadVars * ptv )
TmEcode AFPRead ( AFPThreadVars * ptv , int mode )
{
Packet * p = NULL ;
/* XXX should try to use read that get directly to packet */
uint8_t buf [ AFP_BUFSIZE ] ;
int offset = 0 ;
int caplen ;
struct sockaddr_ll from ;
@ -198,8 +268,8 @@ TmEcode AFPRead(AFPThreadVars *ptv)
offset = SLL_HEADER_LEN ;
else
offset = 0 ;
iov . iov_len = AFP_BUFSIZE - offset ;
iov . iov_base = buf + offset ;
iov . iov_len = ptv- > datalen - offset ;
iov . iov_base = ptv- > data + offset ;
caplen = recvmsg ( ptv - > socket , & msg , MSG_TRUNC ) ;
@ -208,14 +278,21 @@ TmEcode AFPRead(AFPThreadVars *ptv)
errno ) ;
SCReturnInt ( TM_ECODE_FAILED ) ;
}
if ( ptv - > array_idx = = 0 ) {
p = ptv - > in_p ;
} else {
p = PacketGetFromQueueOrAlloc ( ) ;
switch ( mode ) {
case AFP_WORKER_MODE :
if ( ptv - > array_idx = = 0 ) {
p = ptv - > in_p ;
} else {
p = PacketGetFromQueueOrAlloc ( ) ;
}
break ;
case AFP_LOOP_MODE :
p = PacketGetFromQueueOrAlloc ( ) ;
break ;
default :
SCLogError ( SC_ERR_INVALID_VALUE , " AFPRread does not support this mode " ) ;
}
if ( p = = NULL ) {
TmqhOutputPacketpool ( ptv - > tv , p ) ;
SCReturnInt ( TM_ECODE_FAILED ) ;
}
@ -232,23 +309,33 @@ TmEcode AFPRead(AFPThreadVars *ptv)
/* add forged header */
if ( ptv - > cooked ) {
SllHdr * hdrp = ( SllHdr * ) buf ;
SllHdr * hdrp = ( SllHdr * ) ptv- > data ;
/* XXX this is minimalist, but this seems enough */
hdrp - > sll_protocol = from . sll_protocol ;
}
p - > datalink = ptv - > datalink ;
SET_PKT_LEN ( p , caplen + offset ) ;
if ( PacketCopyData ( p , buf , GET_PKT_LEN ( p ) ) = = - 1 ) {
if ( PacketCopyData ( p , ptv- > data , GET_PKT_LEN ( p ) ) = = - 1 ) {
TmqhOutputPacketpool ( ptv - > tv , p ) ;
SCReturnInt ( TM_ECODE_FAILED ) ;
}
SCLogDebug ( " pktlen: % " PRIu32 " (pkt %02x, pkt data %02x) " ,
GET_PKT_LEN ( p ) , * pkt , * GET_PKT_DATA ( p ) ) ;
/* store the packet in our array */
ptv - > array [ ptv - > array_idx ] = p ;
ptv - > array_idx + + ;
SCLogDebug ( " pktlen: % " PRIu32 " (pkt %p, pkt data %p) " ,
GET_PKT_LEN ( p ) , p , GET_PKT_DATA ( p ) ) ;
switch ( mode ) {
case AFP_WORKER_MODE :
/* store the packet in our array */
ptv - > array [ ptv - > array_idx ] = p ;
ptv - > array_idx + + ;
break ;
case AFP_LOOP_MODE :
TmThreadsSlotProcessPkt ( ptv - > tv , ptv - > slot , p ) ;
break ;
default :
SCLogError ( SC_ERR_INVALID_VALUE , " AFPRread does not support this mode " ) ;
TmqhOutputPacketpool ( ptv - > tv , p ) ;
}
SCReturnInt ( TM_ECODE_OK ) ;
}
@ -258,7 +345,7 @@ static int AFPTryReopen(AFPThreadVars *ptv)
ptv - > afp_state = AFP_STATE_DOWN ;
afp_activate_r = creates ocket( ptv , ptv - > iface , 0 ) ;
afp_activate_r = AFPCreateS ocket( ptv , ptv - > iface , 0 ) ;
if ( afp_activate_r ! = 0 ) {
return afp_activate_r ;
}
@ -268,6 +355,101 @@ static int AFPTryReopen(AFPThreadVars *ptv)
return 0 ;
}
/**
* \ brief Main AF_PACKET reading Loop function
*/
TmEcode ReceiveAFPLoop ( ThreadVars * tv , void * data , void * slot )
{
uint16_t packet_q_len = 0 ;
AFPThreadVars * ptv = ( AFPThreadVars * ) data ;
TmSlot * s = ( TmSlot * ) slot ;
ptv - > slot = s - > slot_next ;
struct pollfd fds ;
int r ;
SCEnter ( ) ;
fds . fd = ptv - > socket ;
fds . events = POLLIN ;
while ( 1 ) {
/* Start by checking the state of our interface */
if ( unlikely ( ptv - > afp_state = = AFP_STATE_DOWN ) ) {
int dbreak = 0 ;
do {
usleep ( AFP_RECONNECT_TIMEOUT ) ;
if ( suricata_ctl_flags ! = 0 ) {
dbreak = 1 ;
break ;
}
r = AFPTryReopen ( ptv ) ;
} while ( r < 0 ) ;
if ( dbreak = = 1 )
break ;
}
/* make sure we have at least one packet in the packet pool, to prevent
* us from alloc ' ing packets at line rate */
do {
packet_q_len = PacketPoolSize ( ) ;
if ( unlikely ( packet_q_len = = 0 ) ) {
PacketPoolWait ( ) ;
}
} while ( packet_q_len = = 0 ) ;
r = poll ( & fds , 1 , POLL_TIMEOUT ) ;
if ( suricata_ctl_flags ! = 0 ) {
break ;
}
if ( r > 0 & &
( fds . revents & ( POLLHUP | POLLRDHUP | POLLERR | POLLNVAL ) ) ) {
if ( fds . revents & ( POLLHUP | POLLRDHUP ) ) {
close ( ptv - > socket ) ;
ptv - > afp_state = AFP_STATE_DOWN ;
continue ;
}
if ( fds . revents & POLLERR ) {
char c ;
/* Do a recv to get errno */
if ( recv ( ptv - > socket , & c , sizeof c , MSG_PEEK ) ! = - 1 )
continue ; /* what, no error? */
SCLogError ( SC_ERR_AFP_READ , " Error reading data from socket: (%d " PRIu32 " ) %s " ,
errno , strerror ( errno ) ) ;
close ( ptv - > socket ) ;
ptv - > afp_state = AFP_STATE_DOWN ;
continue ;
}
if ( fds . revents & POLLNVAL ) {
SCLogError ( SC_ERR_AFP_READ , " Invalid polling request " ) ;
close ( ptv - > socket ) ;
ptv - > afp_state = AFP_STATE_DOWN ;
continue ;
}
} else if ( r > 0 ) {
/* AFPRead will call TmThreadsSlotProcessPkt on read packets */
r = AFPRead ( ptv , AFP_LOOP_MODE ) ;
if ( r ! = TM_ECODE_OK ) {
SCReturnInt ( TM_ECODE_FAILED ) ;
}
} else if ( ( r < 0 ) & & ( errno ! = EINTR ) ) {
SCLogError ( SC_ERR_AFP_READ , " Error reading data from socket: (%d " PRIu32 " ) %s " ,
errno , strerror ( errno ) ) ;
close ( ptv - > socket ) ;
ptv - > afp_state = AFP_STATE_DOWN ;
continue ;
}
}
if ( suricata_ctl_flags & SURICATA_STOP | |
suricata_ctl_flags & SURICATA_KILL ) {
SCReturnInt ( TM_ECODE_FAILED ) ;
}
SCReturnInt ( TM_ECODE_OK ) ;
}
/**
* \ brief Recieves packets from an interface via AF_PACKET socket
*
@ -322,19 +504,42 @@ TmEcode ReceiveAFP(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe
while ( r > = 0 ) {
r = poll ( & fds , 1 , POLL_TIMEOUT ) ;
if ( r > 0 ) {
if ( suricata_ctl_flags ! = 0 ) {
break ;
}
ret = AFPRead ( ptv ) ;
if ( ret ! = TM_ECODE_OK ) {
SCReturnInt ( TM_ECODE_FAILED ) ;
if ( r > 0 & &
( fds . revents & ( POLLHUP | POLLRDHUP | POLLERR | POLLNVAL ) ) ) {
if ( fds . revents & ( POLLHUP | POLLRDHUP ) ) {
close ( ptv - > socket ) ;
ptv - > afp_state = AFP_STATE_DOWN ;
break ;
}
if ( suricata_ctl_flags ! = 0 ) {
if ( fds . revents & POLLERR ) {
char c ;
/* Do a recv to get errno */
if ( recv ( ptv - > socket , & c , sizeof c , MSG_PEEK ) ! = - 1 )
continue ; /* what, no error? */
SCLogError ( SC_ERR_AFP_READ , " Error reading data from socket: (%d " PRIu32 " ) %s " ,
errno , strerror ( errno ) ) ;
close ( ptv - > socket ) ;
ptv - > afp_state = AFP_STATE_DOWN ;
break ;
}
if ( fds . revents & POLLNVAL ) {
SCLogError ( SC_ERR_AFP_READ , " Invalid polling request " ) ;
close ( ptv - > socket ) ;
ptv - > afp_state = AFP_STATE_DOWN ;
break ;
}
} else if ( r > 0 ) {
ret = AFPRead ( ptv , AFP_WORKER_MODE ) ;
if ( ret ! = TM_ECODE_OK ) {
SCReturnInt ( TM_ECODE_FAILED ) ;
}
if ( cnt + + > = afp_max_read_packets )
break ;
}
if ( r < 0 ) {
} else if ( ( r < 0 ) & & ( errno ! = EINTR ) ) {
int dbreak = 0 ;
SCLogError ( SC_ERR_AFP_READ , " Error reading data from socket: (%d " PRIu32 " ) %s " ,
errno , strerror ( errno ) ) ;
@ -350,11 +555,8 @@ TmEcode ReceiveAFP(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe
r = 0 ;
break ;
}
}
if ( r = = 0 ) {
if ( suricata_ctl_flags ! = 0 ) {
break ;
}
} else if ( r = = 0 ) {
/* timeout condition */
if ( cnt > 0 )
break ;
}
@ -371,9 +573,7 @@ TmEcode ReceiveAFP(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe
}
if ( r < 0 ) {
SCLogError ( SC_ERR_AFP_DISPATCH , " error code % " PRId32 ,
r ) ;
SCLogError ( SC_ERR_AFP_DISPATCH , " error code % " PRId32 , r ) ;
SCReturnInt ( TM_ECODE_OK ) ;
}
@ -384,12 +584,12 @@ TmEcode ReceiveAFP(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe
SCReturnInt ( TM_ECODE_OK ) ;
}
static int getifnumbyd ev( int fd , const char * ifname , int verbose )
static int AFPGetIfnumByD ev( int fd , const char * ifname , int verbose )
{
struct ifreq ifr ;
memset ( & ifr , 0 , sizeof ( ifr ) ) ;
str n cpy( ifr . ifr_name , ifname , sizeof ( ifr . ifr_name ) ) ;
str l cpy( ifr . ifr_name , ifname , sizeof ( ifr . ifr_name ) ) ;
if ( ioctl ( fd , SIOCGIFINDEX , & ifr ) = = - 1 ) {
if ( verbose )
@ -401,12 +601,12 @@ static int getifnumbydev(int fd, const char *ifname, int verbose)
return ifr . ifr_ifindex ;
}
static int getdevl inktype( int fd , const char * ifname )
static int AFPGetDevL inktype( int fd , const char * ifname )
{
struct ifreq ifr ;
memset ( & ifr , 0 , sizeof ( ifr ) ) ;
str n cpy( ifr . ifr_name , ifname , sizeof ( ifr . ifr_name ) ) ;
str l cpy( ifr . ifr_name , ifname , sizeof ( ifr . ifr_name ) ) ;
if ( ioctl ( fd , SIOCGIFHWADDR , & ifr ) = = - 1 ) {
SCLogError ( SC_ERR_AFP_CREATE , " Unable to find type for iface \" %s \" : %s " ,
@ -414,10 +614,17 @@ static int getdevlinktype(int fd, const char *ifname)
return - 1 ;
}
return ifr . ifr_hwaddr . sa_family ;
switch ( ifr . ifr_hwaddr . sa_family ) {
case ARPHRD_LOOPBACK :
return LINKTYPE_ETHERNET ;
case ARPHRD_PPP :
return LINKTYPE_RAW ;
default :
return ifr . ifr_hwaddr . sa_family ;
}
}
static int createsocket ( AFPThreadVars * ptv , char * devname , int verbose )
static int AFPCreateS ocket( AFPThreadVars * ptv , char * devname , int verbose )
{
int r ;
struct packet_mreq sock_params ;
@ -425,7 +632,7 @@ static int createsocket(AFPThreadVars *ptv, char *devname, int verbose)
/* open socket */
ptv - > socket = socket ( AF_PACKET , SOCK_RAW , htons ( ETH_P_ALL ) ) ;
if ( ptv - > socket = = - 1 ) {
SCLogError ( SC_ERR_AFP_CREATE , " Cou dn't create a AF_PACKET socket, error %s" , strerror ( errno ) ) ;
SCLogError ( SC_ERR_AFP_CREATE , " Cou l dn't create a AF_PACKET socket, error %s" , strerror ( errno ) ) ;
return - 1 ;
}
SCLogInfo ( " using interface %s " , ( char * ) devname ) ;
@ -433,10 +640,10 @@ static int createsocket(AFPThreadVars *ptv, char *devname, int verbose)
memset ( & bind_address , 0 , sizeof ( bind_address ) ) ;
bind_address . sll_family = AF_PACKET ;
bind_address . sll_protocol = htons ( ETH_P_ALL ) ;
bind_address . sll_ifindex = getifnumbyd ev( ptv - > socket , devname , verbose ) ;
bind_address . sll_ifindex = AFPGetIfnumByD ev( ptv - > socket , devname , verbose ) ;
if ( bind_address . sll_ifindex = = - 1 ) {
if ( verbose )
SCLogError ( SC_ERR_AFP_CREATE , " Cou dn't find iface %s" , devname ) ;
SCLogError ( SC_ERR_AFP_CREATE , " Cou l dn't find iface %s" , devname ) ;
return - 1 ;
}
r = bind ( ptv - > socket , ( struct sockaddr * ) & bind_address , sizeof ( bind_address ) ) ;
@ -463,23 +670,44 @@ static int createsocket(AFPThreadVars *ptv, char *devname, int verbose)
r = setsockopt ( ptv - > socket , SOL_PACKET , PACKET_ADD_MEMBERSHIP , ( void * ) & sock_params , sizeof ( sock_params ) ) ;
if ( r < 0 ) {
SCLogError ( SC_ERR_AFP_CREATE ,
" Cou dn't switch iface %s to promiscuous, error %s" ,
" Cou l dn't switch iface %s to promiscuous, error %s" ,
devname ,
strerror ( errno ) ) ;
close ( ptv - > socket ) ;
return - 1 ;
}
/* set socket recv buffer size */
if ( ptv - > buffer_size ! = 0 ) {
/*
* Set the socket buffer size to the specified value .
*/
SCLogInfo ( " Setting AF_PACKET socket buffer to %d " , ptv - > buffer_size ) ;
if ( setsockopt ( ptv - > socket , SOL_SOCKET , SO_RCVBUF ,
& ptv - > buffer_size ,
sizeof ( ptv - > buffer_size ) ) = = - 1 ) {
SCLogError ( SC_ERR_AFP_CREATE ,
" Couldn't set buffer size to %d on iface %s, error %s " ,
ptv - > buffer_size ,
devname ,
strerror ( errno ) ) ;
close ( ptv - > socket ) ;
return - 1 ;
}
}
# ifdef HAVE_PACKET_FANOUT
/* add binded socket to fanout group */
if ( ptv - > fanout ) {
if ( AFPConfGetThreads ( ) > 1 ) {
uint32_t option = 0 ;
uint16_t mode = PACKET_FANOUT_HASH ;
uint16_t id = 1 ;
uint16_t mode = ptv- > cluster_type ;
uint16_t id = ptv - > cluster_id ;
option = ( mode < < 16 ) | ( id & 0xffff ) ;
r = setsockopt ( ptv - > socket , SOL_PACKET , PACKET_FANOUT , ( void * ) & option , sizeof ( option ) ) ;
if ( r < 0 ) {
SCLogError ( SC_ERR_AFP_CREATE ,
" Coudn't set fanout mode, error %s " ,
strerror ( errno ) ) ;
close ( ptv - > socket ) ;
return - 1 ;
}
}
@ -502,7 +730,11 @@ static int createsocket(AFPThreadVars *ptv, char *devname, int verbose)
TmEcode ReceiveAFPThreadInit ( ThreadVars * tv , void * initdata , void * * data ) {
SCEnter ( ) ;
int r ;
int value ;
intmax_t value ;
# ifdef HAVE_PACKET_FANOUT
char * tmpclusterid ;
char * tmpctype ;
# endif
/* use max_pending_packets as AFP read size unless it's bigger than
* our size limit */
@ -522,28 +754,81 @@ TmEcode ReceiveAFPThreadInit(ThreadVars *tv, void *initdata, void **data) {
ptv - > tv = tv ;
ptv - > cooked = 0 ;
str n cpy( ptv - > iface , initdata , AFP_IFACE_NAME_LENGTH ) ;
str l cpy( ptv - > iface , initdata , AFP_IFACE_NAME_LENGTH ) ;
ptv - > iface [ AFP_IFACE_NAME_LENGTH - 1 ] = ' \0 ' ;
r = createsocket ( ptv , initdata , 1 ) ;
if ( ( ConfGetInt ( " af-packet.buffer-size " , & value ) ) = = 1 ) {
ptv - > buffer_size = value ;
} else {
ptv - > buffer_size = 0 ;
}
# ifdef HAVE_PACKET_FANOUT
ptv - > cluster_type = PACKET_FANOUT_LB ;
ptv - > cluster_id = 1 ;
/* We only set cluster info if the number of reader threads is greater than 1 */
if ( AFPConfGetThreads ( ) > 1 ) {
if ( ConfGet ( " af-packet.cluster-id " , & tmpclusterid ) ! = 1 ) {
SCLogError ( SC_ERR_INVALID_ARGUMENT , " could not get af-packet.cluster-id " ) ;
return TM_ECODE_FAILED ;
} else {
ptv - > cluster_id = ( uint16_t ) atoi ( tmpclusterid ) ;
SCLogDebug ( " Going to use cluster-id % " PRId32 , ptv - > cluster_id ) ;
}
if ( ConfGet ( " af-packet.cluster-type " , & tmpctype ) ! = 1 ) {
SCLogError ( SC_ERR_GET_CLUSTER_TYPE_FAILED , " Could not get af-packet.cluster-type " ) ;
return TM_ECODE_FAILED ;
} else if ( strcmp ( tmpctype , " cluster_round_robin " ) = = 0 ) {
SCLogInfo ( " Using round-robin cluster mode for AF_PACKET (thread %s) " ,
ptv - > tv - > name ) ;
ptv - > cluster_type = PACKET_FANOUT_LB ;
} else if ( strcmp ( tmpctype , " cluster_flow " ) = = 0 ) {
/* In hash mode, we also ask for defragmentation needed to
* compute the hash */
uint16_t defrag = 0 ;
SCLogInfo ( " Using flow cluster mode for AF_PACKET (thread %s) " ,
ptv - > tv - > name ) ;
ConfGetBool ( " af-packet.defrag " , ( int * ) & defrag ) ;
if ( defrag ) {
SCLogInfo ( " Using defrag kernel functionnality for AF_PACKET (thread %s) " ,
ptv - > tv - > name ) ;
defrag = PACKET_FANOUT_FLAG_DEFRAG ;
}
ptv - > cluster_type = PACKET_FANOUT_HASH | defrag ;
} else if ( strcmp ( tmpctype , " cluster_cpu " ) = = 0 ) {
SCLogInfo ( " Using cpu cluster mode for AF_PACKET (thread %s) " ,
ptv - > tv - > name ) ;
ptv - > cluster_type = PACKET_FANOUT_CPU ;
} else {
SCLogError ( SC_ERR_INVALID_CLUSTER_TYPE , " invalid cluster-type %s " , tmpctype ) ;
return TM_ECODE_FAILED ;
}
}
# endif
r = AFPCreateSocket ( ptv , initdata , 1 ) ;
if ( r < 0 ) {
SCLogError ( SC_ERR_AFP_CREATE , " Coudn't init AF_PACKET socket " ) ;
SCLogError ( SC_ERR_AFP_CREATE , " Cou l dn't init AF_PACKET socket" ) ;
SCFree ( ptv ) ;
SCReturnInt ( TM_ECODE_FAILED ) ;
}
ptv - > datalink = getdevlinktype ( ptv - > socket , ptv - > iface ) ;
ptv - > datalink = AFPGetDevL inktype( ptv - > socket , ptv - > iface ) ;
switch ( ptv - > datalink ) {
case ARPHRD_PPP :
case ARPHRD_ATM :
ptv - > cooked = 1 ;
}
if ( ( ConfGetBool ( " af-packet.fanout " , & value ) ) = = 1 ) {
ptv - > fanout = value ;
} else {
ptv - > fanout = 0 ;
# define T_DATA_SIZE 70000
ptv - > data = SCMalloc ( T_DATA_SIZE ) ;
if ( ptv - > data = = NULL ) {
SCReturnInt ( TM_ECODE_FAILED ) ;
}
ptv - > datalen = T_DATA_SIZE ;
# undef T_DATA_SIZE
* data = ( void * ) ptv ;
SCReturnInt ( TM_ECODE_OK ) ;
@ -557,9 +842,8 @@ TmEcode ReceiveAFPThreadInit(ThreadVars *tv, void *initdata, void **data) {
void ReceiveAFPThreadExitStats ( ThreadVars * tv , void * data ) {
SCEnter ( ) ;
AFPThreadVars * ptv = ( AFPThreadVars * ) data ;
/**
* \ todo Counter output
*/
SCLogInfo ( " (%s) Packets % " PRIu32 " , bytes % " PRIu64 " " , tv - > name , ptv - > pkts , ptv - > bytes ) ;
}
/**
@ -570,6 +854,12 @@ void ReceiveAFPThreadExitStats(ThreadVars *tv, void *data) {
TmEcode ReceiveAFPThreadDeinit ( ThreadVars * tv , void * data ) {
AFPThreadVars * ptv = ( AFPThreadVars * ) data ;
if ( ptv - > data ! = NULL ) {
SCFree ( ptv - > data ) ;
ptv - > data = NULL ;
}
ptv - > datalen = 0 ;
close ( ptv - > socket ) ;
SCReturnInt ( TM_ECODE_OK ) ;
}
@ -643,4 +933,5 @@ TmEcode DecodeAFPThreadInit(ThreadVars *tv, void *initdata, void **data)
SCReturnInt ( TM_ECODE_OK ) ;
}
# endif /* HAVE_AF_PACKET */
/* eof */