flow: fix and simplify locking

Since:

9551cd0535 ("threading: don't pass locked flow between threads")

`MoveToWorkQueue()` unconditionally unlocks the flow. This allows simpler
locking handling, including of tcp reuse flows.

The simpler logic also fixes a scenario where TCP reuse flows got "unlocked"
twice, once in `FlowGetFlowFromHash()` and once in `MoveToWorkQueue()`.

Bug: #5248.
Coverity: 1494354.
pull/7225/head
Victor Julien 3 years ago
parent 7eb279ac53
commit 57533d3e47

@ -641,8 +641,7 @@ static Flow *TcpReuseReplace(ThreadVars *tv, FlowLookupStruct *fls,
/* get some settings that we move over to the new flow */
FlowThreadId thread_id[2] = { old_f->thread_id[0], old_f->thread_id[1] };
/* since fb lock is still held this flow won't be found until we are done */
FLOWLOCK_UNLOCK(old_f);
/* flow is unlocked by caller */
/* Get a new flow. It will be either a locked flow or NULL */
Flow *f = FlowGetNew(tv, fls, p);
@ -650,8 +649,6 @@ static Flow *TcpReuseReplace(ThreadVars *tv, FlowLookupStruct *fls,
return NULL;
}
/* flow is locked */
/* put at the start of the list */
f->next = fb->head;
fb->head = f;
@ -694,7 +691,6 @@ static inline void MoveToWorkQueue(ThreadVars *tv, FlowLookupStruct *fls,
f->fb = NULL;
f->next = NULL;
FlowQueuePrivateAppendFlow(&fls->work_queue, f);
FLOWLOCK_UNLOCK(f);
} else {
/* implied: TCP but our thread does not own it. So set it
* aside for the Flow Manager to pick it up. */
@ -703,7 +699,6 @@ static inline void MoveToWorkQueue(ThreadVars *tv, FlowLookupStruct *fls,
if (SC_ATOMIC_GET(f->fb->next_ts) != 0) {
SC_ATOMIC_SET(f->fb->next_ts, 0);
}
FLOWLOCK_UNLOCK(f);
}
}
@ -798,10 +793,10 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls,
(fb_nextts < (uint32_t)p->ts.tv_sec && FlowIsTimedOut(f, (uint32_t)p->ts.tv_sec, emerg));
if (timedout) {
FromHashLockTO(f);//FLOWLOCK_WRLOCK(f);
if (f->use_cnt == 0) {
if (likely(f->use_cnt == 0)) {
next_f = f->next;
MoveToWorkQueue(tv, fls, fb, f, prev_f);
/* flow stays locked, ownership xfer'd to MoveToWorkQueue */
FLOWLOCK_UNLOCK(f);
goto flow_removed;
}
FLOWLOCK_UNLOCK(f);
@ -810,11 +805,13 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls,
/* found a matching flow that is not timed out */
if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) {
Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p);
if (f->use_cnt == 0) {
if (likely(f->use_cnt == 0)) {
if (prev_f == NULL) /* if we have no prev it means new_f is now our prev */
prev_f = new_f;
MoveToWorkQueue(tv, fls, fb, f, prev_f); /* evict old flow */
}
FLOWLOCK_UNLOCK(f); /* unlock old replaced flow */
if (new_f == NULL) {
FBLOCK_UNLOCK(fb);
return NULL;

Loading…
Cancel
Save