Exclude parts of a flow that are not changing after init from the flow mutex. Cleanup flow-hash function.

remotes/origin/master-1.0.x
Victor Julien 16 years ago
parent 4775f67ba1
commit 32e3fea9e6

@ -19,6 +19,7 @@
* \file
*
* \author Victor Julien <victor@inliniac.net>
* \author Pablo Rincon Crespo <pablo.rincon.crespo@gmail.com>
*
* Flow Hashing functions.
*/
@ -281,6 +282,77 @@ static inline int FlowCreateCheck(Packet *p) {
return 1;
}
/**
* \brief Get a new flow
*
* Get a new flow. We're checking memcap first and will try to make room
* if the memcap is reached.
*
* \retval f *LOCKED* flow on succes, NULL on error.
*/
static Flow *FlowGetNew(Packet *p) {
Flow *f = NULL;
if (FlowCreateCheck(p) == 0) {
return NULL;
}
/* no, so get a new one */
f = FlowDequeue(&flow_spare_q);
if (f == NULL) {
/* If we reached the max memcap, try to clean some flows:
* 1- first by normal timeouts
* 2- by emergency mode timeouts
* 3- by last time seen
*/
if (flow_memuse + sizeof(Flow) > flow_config.memcap) {
uint32_t not_released = 0;
SCLogDebug("We need to prune some flows(1)");
/* Ok, then try to release flow_try_release flows */
not_released = FlowPruneFlowsCnt(&p->ts, flow_config.flow_try_release);
if (not_released == (uint32_t)flow_config.flow_try_release) {
/* This means that none of the flows was released, so try again
* with more agressive timeout values (emergency mode) */
if ( !(flow_flags & FLOW_EMERGENCY)) {
SCLogWarning(SC_WARN_FLOW_EMERGENCY, "Warning, engine "
"running with FLOW_EMERGENCY bit set "
"(ts.tv_sec: %"PRIuMAX", ts.tv_usec:%"PRIuMAX")",
(uintmax_t)p->ts.tv_sec, (uintmax_t)p->ts.tv_usec);
flow_flags |= FLOW_EMERGENCY; /* XXX mutex this */
}
SCLogDebug("We need to prune some flows with emerg bit (2)");
not_released = FlowPruneFlowsCnt(&p->ts, FLOW_DEFAULT_FLOW_PRUNE);
if (not_released == (uint32_t)flow_config.flow_try_release) {
/* Here the engine is on a real stress situation
* Try to kill the last time seen "flow_try_release" flows
* directly, ignoring timeouts */
SCLogDebug("We need to KILL some flows (3)");
not_released = FlowKillFlowsCnt(FLOW_DEFAULT_FLOW_PRUNE);
if (not_released == (uint32_t)flow_config.flow_try_release) {
return NULL;
}
}
}
}
/* now see if we can alloc a new flow */
f = FlowAllocDirect();
if (f == NULL) {
return NULL;
}
/* f is locked */
} else {
SCMutexLock(&f->m);
}
return f;
}
/* FlowGetFlowFromHash
*
* Hash retrieval function for flows. Looks up the hash bucket containing the
@ -297,8 +369,6 @@ Flow *FlowGetFlowFromHash (Packet *p)
{
Flow *f = NULL;
FlowHashCountInit;
struct timeval ts;
uint32_t not_released = 0;
/* get the key to our bucket */
uint32_t key = FlowGetKey(p);
@ -312,70 +382,20 @@ Flow *FlowGetFlowFromHash (Packet *p)
/* see if the bucket already has a flow */
if (fb->f == NULL) {
if (FlowCreateCheck(p) == 0) {
f = fb->f = FlowGetNew(p);
if (f == NULL) {
SCSpinUnlock(&fb->s);
FlowHashCountUpdate;
return NULL;
}
/* no, so get a new one */
f = fb->f = FlowDequeue(&flow_spare_q);
if (f == NULL) {
f = fb->f = FlowAllocDirect();
if (f == NULL) {
SCSpinUnlock(&fb->s);
FlowHashCountUpdate;
return NULL;
}
/* If we reached the max memcap, try to clean some flows:
* 1- first by normal timeouts
* 2- by emergency mode timeouts
* 3- by last time seen
*/
if (flow_memuse + sizeof(Flow) > flow_config.memcap) {
/* Get the time */
memset(&ts, 0, sizeof(ts));
TimeGet(&ts);
SCLogDebug("We need to prune some flows(1)");
/* Ok, then try to release flow_try_release flows */
not_released = FlowPruneFlowsCnt(&ts, flow_config.flow_try_release);
if (not_released == (uint32_t)flow_config.flow_try_release) {
/* This means that none of the flows was released, so try again
* with more agressive timeout values (emergency mode) */
if ( !(flow_flags & FLOW_EMERGENCY)) {
SCLogWarning(SC_WARN_FLOW_EMERGENCY, "Warning, engine "
"running with FLOW_EMERGENCY bit set "
"(ts.tv_sec: %"PRIuMAX", ts.tv_usec:%"PRIuMAX")",
(uintmax_t)ts.tv_sec, (uintmax_t)ts.tv_usec);
flow_flags |= FLOW_EMERGENCY; /* XXX mutex this */
}
SCLogDebug("We need to prune some flows with emerg bit (2)");
/* flow is locked */
not_released = FlowPruneFlowsCnt(&ts, FLOW_DEFAULT_FLOW_PRUNE);
if (not_released == (uint32_t)flow_config.flow_try_release) {
/* Here the engine is on a real stress situation
* Try to kill the last time seen "flow_try_release" flows
* directly, ignoring timeouts */
SCLogDebug("We need to KILL some flows (3)");
not_released = FlowKillFlowsCnt(FLOW_DEFAULT_FLOW_PRUNE);
if (not_released == (uint32_t)flow_config.flow_try_release) {
SCSpinUnlock(&fb->s);
FlowHashCountUpdate;
return NULL;
}
}
}
}
}
/* these are protected by the bucket lock */
f->hnext = NULL;
f->hprev = NULL;
/* got one, now lock, initialize and return */
SCMutexLock(&f->m);
FlowInit(f,p);
FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1);
f->flags |= FLOW_NEW_LIST;
@ -388,83 +408,31 @@ Flow *FlowGetFlowFromHash (Packet *p)
/* ok, we have a flow in the bucket. Let's find out if it is our flow */
f = fb->f;
/* lock the 'root' flow */
SCMutexLock(&f->m);
/* see if this is the flow we are looking for */
if (FlowCompare(f, p) == 0) {
Flow *pf = NULL; /* previous flow */
SCMutexUnlock(&f->m);
while (f) {
FlowHashCountIncr;
pf = f; /* pf is not locked at this point */
pf = f;
f = f->hnext;
if (f == NULL) {
if (FlowCreateCheck(p) == 0) {
f = pf->hnext = FlowGetNew(p);
if (f == NULL) {
SCSpinUnlock(&fb->s);
FlowHashCountUpdate;
return NULL;
}
/* get us a new one and put it and the list tail */
f = pf->hnext = FlowDequeue(&flow_spare_q);
if (f == NULL) {
f = fb->f = FlowAllocDirect();
if (f == NULL) {
SCSpinUnlock(&fb->s);
FlowHashCountUpdate;
return NULL;
}
/* If we reached the max memcap, try to clean some flows:
* 1- first by normal timeouts
* 2- by emergency mode timeouts
* 3- by last time seen
*/
if (flow_memuse + sizeof(Flow) > flow_config.memcap) {
/* Get the time */
memset(&ts, 0, sizeof(ts));
TimeGet(&ts);
//SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
/* Ok, then try to release flow_try_release flows */
SCLogDebug("We need to prune some flows(1)");
not_released = FlowPruneFlowsCnt(&ts, flow_config.flow_try_release);
if (not_released == (uint32_t)flow_config.flow_try_release) {
/* This means that none of the flows was released, so try again
* with more agressive timeout values (emergency mode) */
if ( !(flow_flags & FLOW_EMERGENCY)) {
SCLogWarning(SC_WARN_FLOW_EMERGENCY, "Warning, engine running with FLOW_EMERGENCY bit set (ts.tv_sec: %"PRIuMAX", ts.tv_usec:%"PRIuMAX")", (uintmax_t)ts.tv_sec, (uintmax_t)ts.tv_usec);
flow_flags |= FLOW_EMERGENCY; /* XXX mutex this */
}
SCLogDebug("We need to prune some flows with emerg bit (2)");
not_released = FlowPruneFlowsCnt(&ts, FLOW_DEFAULT_FLOW_PRUNE);
if (not_released == (uint32_t)flow_config.flow_try_release) {
/* Here the engine is on a real stress situation
* Try to kill the last time seen "flow_try_release" flows
* directly, ignoring timeouts */
SCLogDebug("We need to KILL some flows (3)");
not_released = FlowKillFlowsCnt(FLOW_DEFAULT_FLOW_PRUNE);
if (not_released == (uint32_t)flow_config.flow_try_release) {
SCSpinUnlock(&fb->s);
FlowHashCountUpdate;
return NULL;
}
}
}
}
}
/* flow is locked */
f->hnext = NULL;
f->hprev = pf;
/* lock, initialize and return */
SCMutexLock(&f->m);
/* initialize and return */
FlowInit(f,p);
FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1);
@ -476,8 +444,6 @@ Flow *FlowGetFlowFromHash (Packet *p)
return f;
}
SCMutexLock(&f->m);
if (FlowCompare(f, p) != 0) {
/* we found our flow, lets put it on top of the
* hash list -- this rewards active flows */
@ -490,20 +456,19 @@ Flow *FlowGetFlowFromHash (Packet *p)
fb->f = f;
/* found our flow */
SCMutexLock(&f->m);
SCSpinUnlock(&fb->s);
FlowHashCountUpdate;
return f;
}
/* not found, try the next... */
SCMutexUnlock(&f->m);
}
}
/* The 'root' flow was our flow, return it.
* It's already locked. */
SCSpinUnlock(&fb->s);
if (f != NULL) {
SCMutexLock(&f->m);
}
SCSpinUnlock(&fb->s);
FlowHashCountUpdate;
return f;
}

