ftp: adds a config option ftp-hash for autofp-scheduler

This allows ftp-data and ftp flows to be processed by the same
thread. Otherwise, there may be a concurrency issue where the
would-be ftp-data flow is first processed, and thus not recognized
as such. And the ftp flow gets processed later and the expectation
coming from it is never found.

To do so, the flow hash gets used as usual, except for flows that
may be either ftp or ftp-data, that is either one port is 21, or
both ports are high ones.

Ticket: #5205
pull/8479/head
Philippe Antoine 4 years ago committed by Victor Julien
parent c1f615b8d2
commit e3105a6614

@ -46,3 +46,21 @@ useful during development.
For more information about the command line options concerning the
runmode, see :doc:`../command-line-options`.
Load balancing
~~~~~~~~~~~~~~
Suricata may use different ways to load balance the packets to process
between different threads with the configuration option `autofp-scheduler`.
The default value is `hash`, which means the packet is assigned to threads
using the 5-7 tuple hash, which is also used anyways to store the flows
in memory.
This option can also be set to
- `ippair` : packets are assigned to threads using addresses only.
- `ftp-hash` : same as `hash` except for flows that may be ftp or ftp-data
so that these flows get processed by the same thread. Like so, there is no
concurrency issue in recognizing ftp-data flows due to processing them
before the ftp flow got processed. In case of such a flow, a variant of the
hash is used.

@ -108,6 +108,61 @@ typedef struct FlowHashKey6_ {
};
} FlowHashKey6;
uint32_t FlowGetIpPairProtoHash(const Packet *p)
{
uint32_t hash = 0;
if (p->ip4h != NULL) {
FlowHashKey4 fhk;
int ai = (p->src.addr_data32[0] > p->dst.addr_data32[0]);
fhk.addrs[1 - ai] = p->src.addr_data32[0];
fhk.addrs[ai] = p->dst.addr_data32[0];
fhk.ports[0] = 0xfedc;
fhk.ports[1] = 0xba98;
fhk.proto = (uint16_t)p->proto;
fhk.recur = (uint16_t)p->recursion_level;
/* g_vlan_mask sets the vlan_ids to 0 if vlan.use-for-tracking
* is disabled. */
fhk.vlan_id[0] = p->vlan_id[0] & g_vlan_mask;
fhk.vlan_id[1] = p->vlan_id[1] & g_vlan_mask;
hash = hashword(fhk.u32, 5, flow_config.hash_rand);
} else if (p->ip6h != NULL) {
FlowHashKey6 fhk;
if (FlowHashRawAddressIPv6GtU32(p->src.addr_data32, p->dst.addr_data32)) {
fhk.src[0] = p->src.addr_data32[0];
fhk.src[1] = p->src.addr_data32[1];
fhk.src[2] = p->src.addr_data32[2];
fhk.src[3] = p->src.addr_data32[3];
fhk.dst[0] = p->dst.addr_data32[0];
fhk.dst[1] = p->dst.addr_data32[1];
fhk.dst[2] = p->dst.addr_data32[2];
fhk.dst[3] = p->dst.addr_data32[3];
} else {
fhk.src[0] = p->dst.addr_data32[0];
fhk.src[1] = p->dst.addr_data32[1];
fhk.src[2] = p->dst.addr_data32[2];
fhk.src[3] = p->dst.addr_data32[3];
fhk.dst[0] = p->src.addr_data32[0];
fhk.dst[1] = p->src.addr_data32[1];
fhk.dst[2] = p->src.addr_data32[2];
fhk.dst[3] = p->src.addr_data32[3];
}
fhk.ports[0] = 0xfedc;
fhk.ports[1] = 0xba98;
fhk.proto = (uint16_t)p->proto;
fhk.recur = (uint16_t)p->recursion_level;
fhk.vlan_id[0] = p->vlan_id[0] & g_vlan_mask;
fhk.vlan_id[1] = p->vlan_id[1] & g_vlan_mask;
hash = hashword(fhk.u32, 11, flow_config.hash_rand);
}
return hash;
}
/* calculate the hash key for this packet
*
* we're using:

@ -85,6 +85,7 @@ Flow *FlowGetFromFlowKey(FlowKey *key, struct timespec *ttime, const uint32_t ha
Flow *FlowGetExistingFlowFromHash(FlowKey * key, uint32_t hash);
Flow *FlowGetExistingFlowFromFlowId(int64_t flow_id);
uint32_t FlowKeyGetHash(FlowKey *flow_key);
uint32_t FlowGetIpPairProtoHash(const Packet *p);
/** \note f->fb must be locked */
static inline void RemoveFromHash(Flow *f, Flow *prev_f)

@ -33,6 +33,7 @@
#include "threads.h"
#include "threadvars.h"
#include "tmqh-flow.h"
#include "flow-hash.h"
#include "tm-queuehandlers.h"
@ -42,6 +43,7 @@
Packet *TmqhInputFlow(ThreadVars *t);
void TmqhOutputFlowHash(ThreadVars *t, Packet *p);
void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p);
static void TmqhOutputFlowFTPHash(ThreadVars *t, Packet *p);
void *TmqhOutputFlowSetupCtx(const char *queue_str);
void TmqhOutputFlowFreeCtx(void *ctx);
void TmqhFlowRegisterTests(void);
@ -66,6 +68,8 @@ void TmqhFlowRegister(void)
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
} else if (strcasecmp(scheduler, "ippair") == 0) {
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowIPPair;
} else if (strcasecmp(scheduler, "ftp-hash") == 0) {
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowFTPHash;
} else {
SCLogError("Invalid entry \"%s\" "
"for autofp-scheduler in conf. Killing engine.",
@ -87,6 +91,7 @@ void TmqhFlowPrintAutofpHandler(void)
PRINT_IF_FUNC(TmqhOutputFlowHash, "Hash");
PRINT_IF_FUNC(TmqhOutputFlowIPPair, "IPPair");
PRINT_IF_FUNC(TmqhOutputFlowFTPHash, "FTPHash");
#undef PRINT_IF_FUNC
}
@ -272,6 +277,34 @@ void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p)
return;
}
static void TmqhOutputFlowFTPHash(ThreadVars *tv, Packet *p)
{
uint32_t qid;
TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
if (p->flags & PKT_WANTS_FLOW) {
uint32_t hash = p->flow_hash;
if (p->tcph != NULL && ((p->sp >= 1024 && p->dp >= 1024) || p->dp == 21 || p->sp == 21 ||
p->dp == 20 || p->sp == 20)) {
hash = FlowGetIpPairProtoHash(p);
}
qid = hash % ctx->size;
} else {
qid = ctx->last++;
if (ctx->last == ctx->size)
ctx->last = 0;
}
PacketQueue *q = ctx->queues[qid].q;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
return;
}
#ifdef UNITTESTS
static int TmqhOutputFlowSetupCtxTest01(void)

@ -1235,6 +1235,8 @@ host-mode: auto
#
# hash - Flow assigned to threads using the 5-7 tuple hash.
# ippair - Flow assigned to threads using addresses only.
# ftp-hash - Flow assigned to threads using the hash, except for FTP, so that
# ftp-data flows will be handled by the same thread
#
#autofp-scheduler: hash

Loading…
Cancel
Save