@ -25,7 +25,6 @@
# include "flow-var.h"
# include "flow-private.h"
# include "util-unittest.h"
# include "stream-tcp-private.h"
//#define FLOW_DEFAULT_HASHSIZE 262144
# define FLOW_DEFAULT_HASHSIZE 65536
@ -40,32 +39,51 @@ static int FlowUpdateSpareFlows(void);
int FlowSetProtoTimeout ( uint8_t , uint32_t , uint32_t , uint32_t ) ;
int FlowSetProtoEmergencyTimeout ( uint8_t , uint32_t , uint32_t , uint32_t ) ;
static int FlowClearMemory ( Flow * , uint8_t ) ;
static int FlowGetProtoMapping ( uint8_t ) ;
int FlowSetProtoFreeFunc ( uint8_t , void ( * Free ) ( void * ) ) ;
int FlowSetFlowStateFunc ( uint8_t , int ( * GetProtoState ) ( void * ) ) ;
/** \brief Update the flows position in the queue's
* \ param f Flow to requeue .
* \ todo if we have a flow state func rely on that soly
*
* In - use flows are either in the flow_new_q or flow_est _q lists .
* In - use flows are in the flow_new_q , flow_est_q lists or flow_close _q lists .
*/
static void FlowUpdateQueue ( Flow * f )
void FlowUpdateQueue ( Flow * f )
{
if ( f - > flags & FLOW_NEW_LIST ) {
/* in the new list -- we consider a flow no longer
* new if we have seen at least 2 pkts in both ways . */
if ( f - > todstpktcnt & & f - > tosrcpktcnt ) {
FlowRequeue ( f , & flow_new_q , & flow_est_q ) ;
FlowRequeue ( f , & flow_new_q [f - > protomap ] , & flow_est_q [ f - > protomap ] ) ;
f - > flags | = FLOW_EST_LIST ; /* transition */
f - > flags & = ~ FLOW_NEW_LIST ;
} else {
FlowRequeue ( f , & flow_new_q , & flow_new_q ) ;
FlowRequeue ( f , & flow_new_q [f - > protomap ] , & flow_new_q [ f - > protomap ] ) ;
}
} else if ( f - > flags & FLOW_EST_LIST ) {
if ( flow_proto [ f - > protomap ] . GetProtoState ! = NULL ) {
uint8_t state = flow_proto [ f - > protomap ] . GetProtoState ( f - > protoctx ) ;
if ( state = = FLOW_STATE_CLOSED ) {
f - > flags | = FLOW_CLOSED_LIST ; /* transition */
f - > flags & = ~ FLOW_EST_LIST ;
//printf("FlowUpdateQueue %p was put into closing queue ts %"PRIuMAX"\n", f, (uintmax_t)f->lastts.tv_sec);
FlowRequeue ( f , & flow_est_q [ f - > protomap ] , & flow_close_q [ f - > protomap ] ) ;
} else {
/* Pull and put back -- this way the flows on
* top of the list are least recently used . */
FlowRequeue ( f , & flow_est_q [ f - > protomap ] , & flow_est_q [ f - > protomap ] ) ;
}
} else {
/* Pull and put back -- this way the flows on
* top of the list are least recently used . */
FlowRequeue ( f , & flow_est_q [ f - > protomap ] , & flow_est_q [ f - > protomap ] ) ;
}
} else if ( f - > flags & FLOW_CLOSED_LIST ) {
/* Pull and put back -- this way the flows on
* top of the list are least recently used . */
FlowRequeue ( f , & flow_est_q , & flow_est_q ) ;
FlowRequeue ( f , & flow_ close_q[ f - > protomap ] , & flow_close_q [ f - > protomap ] ) ;
}
}
@ -111,70 +129,69 @@ static int FlowPrune (FlowQueue *q, struct timeval *ts)
/*set the timeout value according to the flow operating mode, flow's state
and protocol . */
uint32_t timeout = 0 ;
uint8_t proto_map ;
proto_map = FlowGetProtoMapping ( f - > proto ) ;
if ( flow_flags & FLOW_EMERGENCY ) {
if ( flow_proto [ proto_map ] . GetProtoState ! = NULL ) {
switch ( flow_proto [ proto_map ] . GetProtoState ( f - > stream ) ) {
if ( flow_proto [ f - > protomap ] . GetProtoState ! = NULL ) {
switch ( flow_proto [ f - > protomap ] . GetProtoState ( f - > protoctx ) ) {
case FLOW_STATE_NEW :
timeout = flow_proto [ proto_ map] . emerg_new_timeout ;
timeout = flow_proto [ f- > protomap] . emerg_new_timeout ;
break ;
case FLOW_STATE_ESTABLISHED :
timeout = flow_proto [ proto_ map] . emerg_est_timeout ;
timeout = flow_proto [ f- > protomap] . emerg_est_timeout ;
break ;
case FLOW_STATE_CLOSED :
timeout = flow_proto [ proto_ map] . emerg_closed_timeout ;
timeout = flow_proto [ f- > protomap] . emerg_closed_timeout ;
break ;
}
} else {
if ( f - > flags & FLOW_EST_LIST )
timeout = flow_proto [ proto_ map] . emerg_est_timeout ;
timeout = flow_proto [ f- > protomap] . emerg_est_timeout ;
else
timeout = flow_proto [ proto_ map] . emerg_new_timeout ;
timeout = flow_proto [ f- > protomap] . emerg_new_timeout ;
}
} else {
if ( flow_proto [ proto_map ] . GetProtoState ! = NULL ) {
switch ( flow_proto [ proto_map ] . GetProtoState ( f - > stream ) ) {
} else { /* impliet not emergency */
if ( flow_proto [ f - > protomap ] . GetProtoState ! = NULL ) {
switch ( flow_proto [ f - > protomap ] . GetProtoState ( f - > protoctx ) ) {
case FLOW_STATE_NEW :
timeout = flow_proto [ proto_ map] . new_timeout ;
timeout = flow_proto [ f - > protomap ] . new_timeout ;
break ;
case FLOW_STATE_ESTABLISHED :
timeout = flow_proto [ proto_ map] . est_timeout ;
timeout = flow_proto [ f- > protomap] . est_timeout ;
break ;
case FLOW_STATE_CLOSED :
timeout = flow_proto [ proto_ map] . closed_timeout ;
timeout = flow_proto [ f- > protomap] . closed_timeout ;
break ;
}
} else {
if ( f - > flags & FLOW_EST_LIST )
timeout = flow_proto [ proto_ map] . est_timeout ;
timeout = flow_proto [ f- > protomap] . est_timeout ;
else
timeout = flow_proto [ proto_ map] . new_timeout ;
timeout = flow_proto [ f- > protomap] . new_timeout ;
}
}
DEBUGPRINT ( " got lock, now check: % " PRId64 " +% " PRIu32 " =(% " PRId64 " ) < % " PRId64 " " , f - > lastts . tv_sec ,
timeout , f - > lastts . tv_sec + timeout , ts - > tv_sec ) ;
/** never prune a flow that is used by a packet or stream msg
* we are currently processing in one of the threads */
if ( f - > use_cnt > 0 ) {
/* do the timeout check */
if ( ( f - > lastts . tv_sec + timeout ) > = ts - > tv_sec ) {
mutex_unlock ( & f - > fb - > m ) ;
mutex_unlock ( & f - > m ) ;
return 0 ;
}
/* do the timeout check */
if ( ( f - > lastts . tv_sec + timeout ) > = ts - > tv_sec ) {
/** never prune a flow that is used by a packet or stream msg
* we are currently processing in one of the threads */
if ( f - > use_cnt > 0 ) {
printf ( " FlowPrune: timed out but use_cnt > 0: % " PRIu16 " , %p, proto % " PRIu8 " \n " , f - > use_cnt , f , f - > proto ) ;
mutex_unlock ( & f - > fb - > m ) ;
mutex_unlock ( & f - > m ) ;
return 0 ;
}
//printf("timed out %" PRIuMAX "+%" PRIu32 "=(%" PRIuMAX ") < %" PRIuMAX ": %p, proto %"PRIu8"\n", (uintmax_t)f->lastts.tv_sec,
// timeout, (uintmax_t)(f->lastts.tv_sec + timeout), (uintmax_t)ts->tv_sec, f, f->proto);
/* remove from the hash */
if ( f - > hprev )
f - > hprev - > hnext = f - > hnext ;
@ -189,7 +206,7 @@ static int FlowPrune (FlowQueue *q, struct timeval *ts)
mutex_unlock ( & f - > fb - > m ) ;
f - > fb = NULL ;
FlowClearMemory ( f , proto_ map) ;
FlowClearMemory ( f , f- > protomap) ;
/* move to spare list */
FlowRequeue ( f , q , & flow_spare_q ) ;
@ -396,20 +413,26 @@ void FlowInitConfig (char quiet)
* \ warning Not thread safe */
void FlowPrintQueueInfo ( void )
{
int i ;
printf ( " * Flow Queue info: \n " ) ;
printf ( " - SPARE % " PRIu32 " ( " , flow_spare_q . len ) ;
# ifdef DBG_PERF
printf ( " flow_spare_q.dbg_maxlen % " PRIu32 " ) \n " , flow_spare_q . dbg_maxlen ) ;
# endif
printf ( " - NEW % " PRIu32 " ( " , flow_new_q . len ) ;
for ( i = 0 ; i < FLOW_PROTO_MAX ; i + + ) {
printf ( " - NEW % " PRIu32 " ( " , flow_new_q [ i ] . len ) ;
# ifdef DBG_PERF
printf ( " flow_new_q.dbg_maxlen % " PRIu32 " ) \n " , flow_new_q . dbg_maxlen ) ;
printf ( " flow_new_q.dbg_maxlen % " PRIu32 " ) \n " , flow_new_q [ i ] . dbg_maxlen ) ;
# endif
printf ( " - ESTABLISHED % " PRIu32 " ( " , flow_est_q . len ) ;
printf ( " - ESTABLISHED % " PRIu32 " ( " , flow_est_q [ i ] . len ) ;
# ifdef DBG_PERF
printf ( " flow_est_q.dbg_maxlen % " PRIu32 " ) \n " , flow_est_q . dbg_maxlen ) ;
printf ( " flow_est_q.dbg_maxlen % " PRIu32 " ) \n " , flow_est_q [ i ] . dbg_maxlen ) ;
# endif
printf ( " - CLOSING % " PRIu32 " ( " , flow_close_q [ i ] . len ) ;
# ifdef DBG_PERF
printf ( " flow_closing_q.dbg_maxlen % " PRIu32 " ) \n " , flow_close_q [ i ] . dbg_maxlen ) ;
# endif
}
# ifdef FLOWBITS_STATS
printf ( " * Flowbits added: % " PRIu32 " , removed: % " PRIu32 " , " , flowbits_added , flowbits_removed ) ;
printf ( " max memory usage: % " PRIu32 " \n " , flowbits_memuse_max ) ;
@ -420,19 +443,27 @@ void FlowPrintQueueInfo (void)
* \ warning Not thread safe */
void FlowShutdown ( void ) {
Flow * f ;
int i ;
while ( ( f = FlowDequeue ( & flow_spare_q ) ) ) {
FlowFree ( f ) ;
}
while ( ( f = FlowDequeue ( & flow_new_q ) ) ) {
uint8_t proto_map = FlowGetProtoMapping ( f - > proto ) ;
FlowClearMemory ( f , proto_map ) ;
FlowFree ( f ) ;
}
while ( ( f = FlowDequeue ( & flow_est_q ) ) ) {
uint8_t proto_map = FlowGetProtoMapping ( f - > proto ) ;
FlowClearMemory ( f , proto_map ) ;
FlowFree ( f ) ;
for ( i = 0 ; i < FLOW_PROTO_MAX ; i + + ) {
while ( ( f = FlowDequeue ( & flow_new_q [ i ] ) ) ) {
uint8_t proto_map = FlowGetProtoMapping ( f - > proto ) ;
FlowClearMemory ( f , proto_map ) ;
FlowFree ( f ) ;
}
while ( ( f = FlowDequeue ( & flow_est_q [ i ] ) ) ) {
uint8_t proto_map = FlowGetProtoMapping ( f - > proto ) ;
FlowClearMemory ( f , proto_map ) ;
FlowFree ( f ) ;
}
while ( ( f = FlowDequeue ( & flow_close_q [ i ] ) ) ) {
uint8_t proto_map = FlowGetProtoMapping ( f - > proto ) ;
FlowClearMemory ( f , proto_map ) ;
FlowFree ( f ) ;
}
}
free ( flow_hash ) ;
@ -458,7 +489,7 @@ void *FlowManagerThread(void *td)
{
ThreadVars * th_v = ( ThreadVars * ) td ;
struct timeval ts ;
uint32_t established_cnt = 0 , new_cnt = 0 , nowcnt;
uint32_t established_cnt = 0 , new_cnt = 0 , closing_cnt = 0 , nowcnt;
uint32_t sleeping = 0 ;
uint8_t emerg = FALSE ;
@ -484,23 +515,30 @@ void *FlowManagerThread(void *td)
DEBUGPRINT ( " ts % " PRId64 " " , ts . tv_sec ) ;
/* see if we still have enough spare flows */
if ( ! ( FlowUpdateSpareFlows ( ) ) & & emerg = = TRUE ) {
/*timeout_new = flow_config.emerg_timeout_new;
timeout_est = flow_config . emerg_timeout_est ; */
}
/* prune new list */
nowcnt = FlowPruneFlows ( & flow_new_q , & ts ) ;
if ( nowcnt ) {
DEBUGPRINT ( " Pruned % " PRIu32 " new flows... \n " , nowcnt ) ;
new_cnt + = nowcnt ;
}
/* prune established list */
nowcnt = FlowPruneFlows ( & flow_est_q , & ts ) ;
if ( nowcnt ) {
DEBUGPRINT ( " Pruned % " PRIu32 " established flows... \n " , nowcnt ) ;
established_cnt + = nowcnt ;
FlowUpdateSpareFlows ( ) ;
int i ;
for ( i = 0 ; i < FLOW_PROTO_MAX ; i + + ) {
/* prune closing list */
nowcnt = FlowPruneFlows ( & flow_close_q [ i ] , & ts ) ;
if ( nowcnt ) {
DEBUGPRINT ( " Pruned % " PRIu32 " closing flows... \n " , nowcnt ) ;
closing_cnt + = nowcnt ;
}
/* prune new list */
nowcnt = FlowPruneFlows ( & flow_new_q [ i ] , & ts ) ;
if ( nowcnt ) {
DEBUGPRINT ( " Pruned % " PRIu32 " new flows... \n " , nowcnt ) ;
new_cnt + = nowcnt ;
}
/* prune established list */
nowcnt = FlowPruneFlows ( & flow_est_q [ i ] , & ts ) ;
if ( nowcnt ) {
DEBUGPRINT ( " Pruned % " PRIu32 " established flows... \n " , nowcnt ) ;
established_cnt + = nowcnt ;
}
}
sleeping = 0 ;
@ -589,26 +627,6 @@ void FlowInitFlowProto(void) {
flow_proto [ FLOW_PROTO_ICMP ] . GetProtoState = NULL ;
}
/**
* \ brief Function to map the protocol to the defined FLOW_PROTO_ * enumeration .
*
* \ param proto protocol which is needed to be mapped
*/
static int FlowGetProtoMapping ( uint8_t proto ) {
switch ( proto ) {
case IPPROTO_TCP :
return FLOW_PROTO_TCP ;
case IPPROTO_UDP :
return FLOW_PROTO_UDP ;
case IPPROTO_ICMP :
return FLOW_PROTO_ICMP ;
default :
return FLOW_PROTO_DEFAULT ;
}
}
/**
* \ brief Function clear the flow memory before queueing it to spare flow
* queue .
@ -620,11 +638,10 @@ static int FlowGetProtoMapping(uint8_t proto) {
static int FlowClearMemory ( Flow * f , uint8_t proto_map ) {
/* call the protocol specific free function if we have one */
if ( flow_proto [ proto_map ] . Freefunc ! = NULL ) {
flow_proto [ proto_map ] . Freefunc ( f - > stream ) ;
flow_proto [ proto_map ] . Freefunc ( f - > protoctx ) ;
}
f - > stream = NULL ;
f - > protoctx = NULL ;
//memset(f, 0, sizeof(Flow));
CLEAR_FLOW ( f ) ;
return 1 ;
}
@ -705,6 +722,9 @@ int FlowSetProtoEmergencyTimeout(uint8_t proto, uint32_t emerg_new_timeout, uint
return 1 ;
}
# ifdef UNITTESTS
# include "stream-tcp-private.h"
/**
* \ test Test the setting of the per protocol timeouts .
*
@ -813,7 +833,7 @@ static int FlowTestPrune(Flow *f, struct timeval *ts) {
return 0 ;
}
if ( f - > stream ! = NULL ) {
if ( f - > protoctx ! = NULL ) {
printf ( " Failed in freeing the TcpSession \n " ) ;
return 0 ;
}
@ -841,7 +861,7 @@ static int FlowTest03 (void) {
TimeGet ( & ts ) ;
f . lastts . tv_sec = ts . tv_sec - 5000 ;
f . stream = & ssn ;
f . protoctx = & ssn ;
f . fb = & fb ;
f . proto = IPPROTO_TCP ;
@ -885,7 +905,7 @@ static int FlowTest04 (void) {
ssn . server = client ;
ssn . state = TCP_ESTABLISHED ;
f . lastts . tv_sec = ts . tv_sec - 5000 ;
f . stream = & ssn ;
f . protoctx = & ssn ;
f . fb = & fb ;
f . proto = IPPROTO_TCP ;
@ -918,7 +938,7 @@ static int FlowTest05 (void) {
TimeGet ( & ts ) ;
ssn . state = TCP_SYN_SENT ;
f . lastts . tv_sec = ts . tv_sec - 300 ;
f . stream = & ssn ;
f . protoctx = & ssn ;
f . fb = & fb ;
f . proto = IPPROTO_TCP ;
f . flags = FLOW_EMERGENCY ;
@ -963,7 +983,7 @@ static int FlowTest06 (void) {
ssn . server = client ;
ssn . state = TCP_ESTABLISHED ;
f . lastts . tv_sec = ts . tv_sec - 5000 ;
f . stream = & ssn ;
f . protoctx = & ssn ;
f . fb = & fb ;
f . proto = IPPROTO_TCP ;
f . flags = FLOW_EMERGENCY ;
@ -974,15 +994,18 @@ static int FlowTest06 (void) {
return 1 ;
}
# endif /* UNITTESTS */
/**
* \ brief Function to register the Flow Unitests .
*/
void FlowRegisterTests ( void ) {
# ifdef UNITTESTS
UtRegisterTest ( " FlowTest01 -- Protocol Specific Timeouts " , FlowTest01 , 1 ) ;
UtRegisterTest ( " FlowTest02 -- Setting Protocol Specific Free Function " , FlowTest02 , 1 ) ;
UtRegisterTest ( " FlowTest03 -- Timeout a flow having fresh TcpSession " , FlowTest03 , 1 ) ;
UtRegisterTest ( " FlowTest04 -- Timeout a flow having TcpSession with segments " , FlowTest04 , 1 ) ;
UtRegisterTest ( " FlowTest05 -- Timeout a flow in emergency having fresh TcpSession " , FlowTest05 , 1 ) ;
UtRegisterTest ( " FlowTest06 -- Timeout a flow in emergency having TcpSession with segments " , FlowTest06 , 1 ) ;
# endif /* UNITTESTS */
}