@ -56,7 +56,7 @@ Flow *FlowAllocDirect(void)
flow_memuse += sizeof(Flow);
SCMutexUnlock(&flow_memuse_mutex);
FlowPrintQueueInfo();
//FlowPrintQueueInfo();
SCMutexInit(&f->m, NULL);
f->lnext = NULL;
@ -138,6 +138,7 @@ void FlowInit(Flow *f, Packet *p)
SCEnter();
SCLogDebug("flow %p", f);
f->de_state = NULL;
CLEAR_FLOW(f);
f->proto = p->proto;

@ -98,8 +98,28 @@ typedef struct FlowKey_
} FlowKey;
/**
* \brief Flow data structure.
*
* The flow is a global data structure that is created for new packets of a
* flow and then looked up for the following packets of a flow.
*
* Locking
*
* The flow is updated/used by multiple packets at the same time. This is why
* there is a flow-mutex. It's a mutex and not a spinlock because some
* operations on the flow can be quite expensive, thus spinning would be
* too expensive.
*
* The flow "header" (addresses, ports, proto, recursion level) are static
* after the initialization and remain read-only throughout the entire live
* of a flow. This is why we can access those without protection of the lock.
*/
typedef struct Flow_
{
/* flow "header", used for hashing and flow lookup. Static after init,
* so safe to look at without lock */
Address src, dst;
union {
Port sp; /**< tcp/udp source port */
@ -112,6 +132,8 @@ typedef struct Flow_
uint8_t proto;
uint8_t recursion_level;
/* end of flow "header" */
uint16_t flags;
/* ts of flow init and last update */

Loading…
Cancel
Save