flow timeout: cleanup

Remove now unused old flow timeout code.
pull/1272/head
Victor Julien 11 years ago
parent de4bda14e6
commit 6e69b51123

@ -714,7 +714,6 @@ void FlowManagerThreadSpawn()
flowmgr_number = (uint32_t)setting;
SCLogInfo("using %u flow manager threads", flowmgr_number);
FlowForceReassemblySetup(g_detect_disabled);
SCCtrlCondInit(&flow_manager_ctrl_cond, NULL);
SCCtrlMutexInit(&flow_manager_ctrl_mutex, NULL);

@ -61,40 +61,6 @@
#include "app-layer.h"
#include "util-profiling.h"
#if 0
static TmSlot *stream_pseudo_pkt_stream_tm_slot = NULL;
static ThreadVars *stream_pseudo_pkt_stream_TV = NULL;
static TmSlot *stream_pseudo_pkt_detect_tm_slot = NULL;
static ThreadVars *stream_pseudo_pkt_detect_TV = NULL;
static ThreadVars *stream_pseudo_pkt_detect_prev_TV = NULL;
static TmSlot *stream_pseudo_pkt_decode_tm_slot = NULL;
static ThreadVars *stream_pseudo_pkt_decode_TV = NULL;
#endif
/**
* \internal
* \brief Flush out if we have any unattended packets.
*/
static inline void FlowForceReassemblyFlushPendingPseudoPackets(void)
{
#if 0
/* we don't lock the queue, since flow manager is dead */
if (stream_pseudo_pkt_decode_tm_slot->slot_post_pq.len == 0)
return;
SCMutexLock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q);
Packet *p = PacketDequeue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq);
SCMutexUnlock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q);
if (TmThreadsSlotProcessPkt(stream_pseudo_pkt_decode_TV,
stream_pseudo_pkt_decode_tm_slot,
p) != TM_ECODE_OK) {
SCLogError(SC_ERR_TM_THREADS_ERROR, "Received error from FFR on "
"flushing packets through decode->.. TMs");
}
#endif
return;
}
/**
* \internal
@ -528,12 +494,6 @@ static inline void FlowForceReassemblyForHash(void)
int server_ok = 0;
uint32_t idx = 0;
#if 0
/* We use this packet just for reassembly purpose */
Packet *reassemble_p = PacketGetFromAlloc();
if (reassemble_p == NULL)
return;
#endif
for (idx = 0; idx < flow_config.hash_size; idx++) {
FlowBucket *fb = &flow_hash[idx];
@ -544,9 +504,6 @@ static inline void FlowForceReassemblyForHash(void)
/* we need to loop through all the flows in the queue */
while (f != NULL) {
#if 0
PACKET_RECYCLE(reassemble_p);
#endif
FLOWLOCK_WRLOCK(f);
/* Get the tcp session for the flow */
@ -559,115 +516,17 @@ static inline void FlowForceReassemblyForHash(void)
continue;
}
if (FlowForceReassemblyNeedReassembly(f, &server_ok, &client_ok) == 1)
if (FlowForceReassemblyNeedReassembly(f, &server_ok, &client_ok) == 1) {
FlowForceReassemblyForFlowV2(f, server_ok, client_ok);
FLOWLOCK_UNLOCK(f);
#if 0
/* ah ah! We have some unattended toserver segments */
if (client_ok == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_REASSEMBLY) {
StreamTcpThread *stt = SC_ATOMIC_GET(stream_pseudo_pkt_stream_tm_slot->slot_data);
ssn->client.last_ack = (ssn->client.seg_list_tail->seq +
ssn->client.seg_list_tail->payload_len);
FlowForceReassemblyPseudoPacketSetup(reassemble_p, 1, f, ssn, 1);
StreamTcpReassembleHandleSegment(stream_pseudo_pkt_stream_TV,
stt->ra_ctx, ssn, &ssn->server,
reassemble_p, NULL);
FlowDeReference(&reassemble_p->flow);
}
/* oh oh! We have some unattended toclient segments */
if (server_ok == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_REASSEMBLY) {
StreamTcpThread *stt = SC_ATOMIC_GET(stream_pseudo_pkt_stream_tm_slot->slot_data);
ssn->server.last_ack = (ssn->server.seg_list_tail->seq +
ssn->server.seg_list_tail->payload_len);
FlowForceReassemblyPseudoPacketSetup(reassemble_p, 0, f, ssn, 1);
StreamTcpReassembleHandleSegment(stream_pseudo_pkt_stream_TV,
stt->ra_ctx, ssn, &ssn->client,
reassemble_p, NULL);
FlowDeReference(&reassemble_p->flow);
}
FLOWLOCK_UNLOCK(f);
/* insert a pseudo packet in the toserver direction */
if (client_ok) {
FLOWLOCK_WRLOCK(f);
Packet *p = FlowForceReassemblyPseudoPacketGet(0, f, ssn, 1);
FLOWLOCK_UNLOCK(f);
if (p == NULL) {
TmqhOutputPacketpool(NULL, reassemble_p);
FBLOCK_UNLOCK(fb);
return;
}
PKT_SET_SRC(p, PKT_SRC_FFR_SHUTDOWN);
if (stream_pseudo_pkt_detect_prev_TV != NULL) {
stream_pseudo_pkt_detect_prev_TV->
tmqh_out(stream_pseudo_pkt_detect_prev_TV, p);
} else {
TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
while (s != NULL) {
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
SlotFunc(NULL, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq,
&s->slot_post_pq);
s = s->slot_next;
}
if (stream_pseudo_pkt_detect_TV != NULL) {
stream_pseudo_pkt_detect_TV->
tmqh_out(stream_pseudo_pkt_detect_TV, p);
} else {
TmqhOutputPacketpool(NULL, p);
}
}
}
if (server_ok) {
FLOWLOCK_WRLOCK(f);
Packet *p = FlowForceReassemblyPseudoPacketGet(1, f, ssn, 1);
FLOWLOCK_UNLOCK(f);
if (p == NULL) {
TmqhOutputPacketpool(NULL, reassemble_p);
FBLOCK_UNLOCK(fb);
return;
}
PKT_SET_SRC(p, PKT_SRC_FFR_SHUTDOWN);
if (stream_pseudo_pkt_detect_prev_TV != NULL) {
stream_pseudo_pkt_detect_prev_TV->
tmqh_out(stream_pseudo_pkt_detect_prev_TV, p);
} else {
TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
while (s != NULL) {
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
SlotFunc(NULL, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq,
&s->slot_post_pq);
s = s->slot_next;
}
if (stream_pseudo_pkt_detect_TV != NULL) {
stream_pseudo_pkt_detect_TV->
tmqh_out(stream_pseudo_pkt_detect_TV, p);
} else {
TmqhOutputPacketpool(NULL, p);
}
}
}
#endif
/* next flow in the queue */
f = f->hnext;
}
FBLOCK_UNLOCK(fb);
}
#if 0
PKT_SET_SRC(reassemble_p, PKT_SRC_FFR_SHUTDOWN);
TmqhOutputPacketpool(NULL, reassemble_p);
#endif
return;
}
@ -676,143 +535,9 @@ static inline void FlowForceReassemblyForHash(void)
*/
void FlowForceReassembly(void)
{
/* Do remember. We need to have packet acquire disabled by now */
#if 0
/** ----- Part 1 ------*/
/* Flush out unattended packets */
FlowForceReassemblyFlushPendingPseudoPackets();
/** ----- Part 2 ----- **/
/* Check if all threads are idle. We need this so that we have all
* packets freeds. As a consequence, no flows are in use */
SCMutexLock(&tv_root_lock);
/* all receive threads are part of packet processing threads */
ThreadVars *tv = tv_root[TVT_PPT];
/* we are doing this in order receive -> decode -> ... -> log */
while (tv != NULL) {
if (tv->inq != NULL) {
/* we wait till we dry out all the inq packets, before we
* kill this thread. Do note that you should have disabled
* packet acquire by now using TmThreadDisableReceiveThreads()*/
if (!(strlen(tv->inq->name) == strlen("packetpool") &&
strcasecmp(tv->inq->name, "packetpool") == 0)) {
PacketQueue *q = &trans_q[tv->inq->id];
while (q->len != 0) {
usleep(100);
}
TmThreadsSetFlag(tv, THV_PAUSE);
if (tv->inq->q_type == 0)
SCCondSignal(&trans_q[tv->inq->id].cond_q);
else
SCCondSignal(&data_queues[tv->inq->id].cond_q);
while (!TmThreadsCheckFlag(tv, THV_PAUSED)) {
if (tv->inq->q_type == 0)
SCCondSignal(&trans_q[tv->inq->id].cond_q);
else
SCCondSignal(&data_queues[tv->inq->id].cond_q);
usleep(100);
}
TmThreadsUnsetFlag(tv, THV_PAUSE);
}
}
tv = tv->next;
}
SCMutexUnlock(&tv_root_lock);
#endif
/** ----- Part 3 ----- **/
/* Carry out flow reassembly for unattended flows */
FlowForceReassemblyForHash();
return;
}
/**
* \param detect_disabled bool, indicating if we use a detection engine (true)
*/
void FlowForceReassemblySetup(int detect_disabled)
{
#if 0
/* get StreamTCP TM's slot and TV containing this slot */
stream_pseudo_pkt_stream_tm_slot = TmSlotGetSlotForTM(TMM_STREAMTCP);
if (stream_pseudo_pkt_stream_tm_slot == NULL) {
/* yes, this is fatal! */
SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to "
"retrieve the slot for STREAMTCP TM");
exit(EXIT_FAILURE);
}
stream_pseudo_pkt_stream_TV =
TmThreadsGetTVContainingSlot(stream_pseudo_pkt_stream_tm_slot);
if (stream_pseudo_pkt_stream_TV == NULL) {
/* yes, this is fatal! */
SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to "
"retrieve the TV containing STREAMTCP TM slot");
exit(EXIT_FAILURE);
}
if (!detect_disabled) {
/* get detect TM's slot and TV containing this slot */
stream_pseudo_pkt_detect_tm_slot = TmSlotGetSlotForTM(TMM_DETECT);
if (stream_pseudo_pkt_detect_tm_slot == NULL) {
/* yes, this is fatal! */
SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to "
"retrieve a slot for DETECT TM");
exit(EXIT_FAILURE);
}
stream_pseudo_pkt_detect_TV =
TmThreadsGetTVContainingSlot(stream_pseudo_pkt_detect_tm_slot);
if (stream_pseudo_pkt_detect_TV == NULL) {
/* yes, this is fatal! */
SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to "
"retrieve the TV containing the Detect TM slot");
exit(EXIT_FAILURE);
}
if (stream_pseudo_pkt_detect_TV->tm_slots == stream_pseudo_pkt_detect_tm_slot) {
stream_pseudo_pkt_detect_prev_TV = stream_pseudo_pkt_detect_TV->prev;
}
if (strcasecmp(stream_pseudo_pkt_detect_TV->outqh_name, "packetpool") == 0) {
stream_pseudo_pkt_detect_TV = NULL;
}
}
SCMutexLock(&tv_root_lock);
ThreadVars *tv = tv_root[TVT_PPT];
int done = 0;
while (tv) {
TmSlot *slots = tv->tm_slots;
while (slots) {
TmModule *tm = TmModuleGetById(slots->tm_id);
if (tm->flags & TM_FLAG_DECODE_TM) {
done = 1;
stream_pseudo_pkt_decode_tm_slot = slots;
break;
}
slots = slots->slot_next;
}
if (done)
break;
tv = tv->next;
}
SCMutexUnlock(&tv_root_lock);
if (stream_pseudo_pkt_decode_tm_slot == NULL) {
/* yes, this is fatal! */
SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to "
"retrieve the slot for DECODE TM");
exit(EXIT_FAILURE);
}
stream_pseudo_pkt_decode_TV =
TmThreadsGetTVContainingSlot(stream_pseudo_pkt_decode_tm_slot);
if (stream_pseudo_pkt_decode_TV == NULL) {
/* yes, this is fatal! */
SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to "
"retrieve the TV containing the Decode TM slot");
exit(EXIT_FAILURE);
}
return;
#endif
}

Loading…
Cancel
Save