flowworker/stream: use no-lock packet queue

Use smaller structure for temporary packet queues.
pull/4531/head
Victor Julien 6 years ago
parent f8aed4ce2d
commit e3fbdf1948

@ -973,8 +973,8 @@ void AppLayerDeSetupCounters()
ThreadVars tv;\
StreamTcpThread *stt = NULL;\
TCPHdr tcph;\
PacketQueue pq;\
memset(&pq,0,sizeof(PacketQueue));\
PacketQueueNoLock pq;\
memset(&pq,0,sizeof(PacketQueueNoLock));\
memset(p, 0, SIZE_OF_PACKET);\
memset (&f, 0, sizeof(Flow));\
memset(&tv, 0, sizeof (ThreadVars));\

@ -65,7 +65,7 @@ typedef struct FlowWorkerThreadData_ {
uint16_t both_bypass_pkts;
uint16_t both_bypass_bytes;
PacketQueue pq;
PacketQueueNoLock pq;
} FlowWorkerThreadData;
@ -142,8 +142,7 @@ static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void *
AppLayerRegisterThreadCounters(tv);
/* setup pq for stream end pkts */
memset(&fw->pq, 0, sizeof(PacketQueue));
SCMutexInit(&fw->pq.mutex_q, NULL);
memset(&fw->pq, 0, sizeof(PacketQueueNoLock));
*data = fw;
return TM_ECODE_OK;
@ -170,7 +169,6 @@ static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data)
/* free pq */
BUG_ON(fw->pq.len);
SCMutexDestroy(&fw->pq.mutex_q);
SC_ATOMIC_DESTROY(fw->detect_thread);
SCFree(fw);
@ -250,7 +248,7 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
/* Packets here can safely access p->flow as it's locked */
SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len);
Packet *x;
while ((x = PacketDequeue(&fw->pq))) {
while ((x = PacketDequeueNoLock(&fw->pq))) {
SCLogDebug("packet %"PRIu64" extra packet %p", p->pcap_cnt, x);
// TODO do we need to call StreamTcp on these pseudo packets or not?

@ -1724,7 +1724,7 @@ static int StreamTcpReassembleHandleSegmentUpdateACK (ThreadVars *tv,
int StreamTcpReassembleHandleSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
TcpSession *ssn, TcpStream *stream,
Packet *p, PacketQueue *pq)
Packet *p, PacketQueueNoLock *pq)
{
SCEnter();
@ -2057,8 +2057,8 @@ static int StreamTcpReassembleTest33(void)
StreamTcpUTInit(&ra_ctx);
StreamTcpUTSetupSession(&ssn);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(&f, 0, sizeof (Flow));
memset(&tcph, 0, sizeof (TCPHdr));
ThreadVars tv;
@ -2119,8 +2119,8 @@ static int StreamTcpReassembleTest34(void)
StreamTcpUTInit(&ra_ctx);
StreamTcpUTSetupSession(&ssn);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(&f, 0, sizeof (Flow));
memset(&tcph, 0, sizeof (TCPHdr));
ThreadVars tv;
@ -2177,7 +2177,7 @@ static int StreamTcpReassembleTest37(void)
TCPHdr tcph;
TcpReassemblyThreadCtx *ra_ctx = NULL;
uint8_t packet[1460] = "";
PacketQueue pq;
PacketQueueNoLock pq;
ThreadVars tv;
memset(&tv, 0, sizeof (ThreadVars));
@ -2186,7 +2186,7 @@ static int StreamTcpReassembleTest37(void)
StreamTcpUTInit(&ra_ctx);
StreamTcpUTSetupSession(&ssn);
memset(&pq,0,sizeof(PacketQueue));
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(&f, 0, sizeof (Flow));
memset(&tcph, 0, sizeof (TCPHdr));
memset(&tv, 0, sizeof (ThreadVars));
@ -2250,8 +2250,8 @@ static int StreamTcpReassembleTest39 (void)
ThreadVars tv;
StreamTcpThread stt;
TCPHdr tcph;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (stt));
@ -2733,8 +2733,8 @@ static int StreamTcpReassembleTest40 (void)
Flow *f = NULL;
TCPHdr tcph;
TcpSession ssn;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(&tcph, 0, sizeof (TCPHdr));
ThreadVars tv;
memset(&tv, 0, sizeof (ThreadVars));
@ -2982,8 +2982,8 @@ static int StreamTcpReassembleTest47 (void)
TCPHdr tcph;
TcpSession ssn;
ThreadVars tv;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(&tcph, 0, sizeof (TCPHdr));
memset(&tv, 0, sizeof (ThreadVars));
StreamTcpInitConfig(TRUE);

@ -82,7 +82,7 @@ typedef struct TcpReassemblyThreadCtx_ {
#define OS_POLICY_DEFAULT OS_POLICY_BSD
void StreamTcpReassembleInitMemuse(void);
int StreamTcpReassembleHandleSegment(ThreadVars *, TcpReassemblyThreadCtx *, TcpSession *, TcpStream *, Packet *, PacketQueue *);
int StreamTcpReassembleHandleSegment(ThreadVars *, TcpReassemblyThreadCtx *, TcpSession *, TcpStream *, Packet *, PacketQueueNoLock *);
int StreamTcpReassembleInit(char);
void StreamTcpReassembleFree(char);
void StreamTcpReassembleRegisterTests(void);

@ -94,7 +94,7 @@
#define STREAMTCP_EMERG_EST_TIMEOUT 300
#define STREAMTCP_EMERG_CLOSED_TIMEOUT 20
static int StreamTcpHandleFin(ThreadVars *tv, StreamTcpThread *, TcpSession *, Packet *, PacketQueue *);
static int StreamTcpHandleFin(ThreadVars *tv, StreamTcpThread *, TcpSession *, Packet *, PacketQueueNoLock *);
void StreamTcpReturnStreamSegments (TcpStream *);
void StreamTcpInitConfig(char);
int StreamTcpGetFlowState(void *);
@ -105,7 +105,7 @@ static int StreamTcpHandleTimestamp(TcpSession * , Packet *);
static int StreamTcpValidateRst(TcpSession * , Packet *);
static inline int StreamTcpValidateAck(TcpSession *ssn, TcpStream *, Packet *);
static int StreamTcpStateDispatch(ThreadVars *tv, Packet *p,
StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq,
StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq,
uint8_t state);
extern int g_detect_disabled;
@ -899,7 +899,8 @@ static int StreamTcpPacketIsRetransmission(TcpStream *stream, Packet *p)
* \retval -1 error
*/
static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
StreamTcpThread *stt, TcpSession *ssn,
PacketQueueNoLock *pq)
{
if (p->tcph->th_flags & TH_RST) {
StreamTcpSetEvent(p, STREAM_RST_BUT_NO_SESSION);
@ -1407,7 +1408,8 @@ static inline bool StateSynSentValidateTimestamp(TcpSession *ssn, Packet *p)
*/
static int StreamTcpPacketStateSynSent(ThreadVars *tv, Packet *p,
StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
StreamTcpThread *stt, TcpSession *ssn,
PacketQueueNoLock *pq)
{
if (ssn == NULL)
return -1;
@ -1732,7 +1734,8 @@ static int StreamTcpPacketStateSynSent(ThreadVars *tv, Packet *p,
*/
static int StreamTcpPacketStateSynRecv(ThreadVars *tv, Packet *p,
StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
StreamTcpThread *stt, TcpSession *ssn,
PacketQueueNoLock *pq)
{
if (ssn == NULL)
return -1;
@ -2136,7 +2139,7 @@ static int StreamTcpPacketStateSynRecv(ThreadVars *tv, Packet *p,
* \param stt Strean Thread module registered to handle the stream handling
*/
static int HandleEstablishedPacketToServer(ThreadVars *tv, TcpSession *ssn, Packet *p,
StreamTcpThread *stt, PacketQueue *pq)
StreamTcpThread *stt, PacketQueueNoLock *pq)
{
SCLogDebug("ssn %p: =+ pkt (%" PRIu32 ") is to server: SEQ %" PRIu32 ","
"ACK %" PRIu32 ", WIN %"PRIu16"", ssn, p->payload_len,
@ -2322,7 +2325,7 @@ static int HandleEstablishedPacketToServer(ThreadVars *tv, TcpSession *ssn, Pack
* \param stt Strean Thread module registered to handle the stream handling
*/
static int HandleEstablishedPacketToClient(ThreadVars *tv, TcpSession *ssn, Packet *p,
StreamTcpThread *stt, PacketQueue *pq)
StreamTcpThread *stt, PacketQueueNoLock *pq)
{
SCLogDebug("ssn %p: =+ pkt (%" PRIu32 ") is to client: SEQ %" PRIu32 ","
" ACK %" PRIu32 ", WIN %"PRIu16"", ssn, p->payload_len,
@ -2494,7 +2497,7 @@ static inline uint32_t StreamTcpResetGetMaxAck(TcpStream *stream, uint32_t seq)
*/
static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p,
StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
{
if (ssn == NULL)
return -1;
@ -2705,7 +2708,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p,
*/
static int StreamTcpHandleFin(ThreadVars *tv, StreamTcpThread *stt,
TcpSession *ssn, Packet *p, PacketQueue *pq)
TcpSession *ssn, Packet *p, PacketQueueNoLock *pq)
{
if (PKT_IS_TOSERVER(p)) {
SCLogDebug("ssn %p: pkt (%" PRIu32 ") is to server: SEQ %" PRIu32 ","
@ -2820,7 +2823,7 @@ static int StreamTcpHandleFin(ThreadVars *tv, StreamTcpThread *stt,
*/
static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p,
StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
{
if (ssn == NULL)
return -1;
@ -3259,7 +3262,7 @@ static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p,
*/
static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p,
StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
{
if (ssn == NULL)
return -1;
@ -3558,7 +3561,7 @@ static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p,
*/
static int StreamTcpPacketStateClosing(ThreadVars *tv, Packet *p,
StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
{
if (ssn == NULL)
return -1;
@ -3724,7 +3727,7 @@ static int StreamTcpPacketStateClosing(ThreadVars *tv, Packet *p,
*/
static int StreamTcpPacketStateCloseWait(ThreadVars *tv, Packet *p,
StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
{
SCEnter();
@ -4027,7 +4030,7 @@ static int StreamTcpPacketStateCloseWait(ThreadVars *tv, Packet *p,
*/
static int StreamTcpPacketStateLastAck(ThreadVars *tv, Packet *p,
StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
{
if (ssn == NULL)
return -1;
@ -4154,7 +4157,7 @@ static int StreamTcpPacketStateLastAck(ThreadVars *tv, Packet *p,
*/
static int StreamTcpPacketStateTimeWait(ThreadVars *tv, Packet *p,
StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
{
if (ssn == NULL)
return -1;
@ -4317,7 +4320,7 @@ static int StreamTcpPacketStateTimeWait(ThreadVars *tv, Packet *p,
}
static int StreamTcpPacketStateClosed(ThreadVars *tv, Packet *p,
StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
{
if (ssn == NULL)
return -1;
@ -4667,7 +4670,7 @@ static int StreamTcpPacketIsBadWindowUpdate(TcpSession *ssn, Packet *p)
* \param state current TCP state
*/
static inline int StreamTcpStateDispatch(ThreadVars *tv, Packet *p,
StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq,
StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq,
const uint8_t state)
{
SCLogDebug("ssn: %p", ssn);
@ -4760,7 +4763,7 @@ static inline void HandleThreadId(ThreadVars *tv, Packet *p, StreamTcpThread *st
/* flow is and stays locked */
int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt,
PacketQueue *pq)
PacketQueueNoLock *pq)
{
SCEnter();
@ -4891,7 +4894,7 @@ int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt,
if (ssn != NULL) {
while (stt->pseudo_queue.len > 0) {
SCLogDebug("processing pseudo packet / stream end");
Packet *np = PacketDequeue(&stt->pseudo_queue);
Packet *np = PacketDequeueNoLock(&stt->pseudo_queue);
if (np != NULL) {
/* process the opposing direction of the original packet */
if (PKT_IS_TOSERVER(np)) {
@ -4905,7 +4908,7 @@ int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt,
}
/* enqueue this packet so we inspect it in detect etc */
PacketEnqueue(pq, np);
PacketEnqueueNoLock(pq, np);
}
SCLogDebug("processing pseudo packet / stream end done");
}
@ -4961,9 +4964,9 @@ int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt,
error:
/* make sure we don't leave packets in our pseudo queue */
while (stt->pseudo_queue.len > 0) {
Packet *np = PacketDequeue(&stt->pseudo_queue);
Packet *np = PacketDequeueNoLock(&stt->pseudo_queue);
if (np != NULL) {
PacketEnqueue(pq, np);
PacketEnqueueNoLock(pq, np);
}
}
@ -5177,7 +5180,7 @@ int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, const void *tcp_ssn
return 0;
}
TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueueNoLock *pq)
{
StreamTcpThread *stt = (StreamTcpThread *)data;
@ -5975,7 +5978,7 @@ Packet *StreamTcpPseudoSetup(Packet *parent, uint8_t *pkt, uint32_t len)
*/
static void StreamTcpPseudoPacketCreateDetectLogFlush(ThreadVars *tv,
StreamTcpThread *stt, Packet *parent,
TcpSession *ssn, PacketQueue *pq, int dir)
TcpSession *ssn, PacketQueueNoLock *pq, int dir)
{
SCEnter();
Flow *f = parent->flow;
@ -6146,7 +6149,7 @@ static void StreamTcpPseudoPacketCreateDetectLogFlush(ThreadVars *tv,
memcpy(&np->ts, &parent->ts, sizeof(struct timeval));
SCLogDebug("np %p", np);
PacketEnqueue(pq, np);
PacketEnqueueNoLock(pq, np);
StatsIncr(tv, stt->counter_tcp_pseudo);
SCReturn;
@ -6162,7 +6165,8 @@ error:
* Flag TCP engine that data needs to be inspected regardless
* of how far we are wrt inspect limits.
*/
void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueue *pq)
void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p,
PacketQueueNoLock *pq)
{
TcpSession *ssn = f->protoctx;
ssn->client.flags |= STREAMTCP_STREAM_FLAG_TRIGGER_RAW;
@ -6325,8 +6329,8 @@ static int StreamTcpTest02 (void)
StreamTcpThread stt;
uint8_t payload[4];
TCPHdr tcph;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(pq));
memset(p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
@ -6412,8 +6416,8 @@ static int StreamTcpTest03 (void)
StreamTcpThread stt;
TCPHdr tcph;
memset(p, 0, SIZE_OF_PACKET);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(pq));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
@ -6488,8 +6492,8 @@ static int StreamTcpTest04 (void)
StreamTcpThread stt;
TCPHdr tcph;
memset(p, 0, SIZE_OF_PACKET);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(pq));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
@ -6558,8 +6562,8 @@ static int StreamTcpTest05 (void)
TCPHdr tcph;
uint8_t payload[4];
memset(p, 0, SIZE_OF_PACKET);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
@ -6658,8 +6662,8 @@ static int StreamTcpTest06 (void)
StreamTcpThread stt;
TCPHdr tcph;
memset(p, 0, SIZE_OF_PACKET);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&ssn, 0, sizeof (TcpSession));
memset(&tv, 0, sizeof (ThreadVars));
@ -6719,10 +6723,10 @@ static int StreamTcpTest07 (void)
StreamTcpThread stt;
TCPHdr tcph;
uint8_t payload[1] = {0x42};
PacketQueue pq;
PacketQueueNoLock pq;
memset(p, 0, SIZE_OF_PACKET);
memset(&pq,0,sizeof(PacketQueue));
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof(StreamTcpThread));
@ -6783,8 +6787,8 @@ static int StreamTcpTest08 (void)
uint8_t payload[1] = {0x42};
memset(p, 0, SIZE_OF_PACKET);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof(StreamTcpThread));
@ -6846,8 +6850,8 @@ static int StreamTcpTest09 (void)
uint8_t payload[1] = {0x42};
memset(p, 0, SIZE_OF_PACKET);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof(StreamTcpThread));
@ -6916,8 +6920,8 @@ static int StreamTcpTest10 (void)
TCPHdr tcph;
uint8_t payload[4];
memset(p, 0, SIZE_OF_PACKET);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
@ -6995,8 +6999,8 @@ static int StreamTcpTest11 (void)
TCPHdr tcph;
uint8_t payload[4];
memset(p, 0, SIZE_OF_PACKET);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
@ -7076,8 +7080,8 @@ static int StreamTcpTest12 (void)
TCPHdr tcph;
uint8_t payload[4];
memset(p, 0, SIZE_OF_PACKET);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
@ -7172,8 +7176,8 @@ static int StreamTcpTest13 (void)
TCPHdr tcph;
uint8_t payload[4];
memset(p, 0, SIZE_OF_PACKET);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
@ -7375,8 +7379,8 @@ static int StreamTcpTest14 (void)
IPV4Hdr ipv4h;
char os_policy_name[10] = "windows";
const char *ip_addr;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -7541,8 +7545,8 @@ static int StreamTcp4WHSTest01 (void)
StreamTcpThread stt;
TCPHdr tcph;
memset(p, 0, SIZE_OF_PACKET);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
@ -7623,8 +7627,8 @@ static int StreamTcp4WHSTest02 (void)
StreamTcpThread stt;
TCPHdr tcph;
memset(p, 0, SIZE_OF_PACKET);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
@ -7693,8 +7697,8 @@ static int StreamTcp4WHSTest03 (void)
StreamTcpThread stt;
TCPHdr tcph;
memset(p, 0, SIZE_OF_PACKET);
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
@ -7777,8 +7781,8 @@ static int StreamTcpTest15 (void)
IPV4Hdr ipv4h;
char os_policy_name[10] = "windows";
const char *ip_addr;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -7944,8 +7948,8 @@ static int StreamTcpTest16 (void)
IPV4Hdr ipv4h;
char os_policy_name[10] = "windows";
const char *ip_addr;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -8114,8 +8118,8 @@ static int StreamTcpTest17 (void)
IPV4Hdr ipv4h;
char os_policy_name[10] = "windows";
const char *ip_addr;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -8529,12 +8533,12 @@ static int StreamTcpTest23(void)
TCPHdr tcph;
uint8_t packet[1460] = "";
ThreadVars tv;
PacketQueue pq;
PacketQueueNoLock pq;
Packet *p = SCMalloc(SIZE_OF_PACKET);
FAIL_IF(p == NULL);
memset(&pq,0,sizeof(PacketQueue));
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(p, 0, SIZE_OF_PACKET);
memset(&f, 0, sizeof (Flow));
memset(&tcph, 0, sizeof (TCPHdr));
@ -8598,8 +8602,8 @@ static int StreamTcpTest24(void)
uint8_t packet[1460] = "";
ThreadVars tv;
memset(&tv, 0, sizeof (ThreadVars));
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
StreamTcpUTInit(&stt.ra_ctx);
StreamTcpUTSetupSession(&ssn);
@ -8668,8 +8672,8 @@ static int StreamTcpTest25(void)
uint8_t payload[4];
TCPHdr tcph;
int ret = 0;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -8761,8 +8765,8 @@ static int StreamTcpTest26(void)
uint8_t payload[4];
TCPHdr tcph;
int ret = 0;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -8855,8 +8859,8 @@ static int StreamTcpTest27(void)
uint8_t payload[4];
TCPHdr tcph;
int ret = 0;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -9412,8 +9416,8 @@ static int StreamTcpTest32(void)
TCPHdr tcph;
TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(NULL);
int ret = 0;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -9502,8 +9506,8 @@ static int StreamTcpTest33 (void)
StreamTcpThread stt;
TCPHdr tcph;
TcpReassemblyThreadCtx ra_ctx;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx));
memset (&p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -9603,8 +9607,8 @@ static int StreamTcpTest34 (void)
StreamTcpThread stt;
TCPHdr tcph;
TcpReassemblyThreadCtx ra_ctx;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx));
memset (&p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -9668,8 +9672,8 @@ static int StreamTcpTest35 (void)
StreamTcpThread stt;
TCPHdr tcph;
TcpReassemblyThreadCtx ra_ctx;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx));
memset (&p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -9733,8 +9737,8 @@ static int StreamTcpTest36(void)
TCPHdr tcph;
TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(NULL);
int ret = 0;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -9826,8 +9830,8 @@ static int StreamTcpTest37(void)
uint8_t payload[4];
TCPHdr tcph;
int ret = 0;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
PacketQueueNoLock pq;
memset(&pq,0,sizeof(PacketQueueNoLock));
memset(p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -9942,13 +9946,13 @@ static int StreamTcpTest38 (void)
StreamTcpThread stt;
uint8_t payload[128];
TCPHdr tcph;
PacketQueue pq;
PacketQueueNoLock pq;
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
memset(&tcph, 0, sizeof (TCPHdr));
memset(&pq,0,sizeof(PacketQueue));
memset(&pq,0,sizeof(PacketQueueNoLock));
Packet *p = SCMalloc(SIZE_OF_PACKET);
if (unlikely(p == NULL))
@ -10097,13 +10101,13 @@ static int StreamTcpTest39 (void)
StreamTcpThread stt;
uint8_t payload[4];
TCPHdr tcph;
PacketQueue pq;
PacketQueueNoLock pq;
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
memset(&tcph, 0, sizeof (TCPHdr));
memset(&pq,0,sizeof(PacketQueue));
memset(&pq,0,sizeof(PacketQueueNoLock));
Packet *p = SCMalloc(SIZE_OF_PACKET);
if (unlikely(p == NULL))
@ -10226,7 +10230,7 @@ static int StreamTcpTest42 (void)
ThreadVars tv;
StreamTcpThread stt;
TCPHdr tcph;
PacketQueue pq;
PacketQueueNoLock pq;
Packet *p = SCMalloc(SIZE_OF_PACKET);
TcpSession *ssn;
@ -10234,7 +10238,7 @@ static int StreamTcpTest42 (void)
return 0;
memset(p, 0, SIZE_OF_PACKET);
memset(&pq,0,sizeof(PacketQueue));
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
@ -10318,7 +10322,7 @@ static int StreamTcpTest43 (void)
ThreadVars tv;
StreamTcpThread stt;
TCPHdr tcph;
PacketQueue pq;
PacketQueueNoLock pq;
Packet *p = SCMalloc(SIZE_OF_PACKET);
TcpSession *ssn;
@ -10326,7 +10330,7 @@ static int StreamTcpTest43 (void)
return 0;
memset(p, 0, SIZE_OF_PACKET);
memset(&pq,0,sizeof(PacketQueue));
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
@ -10410,7 +10414,7 @@ static int StreamTcpTest44 (void)
ThreadVars tv;
StreamTcpThread stt;
TCPHdr tcph;
PacketQueue pq;
PacketQueueNoLock pq;
Packet *p = SCMalloc(SIZE_OF_PACKET);
TcpSession *ssn;
@ -10418,7 +10422,7 @@ static int StreamTcpTest44 (void)
return 0;
memset(p, 0, SIZE_OF_PACKET);
memset(&pq,0,sizeof(PacketQueue));
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));
@ -10497,7 +10501,7 @@ static int StreamTcpTest45 (void)
ThreadVars tv;
StreamTcpThread stt;
TCPHdr tcph;
PacketQueue pq;
PacketQueueNoLock pq;
Packet *p = SCMalloc(SIZE_OF_PACKET);
TcpSession *ssn;
@ -10505,7 +10509,7 @@ static int StreamTcpTest45 (void)
return 0;
memset(p, 0, SIZE_OF_PACKET);
memset(&pq,0,sizeof(PacketQueue));
memset(&pq,0,sizeof(PacketQueueNoLock));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
memset(&stt, 0, sizeof (StreamTcpThread));

@ -73,7 +73,7 @@ typedef struct StreamTcpThread_ {
/** queue for pseudo packet(s) that were created in the stream
* process and need further handling. Currently only used when
* receiving (valid) RST packets */
PacketQueue pseudo_queue;
PacketQueueNoLock pseudo_queue;
uint16_t counter_tcp_sessions;
/** sessions not picked up because memcap was reached */
@ -138,7 +138,7 @@ int StreamReassembleRaw(TcpSession *ssn, const Packet *p,
uint64_t *progress_out, bool respect_inspect_depth);
void StreamReassembleRawUpdateProgress(TcpSession *ssn, Packet *p, uint64_t progress);
void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueue *pq);
void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueueNoLock *pq);
/** ------- Inline functions: ------ */
@ -172,14 +172,14 @@ enum {
STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION = 1,
};
TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueueNoLock *);
int StreamNeedsReassembly(const TcpSession *ssn, uint8_t direction);
TmEcode StreamTcpThreadInit(ThreadVars *, void *, void **);
TmEcode StreamTcpThreadDeinit(ThreadVars *tv, void *data);
void StreamTcpRegisterTests (void);
int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt,
PacketQueue *pq);
PacketQueueNoLock *pq);
/* clear ssn and return to pool */
void StreamTcpSessionClear(void *ssnptr);
/* cleanup ssn, but don't free ssn */

Loading…
Cancel
Save