modify post_pq packet handling.

- Lock the q just once, once we have detected the presence of packet(s)
  in the queue.  Unlock it when we consume all packets from the q.
remotes/origin/master-1.1.x
Anoop Saldanha 14 years ago committed by Victor Julien
parent b4887943fb
commit f2bcf9ea2c

@ -161,12 +161,14 @@ void *TmThreadsSlot1NoIn(void *td)
tv->tmqh_out(tv, p); tv->tmqh_out(tv, p);
/* handle post queue */ /* handle post queue */
while (s->slot_post_pq.top != NULL) { if (s->slot_post_pq.top != NULL) {
SCMutexLock(&s->slot_post_pq.mutex_q); SCMutexLock(&s->slot_post_pq.mutex_q);
Packet *extra_p = PacketDequeue(&s->slot_post_pq); while (s->slot_post_pq.top != NULL) {
Packet *extra_p = PacketDequeue(&s->slot_post_pq);
if (extra_p != NULL)
tv->tmqh_out(tv, extra_p);
}
SCMutexUnlock(&s->slot_post_pq.mutex_q); SCMutexUnlock(&s->slot_post_pq.mutex_q);
if (extra_p != NULL)
tv->tmqh_out(tv, extra_p);
} }
if (TmThreadsCheckFlag(tv, THV_KILL)) { if (TmThreadsCheckFlag(tv, THV_KILL)) {
@ -404,14 +406,16 @@ void *TmThreadsSlot1(void *td)
/* output the packet */ /* output the packet */
tv->tmqh_out(tv, p); tv->tmqh_out(tv, p);
} }
while (s->slot_post_pq.top != NULL) { if (s->slot_post_pq.top != NULL) {
/* handle new packets from this func */
SCMutexLock(&s->slot_post_pq.mutex_q); SCMutexLock(&s->slot_post_pq.mutex_q);
Packet *extra_p = PacketDequeue(&s->slot_post_pq); while (s->slot_post_pq.top != NULL) {
SCMutexUnlock(&s->slot_post_pq.mutex_q); /* handle new packets from this func */
if (extra_p != NULL) { Packet *extra_p = PacketDequeue(&s->slot_post_pq);
tv->tmqh_out(tv, extra_p); if (extra_p != NULL) {
tv->tmqh_out(tv, extra_p);
}
} }
SCMutexUnlock(&s->slot_post_pq.mutex_q);
} }
if (TmThreadsCheckFlag(tv, THV_KILL)) { if (TmThreadsCheckFlag(tv, THV_KILL)) {
@ -672,25 +676,28 @@ void *TmThreadsSlotVar(void *td)
/* now handle the post_pq packets */ /* now handle the post_pq packets */
TmSlot *slot; TmSlot *slot;
for (slot = s; slot != NULL; slot = slot->slot_next) { for (slot = s; slot != NULL; slot = slot->slot_next) {
while (slot->slot_post_pq.top != NULL) { if (slot->slot_post_pq.top != NULL) {
SCMutexLock(&slot->slot_post_pq.mutex_q); SCMutexLock(&slot->slot_post_pq.mutex_q);
Packet *extra_p = PacketDequeue(&slot->slot_post_pq); while (slot->slot_post_pq.top != NULL) {
SCMutexUnlock(&slot->slot_post_pq.mutex_q); Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
if (extra_p == NULL) if (extra_p == NULL)
break;
if (slot->slot_next != NULL) {
r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
if (r == TM_ECODE_FAILED) {
TmqhOutputPacketpool(tv, extra_p);
TmThreadsSetFlag(tv, THV_FAILED);
break; break;
if (slot->slot_next != NULL) {
r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
if (r == TM_ECODE_FAILED) {
TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
TmqhOutputPacketpool(tv, extra_p);
TmThreadsSetFlag(tv, THV_FAILED);
break;
}
} }
} /* output the packet */
/* output the packet */ tv->tmqh_out(tv, extra_p);
tv->tmqh_out(tv, extra_p); } /* while */
} /* while (slot->slot_post_pq.top != NULL) */ SCMutexUnlock(&slot->slot_post_pq.mutex_q);
} /* for (slot = s; slot != NULL; slot = slot->slot_next) */ } /* if */
} /* for */
if (TmThreadsCheckFlag(tv, THV_KILL)) { if (TmThreadsCheckFlag(tv, THV_KILL)) {
run = 0; run = 0;

@ -152,25 +152,27 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet
/* post process pq */ /* post process pq */
TmSlot *slot = s; TmSlot *slot = s;
while (slot != NULL) { while (slot != NULL) {
while (slot->slot_post_pq.top != NULL) { if (slot->slot_post_pq.top != NULL) {
SCMutexLock(&slot->slot_post_pq.mutex_q); SCMutexLock(&slot->slot_post_pq.mutex_q);
Packet *extra_p = PacketDequeue(&slot->slot_post_pq); while (slot->slot_post_pq.top != NULL) {
SCMutexUnlock(&slot->slot_post_pq.mutex_q); Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
if (extra_p != NULL) { if (extra_p != NULL) {
if (slot->slot_next != NULL) { if (slot->slot_next != NULL) {
r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
if (r == TM_ECODE_FAILED) { if (r == TM_ECODE_FAILED) {
TmqhReleasePacketsToPacketPool(&slot->slot_post_pq); TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
TmqhOutputPacketpool(tv, extra_p); TmqhOutputPacketpool(tv, extra_p);
TmThreadsSetFlag(tv, THV_FAILED); TmThreadsSetFlag(tv, THV_FAILED);
break; break;
}
} }
tv->tmqh_out(tv, extra_p);
} }
tv->tmqh_out(tv, extra_p); } /* while (slot->slot_post_pq.top != NULL) */
} SCMutexUnlock(&slot->slot_post_pq.mutex_q);
} } /* if (slot->slot_post_pq.top != NULL) */
slot = slot->slot_next; slot = slot->slot_next;
} } /* while (slot != NULL) */
} }
return r; return r;

Loading…
Cancel
Save