Allocate mPIPE packet ingress queue in each worker thread.

Move the allocation of the mPipe ingress queue from a loop over
the number of workers in the main init function to being done inside
each worker thread. This allows allocating the memory locally on the
worker's CPU without needing to figure out ahead of time where that thread
will be running. This fixes one case of static mapping of workers to CPUs.

Use __thread to hold the queue rather than a global tables of queues.
pull/635/merge
Ken Steele 11 years ago committed by Victor Julien
parent 601c7c8e3c
commit de6cbb01c8

@ -129,6 +129,7 @@ void ReceiveMpipeThreadExitStats(ThreadVars *, void *);
TmEcode DecodeMpipeThreadInit(ThreadVars *, void *, void **);
TmEcode DecodeMpipe(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
static int MpipeReceiveOpenIqueue(int rank);
#define MAX_CHANNELS 32 /* can probably find this in the MDE */
@ -140,8 +141,11 @@ TmEcode DecodeMpipe(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *
static gxio_mpipe_context_t context_body;
static gxio_mpipe_context_t* context = &context_body;
/* The ingress queues (one per worker) */
static gxio_mpipe_iqueue_t** iqueues;
/* First allocated Notification ring for iQueues. */
static int first_notif_ring;
/* The ingress queue for this worker thread */
static __thread gxio_mpipe_iqueue_t* thread_iqueue;
/* The egress queues (one per port) */
static gxio_mpipe_equeue_t equeue[MAX_CHANNELS];
@ -196,11 +200,13 @@ void TmModuleDecodeMpipeRegister (void)
/* Release Packet without sending. */
void MpipeReleasePacket(Packet *p)
{
gxio_mpipe_iqueue_t* iqueue = iqueues[p->mpipe_v.rank];
/* Use this thread's context to free the packet. */
// TODO: Check for dual mPipes.
gxio_mpipe_iqueue_t* iqueue = thread_iqueue;
int bucket = p->mpipe_v.idesc.bucket_id;
gxio_mpipe_credit(iqueue->context, iqueue->ring, bucket, 1);
gxio_mpipe_push_buffer(context,
gxio_mpipe_push_buffer(iqueue->context,
p->mpipe_v.idesc.stack_idx,
(void*)(intptr_t)p->mpipe_v.idesc.va);
}
@ -208,7 +214,7 @@ void MpipeReleasePacket(Packet *p)
/* Unconditionally send packet, then release packet buffer. */
void MpipeReleasePacketCopyTap(Packet *p)
{
gxio_mpipe_iqueue_t* iqueue = iqueues[p->mpipe_v.rank];
gxio_mpipe_iqueue_t* iqueue = thread_iqueue;
int bucket = p->mpipe_v.idesc.bucket_id;
gxio_mpipe_credit(iqueue->context, iqueue->ring, bucket, 1);
gxio_mpipe_edesc_t edesc;
@ -340,7 +346,9 @@ TmEcode ReceiveMpipeLoop(ThreadVars *tv, void *data, void *slot)
}
}
gxio_mpipe_iqueue_t* iqueue = iqueues[rank];
/* Open Ingress Queue for this worker thread. */
MpipeReceiveOpenIqueue(rank);
gxio_mpipe_iqueue_t* iqueue = thread_iqueue;
for (;;) {
if (suricata_ctl_flags != 0) {
@ -699,6 +707,45 @@ static int ReceiveMpipeRegisterRules(int bucket, int num_buckets)
return gxio_mpipe_rules_commit(&rules);
}
/* \brief Initialize mPIPE ingress ring
*
* \param name of interface to open
* \param Array of port configuations
*
* \return Output port channel number, or -1 on error
*/
static int MpipeReceiveOpenIqueue(int rank)
{
/* Init the NotifRings. */
const size_t notif_ring_entries = 2048;
size_t notif_ring_size = notif_ring_entries * sizeof(gxio_mpipe_idesc_t);
tmc_alloc_t alloc = TMC_ALLOC_INIT;
/* Allocate the memory locally on this thread's CPU. */
tmc_alloc_set_home(&alloc, TMC_ALLOC_HOME_TASK);
/* Allocate all the memory on one page. Which is required for the
notif ring, not the iqueue. */
if (notif_ring_size > (size_t)getpagesize())
tmc_alloc_set_huge(&alloc);
int needed = notif_ring_size + sizeof(gxio_mpipe_iqueue_t);
// TODO - Save the rest of the Huge Page for other allocations.
void *iqueue_mem = tmc_alloc_map(&alloc, needed);
if (iqueue_mem == NULL) {
SCLogError(SC_ERR_FATAL, "Failed to allocate memory for mPIPE iQueue");
return TM_ECODE_FAILED;
}
thread_iqueue = iqueue_mem + notif_ring_size;
int result = gxio_mpipe_iqueue_init(thread_iqueue, context, first_notif_ring + rank,
iqueue_mem, notif_ring_size, 0);
if (result < 0) {
VERIFY(result, "gxio_mpipe_iqueue_init()");
}
return TM_ECODE_OK;
}
/* \brief Initialize on MPIPE egress port
*
* Initialize one mPIPE egress port for use in IPS mode.
@ -862,42 +909,16 @@ TmEcode ReceiveMpipeThreadInit(ThreadVars *tv, void *initdata, void **data)
result = gxio_mpipe_link_open(&link, context, link_name, 0);
VERIFY(result, "gxio_mpipe_link_open()");
}
/* Allocate some ingress queues. */
iqueues = SCCalloc(num_workers, sizeof(*iqueues));
if (unlikely(iqueues == NULL))
SCReturnInt(TM_ECODE_FAILED);
/* Allocate some NotifRings. */
result = gxio_mpipe_alloc_notif_rings(context,
num_workers,
0, 0);
VERIFY(result, "gxio_mpipe_alloc_notif_rings()");
int ring = result;
/* Init the NotifRings. */
size_t notif_ring_entries = 2048;
size_t notif_ring_size = notif_ring_entries * sizeof(gxio_mpipe_idesc_t);
for (int i = 0; i < num_workers; i++) {
tmc_alloc_t alloc = TMC_ALLOC_INIT;
tmc_alloc_set_home(&alloc, 1 + i); // FIXME: static worker to Core mapping
if (notif_ring_size > (size_t)getpagesize())
tmc_alloc_set_huge(&alloc);
int needed = notif_ring_size + sizeof(gxio_mpipe_iqueue_t);
void *iqueue_mem = tmc_alloc_map(&alloc, needed);
if (iqueue_mem == NULL)
SCReturnInt(TM_ECODE_FAILED);
gxio_mpipe_iqueue_t *iqueue = iqueue_mem + notif_ring_size;
result = gxio_mpipe_iqueue_init(iqueue, context, ring + i,
iqueue_mem, notif_ring_size, 0);
VERIFY(result, "gxio_mpipe_iqueue_init()");
iqueues[i] = iqueue;
}
first_notif_ring = result;
int first_bucket = 0;
int rc;
rc = ReceiveMpipeCreateBuckets(ring, num_workers,
rc = ReceiveMpipeCreateBuckets(first_notif_ring, num_workers,
&first_bucket, &num_buckets);
if (rc != TM_ECODE_OK)
SCReturnInt(rc);

Loading…
Cancel
Save