diff --git a/src/flow-hash.c b/src/flow-hash.c index 12b21bb8e3..3fb38dd846 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -19,6 +19,7 @@ * \file * * \author Victor Julien + * \author Pablo Rincon Crespo * * Flow Hashing functions. */ @@ -281,6 +282,77 @@ static inline int FlowCreateCheck(Packet *p) { return 1; } +/** + * \brief Get a new flow + * + * Get a new flow. We're checking memcap first and will try to make room + * if the memcap is reached. + * + * \retval f *LOCKED* flow on succes, NULL on error. + */ +static Flow *FlowGetNew(Packet *p) { + Flow *f = NULL; + + if (FlowCreateCheck(p) == 0) { + return NULL; + } + + /* no, so get a new one */ + f = FlowDequeue(&flow_spare_q); + if (f == NULL) { + /* If we reached the max memcap, try to clean some flows: + * 1- first by normal timeouts + * 2- by emergency mode timeouts + * 3- by last time seen + */ + if (flow_memuse + sizeof(Flow) > flow_config.memcap) { + uint32_t not_released = 0; + + SCLogDebug("We need to prune some flows(1)"); + + /* Ok, then try to release flow_try_release flows */ + not_released = FlowPruneFlowsCnt(&p->ts, flow_config.flow_try_release); + if (not_released == (uint32_t)flow_config.flow_try_release) { + /* This means that none of the flows was released, so try again + * with more agressive timeout values (emergency mode) */ + + if ( !(flow_flags & FLOW_EMERGENCY)) { + SCLogWarning(SC_WARN_FLOW_EMERGENCY, "Warning, engine " + "running with FLOW_EMERGENCY bit set " + "(ts.tv_sec: %"PRIuMAX", ts.tv_usec:%"PRIuMAX")", + (uintmax_t)p->ts.tv_sec, (uintmax_t)p->ts.tv_usec); + flow_flags |= FLOW_EMERGENCY; /* XXX mutex this */ + } + SCLogDebug("We need to prune some flows with emerg bit (2)"); + + not_released = FlowPruneFlowsCnt(&p->ts, FLOW_DEFAULT_FLOW_PRUNE); + if (not_released == (uint32_t)flow_config.flow_try_release) { + /* Here the engine is on a real stress situation + * Try to kill the last time seen "flow_try_release" flows + * directly, ignoring timeouts */ + SCLogDebug("We need to KILL some flows (3)"); + not_released = FlowKillFlowsCnt(FLOW_DEFAULT_FLOW_PRUNE); + if (not_released == (uint32_t)flow_config.flow_try_release) { + return NULL; + } + } + } + } + + /* now see if we can alloc a new flow */ + f = FlowAllocDirect(); + if (f == NULL) { + return NULL; + } + + /* f is locked */ + } else { + SCMutexLock(&f->m); + } + + return f; +} + /* FlowGetFlowFromHash * * Hash retrieval function for flows. Looks up the hash bucket containing the @@ -297,8 +369,6 @@ Flow *FlowGetFlowFromHash (Packet *p) { Flow *f = NULL; FlowHashCountInit; - struct timeval ts; - uint32_t not_released = 0; /* get the key to our bucket */ uint32_t key = FlowGetKey(p); @@ -312,70 +382,20 @@ Flow *FlowGetFlowFromHash (Packet *p) /* see if the bucket already has a flow */ if (fb->f == NULL) { - if (FlowCreateCheck(p) == 0) { + f = fb->f = FlowGetNew(p); + if (f == NULL) { SCSpinUnlock(&fb->s); FlowHashCountUpdate; return NULL; } - /* no, so get a new one */ - f = fb->f = FlowDequeue(&flow_spare_q); - if (f == NULL) { - f = fb->f = FlowAllocDirect(); - if (f == NULL) { - SCSpinUnlock(&fb->s); - FlowHashCountUpdate; - return NULL; - } - - /* If we reached the max memcap, try to clean some flows: - * 1- first by normal timeouts - * 2- by emergency mode timeouts - * 3- by last time seen - */ - if (flow_memuse + sizeof(Flow) > flow_config.memcap) { - /* Get the time */ - memset(&ts, 0, sizeof(ts)); - TimeGet(&ts); - - SCLogDebug("We need to prune some flows(1)"); - /* Ok, then try to release flow_try_release flows */ - not_released = FlowPruneFlowsCnt(&ts, flow_config.flow_try_release); - if (not_released == (uint32_t)flow_config.flow_try_release) { - /* This means that none of the flows was released, so try again - * with more agressive timeout values (emergency mode) */ - - if ( !(flow_flags & FLOW_EMERGENCY)) { - SCLogWarning(SC_WARN_FLOW_EMERGENCY, "Warning, engine " - "running with FLOW_EMERGENCY bit set " - "(ts.tv_sec: %"PRIuMAX", ts.tv_usec:%"PRIuMAX")", - (uintmax_t)ts.tv_sec, (uintmax_t)ts.tv_usec); - flow_flags |= FLOW_EMERGENCY; /* XXX mutex this */ - } - SCLogDebug("We need to prune some flows with emerg bit (2)"); + /* flow is locked */ - not_released = FlowPruneFlowsCnt(&ts, FLOW_DEFAULT_FLOW_PRUNE); - if (not_released == (uint32_t)flow_config.flow_try_release) { - /* Here the engine is on a real stress situation - * Try to kill the last time seen "flow_try_release" flows - * directly, ignoring timeouts */ - SCLogDebug("We need to KILL some flows (3)"); - not_released = FlowKillFlowsCnt(FLOW_DEFAULT_FLOW_PRUNE); - if (not_released == (uint32_t)flow_config.flow_try_release) { - SCSpinUnlock(&fb->s); - FlowHashCountUpdate; - return NULL; - } - } - } - } - } /* these are protected by the bucket lock */ f->hnext = NULL; f->hprev = NULL; /* got one, now lock, initialize and return */ - SCMutexLock(&f->m); FlowInit(f,p); FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1); f->flags |= FLOW_NEW_LIST; @@ -388,83 +408,31 @@ Flow *FlowGetFlowFromHash (Packet *p) /* ok, we have a flow in the bucket. Let's find out if it is our flow */ f = fb->f; - /* lock the 'root' flow */ - SCMutexLock(&f->m); /* see if this is the flow we are looking for */ if (FlowCompare(f, p) == 0) { Flow *pf = NULL; /* previous flow */ - SCMutexUnlock(&f->m); while (f) { FlowHashCountIncr; - pf = f; /* pf is not locked at this point */ + pf = f; f = f->hnext; if (f == NULL) { - if (FlowCreateCheck(p) == 0) { + f = pf->hnext = FlowGetNew(p); + if (f == NULL) { SCSpinUnlock(&fb->s); FlowHashCountUpdate; return NULL; } - /* get us a new one and put it and the list tail */ - f = pf->hnext = FlowDequeue(&flow_spare_q); - if (f == NULL) { - - f = fb->f = FlowAllocDirect(); - if (f == NULL) { - SCSpinUnlock(&fb->s); - FlowHashCountUpdate; - return NULL; - } - - /* If we reached the max memcap, try to clean some flows: - * 1- first by normal timeouts - * 2- by emergency mode timeouts - * 3- by last time seen - */ - if (flow_memuse + sizeof(Flow) > flow_config.memcap) { - /* Get the time */ - memset(&ts, 0, sizeof(ts)); - TimeGet(&ts); - //SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec); - /* Ok, then try to release flow_try_release flows */ - SCLogDebug("We need to prune some flows(1)"); - not_released = FlowPruneFlowsCnt(&ts, flow_config.flow_try_release); - if (not_released == (uint32_t)flow_config.flow_try_release) { - /* This means that none of the flows was released, so try again - * with more agressive timeout values (emergency mode) */ - if ( !(flow_flags & FLOW_EMERGENCY)) { - SCLogWarning(SC_WARN_FLOW_EMERGENCY, "Warning, engine running with FLOW_EMERGENCY bit set (ts.tv_sec: %"PRIuMAX", ts.tv_usec:%"PRIuMAX")", (uintmax_t)ts.tv_sec, (uintmax_t)ts.tv_usec); - flow_flags |= FLOW_EMERGENCY; /* XXX mutex this */ - } - SCLogDebug("We need to prune some flows with emerg bit (2)"); - - not_released = FlowPruneFlowsCnt(&ts, FLOW_DEFAULT_FLOW_PRUNE); - if (not_released == (uint32_t)flow_config.flow_try_release) { - /* Here the engine is on a real stress situation - * Try to kill the last time seen "flow_try_release" flows - * directly, ignoring timeouts */ - SCLogDebug("We need to KILL some flows (3)"); - not_released = FlowKillFlowsCnt(FLOW_DEFAULT_FLOW_PRUNE); - if (not_released == (uint32_t)flow_config.flow_try_release) { - SCSpinUnlock(&fb->s); - FlowHashCountUpdate; - return NULL; - } - } - } - } - - } + /* flow is locked */ f->hnext = NULL; f->hprev = pf; - /* lock, initialize and return */ - SCMutexLock(&f->m); + /* initialize and return */ FlowInit(f,p); FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1); @@ -476,8 +444,6 @@ Flow *FlowGetFlowFromHash (Packet *p) return f; } - SCMutexLock(&f->m); - if (FlowCompare(f, p) != 0) { /* we found our flow, lets put it on top of the * hash list -- this rewards active flows */ @@ -490,20 +456,19 @@ Flow *FlowGetFlowFromHash (Packet *p) fb->f = f; /* found our flow */ + SCMutexLock(&f->m); SCSpinUnlock(&fb->s); FlowHashCountUpdate; return f; } - - /* not found, try the next... */ - SCMutexUnlock(&f->m); } } - /* The 'root' flow was our flow, return it. - * It's already locked. */ - SCSpinUnlock(&fb->s); + if (f != NULL) { + SCMutexLock(&f->m); + } + SCSpinUnlock(&fb->s); FlowHashCountUpdate; return f; } diff --git a/src/flow-util.c b/src/flow-util.c index f8d2d47f54..a4cf654d39 100644 --- a/src/flow-util.c +++ b/src/flow-util.c @@ -56,7 +56,7 @@ Flow *FlowAllocDirect(void) flow_memuse += sizeof(Flow); SCMutexUnlock(&flow_memuse_mutex); - FlowPrintQueueInfo(); + //FlowPrintQueueInfo(); SCMutexInit(&f->m, NULL); f->lnext = NULL; @@ -138,6 +138,7 @@ void FlowInit(Flow *f, Packet *p) SCEnter(); SCLogDebug("flow %p", f); + f->de_state = NULL; CLEAR_FLOW(f); f->proto = p->proto; diff --git a/src/flow.h b/src/flow.h index 322c89fd12..7726365513 100644 --- a/src/flow.h +++ b/src/flow.h @@ -98,8 +98,28 @@ typedef struct FlowKey_ } FlowKey; +/** + * \brief Flow data structure. + * + * The flow is a global data structure that is created for new packets of a + * flow and then looked up for the following packets of a flow. + * + * Locking + * + * The flow is updated/used by multiple packets at the same time. This is why + * there is a flow-mutex. It's a mutex and not a spinlock because some + * operations on the flow can be quite expensive, thus spinning would be + * too expensive. + * + * The flow "header" (addresses, ports, proto, recursion level) are static + * after the initialization and remain read-only throughout the entire live + * of a flow. This is why we can access those without protection of the lock. + */ + typedef struct Flow_ { + /* flow "header", used for hashing and flow lookup. Static after init, + * so safe to look at without lock */ Address src, dst; union { Port sp; /**< tcp/udp source port */ @@ -112,6 +132,8 @@ typedef struct Flow_ uint8_t proto; uint8_t recursion_level; + /* end of flow "header" */ + uint16_t flags; /* ts of flow init and last update */