Stream: use per thread ssn pool

Use per thread pools to store and retrieve SSN's from. Uses PoolThread
API.

Remove max-sessions setting. Pools are set to unlimited, but TCP memcap
limits the amount of sessions.

The prealloc_session settings now applies to each thread, so lowered the
default from 32k to 2k.
pull/414/head
Victor Julien 12 years ago
parent b6af6cb241
commit aa449d51ca

@ -25,6 +25,8 @@
#define __STREAM_TCP_PRIVATE_H__ #define __STREAM_TCP_PRIVATE_H__
#include "decode.h" #include "decode.h"
#include "util-pool.h"
#include "util-pool-thread.h"
#define STREAMTCP_QUEUE_FLAG_TS 0x01 #define STREAMTCP_QUEUE_FLAG_TS 0x01
#define STREAMTCP_QUEUE_FLAG_WS 0x02 #define STREAMTCP_QUEUE_FLAG_WS 0x02
@ -200,6 +202,7 @@ enum
} }
typedef struct TcpSession_ { typedef struct TcpSession_ {
PoolThreadReserved res;
uint8_t state; uint8_t state;
uint8_t queue_len; /**< length of queue list below */ uint8_t queue_len; /**< length of queue list below */
uint16_t flags; uint16_t flags;

@ -44,6 +44,7 @@
#include "tm-threads.h" #include "tm-threads.h"
#include "util-pool.h" #include "util-pool.h"
#include "util-pool-thread.h"
#include "util-checksum.h" #include "util-checksum.h"
#include "util-unittest.h" #include "util-unittest.h"
#include "util-print.h" #include "util-print.h"
@ -71,8 +72,7 @@
//#define DEBUG //#define DEBUG
#define STREAMTCP_DEFAULT_SESSIONS 262144 #define STREAMTCP_DEFAULT_PREALLOC 2048
#define STREAMTCP_DEFAULT_PREALLOC 32768
#define STREAMTCP_DEFAULT_MEMCAP (32 * 1024 * 1024) /* 32mb */ #define STREAMTCP_DEFAULT_MEMCAP (32 * 1024 * 1024) /* 32mb */
#define STREAMTCP_DEFAULT_REASSEMBLY_MEMCAP (64 * 1024 * 1024) /* 64mb */ #define STREAMTCP_DEFAULT_REASSEMBLY_MEMCAP (64 * 1024 * 1024) /* 64mb */
#define STREAMTCP_DEFAULT_TOSERVER_CHUNK_SIZE 2560 #define STREAMTCP_DEFAULT_TOSERVER_CHUNK_SIZE 2560
@ -104,8 +104,8 @@ static int StreamTcpHandleTimestamp(TcpSession * , Packet *);
static int StreamTcpValidateRst(TcpSession * , Packet *); static int StreamTcpValidateRst(TcpSession * , Packet *);
static inline int StreamTcpValidateAck(TcpSession *ssn, TcpStream *, Packet *); static inline int StreamTcpValidateAck(TcpSession *ssn, TcpStream *, Packet *);
static Pool *ssn_pool = NULL; static PoolThread *ssn_pool = NULL;
static SCMutex ssn_pool_mutex; static SCMutex ssn_pool_mutex = PTHREAD_MUTEX_INITIALIZER; /**< init only, protect initializing and growing pool */
#ifdef DEBUG #ifdef DEBUG
static uint64_t ssn_pool_cnt = 0; /** counts ssns, protected by ssn_pool_mutex */ static uint64_t ssn_pool_cnt = 0; /** counts ssns, protected by ssn_pool_mutex */
#endif #endif
@ -210,12 +210,12 @@ void StreamTcpSessionClear(void *ssnptr)
ssn->toclient_smsg_head = NULL; ssn->toclient_smsg_head = NULL;
memset(ssn, 0, sizeof(TcpSession)); memset(ssn, 0, sizeof(TcpSession));
SCMutexLock(&ssn_pool_mutex); PoolThreadReturn(ssn_pool, ssn);
PoolReturn(ssn_pool, ssn);
#ifdef DEBUG #ifdef DEBUG
SCMutexLock(&ssn_pool_mutex);
ssn_pool_cnt--; ssn_pool_cnt--;
#endif
SCMutexUnlock(&ssn_pool_mutex); SCMutexUnlock(&ssn_pool_mutex);
#endif
SCReturn; SCReturn;
} }
@ -337,17 +337,10 @@ void StreamTcpInitConfig(char quiet)
memset(&stream_config, 0, sizeof(stream_config)); memset(&stream_config, 0, sizeof(stream_config));
/** set config defaults */
if ((ConfGetInt("stream.max-sessions", &value)) == 1) { if ((ConfGetInt("stream.max-sessions", &value)) == 1) {
stream_config.max_sessions = (uint32_t)value; SCLogWarning(SC_WARN_OPTION_OBSOLETE, "max-sessions is obsolete. "
} else { "Number of concurrent sessions is now only limited by Flow and "
if (RunmodeIsUnittests()) "TCP stream engine memcaps.");
stream_config.max_sessions = 1024;
else
stream_config.max_sessions = STREAMTCP_DEFAULT_SESSIONS;
}
if (!quiet) {
SCLogInfo("stream \"max-sessions\": %"PRIu32"", stream_config.max_sessions);
} }
if ((ConfGetInt("stream.prealloc-sessions", &value)) == 1) { if ((ConfGetInt("stream.prealloc-sessions", &value)) == 1) {
@ -359,7 +352,8 @@ void StreamTcpInitConfig(char quiet)
stream_config.prealloc_sessions = STREAMTCP_DEFAULT_PREALLOC; stream_config.prealloc_sessions = STREAMTCP_DEFAULT_PREALLOC;
} }
if (!quiet) { if (!quiet) {
SCLogInfo("stream \"prealloc-sessions\": %"PRIu32"", stream_config.prealloc_sessions); SCLogInfo("stream \"prealloc-sessions\": %"PRIu32" (per thread)",
stream_config.prealloc_sessions);
} }
char *temp_stream_memcap_str; char *temp_stream_memcap_str;
@ -566,21 +560,6 @@ void StreamTcpInitConfig(char quiet)
/* init the memcap/use tracking */ /* init the memcap/use tracking */
SC_ATOMIC_INIT(st_memuse); SC_ATOMIC_INIT(st_memuse);
SCMutexInit(&ssn_pool_mutex, NULL);
SCMutexLock(&ssn_pool_mutex);
ssn_pool = PoolInit(stream_config.max_sessions,
stream_config.prealloc_sessions,
sizeof(TcpSession),
StreamTcpSessionPoolAlloc,
StreamTcpSessionPoolInit, NULL,
StreamTcpSessionPoolCleanup, NULL);
if (ssn_pool == NULL) {
SCLogError(SC_ERR_POOL_INIT, "ssn_pool is not initialized");
SCMutexUnlock(&ssn_pool_mutex);
exit(EXIT_FAILURE);
}
SCMutexUnlock(&ssn_pool_mutex);
StreamTcpReassembleInit(quiet); StreamTcpReassembleInit(quiet);
/* set the default free function and flow state function /* set the default free function and flow state function
@ -595,7 +574,7 @@ void StreamTcpFreeConfig(char quiet)
SCMutexLock(&ssn_pool_mutex); SCMutexLock(&ssn_pool_mutex);
if (ssn_pool != NULL) { if (ssn_pool != NULL) {
PoolFree(ssn_pool); PoolThreadFree(ssn_pool);
ssn_pool = NULL; ssn_pool = NULL;
} }
SCMutexUnlock(&ssn_pool_mutex); SCMutexUnlock(&ssn_pool_mutex);
@ -611,18 +590,18 @@ void StreamTcpFreeConfig(char quiet)
* *
* \retval TcpSession A new TCP session with field initilaized to 0/NULL. * \retval TcpSession A new TCP session with field initilaized to 0/NULL.
*/ */
TcpSession *StreamTcpNewSession (Packet *p) TcpSession *StreamTcpNewSession (Packet *p, int id)
{ {
TcpSession *ssn = (TcpSession *)p->flow->protoctx; TcpSession *ssn = (TcpSession *)p->flow->protoctx;
if (ssn == NULL) { if (ssn == NULL) {
SCMutexLock(&ssn_pool_mutex); p->flow->protoctx = PoolThreadGetById(ssn_pool, id);
p->flow->protoctx = PoolGet(ssn_pool);
#ifdef DEBUG #ifdef DEBUG
SCMutexLock(&ssn_pool_mutex);
if (p->flow->protoctx != NULL) if (p->flow->protoctx != NULL)
ssn_pool_cnt++; ssn_pool_cnt++;
#endif
SCMutexUnlock(&ssn_pool_mutex); SCMutexUnlock(&ssn_pool_mutex);
#endif
ssn = (TcpSession *)p->flow->protoctx; ssn = (TcpSession *)p->flow->protoctx;
if (ssn == NULL) { if (ssn == NULL) {
@ -770,7 +749,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
return 0; return 0;
if (ssn == NULL) { if (ssn == NULL) {
ssn = StreamTcpNewSession(p); ssn = StreamTcpNewSession(p, tv->id);
if (ssn == NULL) { if (ssn == NULL) {
SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca); SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca);
return -1; return -1;
@ -844,7 +823,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
} else if (p->tcph->th_flags & TH_SYN) { } else if (p->tcph->th_flags & TH_SYN) {
if (ssn == NULL) { if (ssn == NULL) {
ssn = StreamTcpNewSession(p); ssn = StreamTcpNewSession(p, tv->id);
if (ssn == NULL) { if (ssn == NULL) {
SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca); SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca);
return -1; return -1;
@ -897,7 +876,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
return 0; return 0;
if (ssn == NULL) { if (ssn == NULL) {
ssn = StreamTcpNewSession(p); ssn = StreamTcpNewSession(p, tv->id);
if (ssn == NULL) { if (ssn == NULL) {
SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca); SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca);
return -1; return -1;
@ -4521,6 +4500,34 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
SCLogDebug("StreamTcp thread specific ctx online at %p, reassembly ctx %p", SCLogDebug("StreamTcp thread specific ctx online at %p, reassembly ctx %p",
stt, stt->ra_ctx); stt, stt->ra_ctx);
int r = 0;
SCMutexLock(&ssn_pool_mutex);
if (ssn_pool == NULL)
ssn_pool = PoolThreadInit(1, /* thread */
0, /* unlimited */
stream_config.prealloc_sessions,
sizeof(TcpSession),
StreamTcpSessionPoolAlloc,
StreamTcpSessionPoolInit, NULL,
StreamTcpSessionPoolCleanup, NULL);
else {
/* grow ssn_pool until we have a element for our thread id */
do {
r = PoolThreadGrow(ssn_pool,
0, /* unlimited */
stream_config.prealloc_sessions,
sizeof(TcpSession),
StreamTcpSessionPoolAlloc,
StreamTcpSessionPoolInit, NULL,
StreamTcpSessionPoolCleanup, NULL);
} while (r != -1 && r < tv->id);
SCLogDebug("pool size %d, thread %d", PoolThreadSize(ssn_pool), tv->id);
}
SCMutexUnlock(&ssn_pool_mutex);
if (r < 0 || ssn_pool == NULL)
SCReturnInt(TM_ECODE_FAILED);
SCReturnInt(TM_ECODE_OK); SCReturnInt(TM_ECODE_OK);
} }
@ -5384,7 +5391,7 @@ static int StreamTcpTest01 (void) {
StreamTcpInitConfig(TRUE); StreamTcpInitConfig(TRUE);
TcpSession *ssn = StreamTcpNewSession(p); TcpSession *ssn = StreamTcpNewSession(p, 0);
if (ssn == NULL) { if (ssn == NULL) {
printf("Session can not be allocated: "); printf("Session can not be allocated: ");
goto end; goto end;

@ -48,8 +48,7 @@ typedef struct TcpStreamCnf_ {
uint64_t memcap; uint64_t memcap;
uint64_t reassembly_memcap; /**< max memory usage for stream reassembly */ uint64_t reassembly_memcap; /**< max memory usage for stream reassembly */
uint32_t max_sessions; uint32_t prealloc_sessions; /**< ssns to prealloc per stream thread */
uint32_t prealloc_sessions;
int midstream; int midstream;
int async_oneside; int async_oneside;
uint32_t reassembly_depth; /**< Depth until when we reassemble the stream */ uint32_t reassembly_depth; /**< Depth until when we reassemble the stream */

@ -273,6 +273,7 @@ const char * SCErrorToString(SCError err)
CASE_CODE (SC_ERR_MAGIC_LOAD); CASE_CODE (SC_ERR_MAGIC_LOAD);
CASE_CODE (SC_ERR_CUDA_BUFFER_ERROR); CASE_CODE (SC_ERR_CUDA_BUFFER_ERROR);
CASE_CODE (SC_ERR_DNS_LOG_GENERIC); CASE_CODE (SC_ERR_DNS_LOG_GENERIC);
CASE_CODE (SC_WARN_OPTION_OBSOLETE);
} }
return "UNKNOWN_ERROR"; return "UNKNOWN_ERROR";

@ -262,6 +262,7 @@ typedef enum {
SC_WARN_UNCOMMON, SC_WARN_UNCOMMON,
SC_ERR_CUDA_BUFFER_ERROR, SC_ERR_CUDA_BUFFER_ERROR,
SC_ERR_DNS_LOG_GENERIC, SC_ERR_DNS_LOG_GENERIC,
SC_WARN_OPTION_OBSOLETE,
} SCError; } SCError;
const char *SCErrorToString(SCError); const char *SCErrorToString(SCError);

@ -587,8 +587,7 @@ flow-timeouts:
# # of checksum. You can control the handling of checksum # # of checksum. You can control the handling of checksum
# # on a per-interface basis via the 'checksum-checks' # # on a per-interface basis via the 'checksum-checks'
# # option # # option
# max-sessions: 262144 # 256k concurrent sessions # prealloc-sessions: 2k # 2k sessions prealloc'd per stream thread
# prealloc-sessions: 32768 # 32k sessions prealloc'd
# midstream: false # don't allow midstream session pickups # midstream: false # don't allow midstream session pickups
# async-oneside: false # don't enable async stream handling # async-oneside: false # don't enable async stream handling
# inline: no # stream inline mode # inline: no # stream inline mode

Loading…
Cancel
Save