From de6cbb01c85e7b3837f38f78367cfddc17cf8fd7 Mon Sep 17 00:00:00 2001
From: Ken Steele
Date: Tue, 12 Nov 2013 15:47:56 -0500
Subject: [PATCH] Allocate mPIPE packet ingress queue in each worker thread.

Move the allocation of the mPipe ingress queue from a loop over the
number of workers in the main init function to being done inside each
worker thread. This allows allocating the memory locally on the
worker's CPU without needing to figure out ahead of time where that
thread will be running. This fixes one case of static mapping of
workers to CPUs.

Use __thread to hold the queue rather than a global table of queues.
---
 src/source-mpipe.c | 89 ++++++++++++++++++++++++++++------------------
 1 file changed, 55 insertions(+), 34 deletions(-)

diff --git a/src/source-mpipe.c b/src/source-mpipe.c
index 969d0ea945..26244f3a86 100644
--- a/src/source-mpipe.c
+++ b/src/source-mpipe.c
@@ -129,6 +129,7 @@
 void ReceiveMpipeThreadExitStats(ThreadVars *, void *);
 TmEcode DecodeMpipeThreadInit(ThreadVars *, void *, void **);
 TmEcode DecodeMpipe(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
+static int MpipeReceiveOpenIqueue(int rank);
 
 #define MAX_CHANNELS 32   /* can probably find this in the MDE */
 
@@ -140,8 +141,11 @@ TmEcode DecodeMpipe(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *
 static gxio_mpipe_context_t context_body;
 static gxio_mpipe_context_t* context = &context_body;
 
-/* The ingress queues (one per worker) */
-static gxio_mpipe_iqueue_t** iqueues;
+/* First allocated Notification ring for iQueues. */
+static int first_notif_ring;
+
+/* The ingress queue for this worker thread */
+static __thread gxio_mpipe_iqueue_t* thread_iqueue;
 
 /* The egress queues (one per port) */
 static gxio_mpipe_equeue_t equeue[MAX_CHANNELS];
@@ -196,11 +200,13 @@ void TmModuleDecodeMpipeRegister (void)
 /* Release Packet without sending. */
 void MpipeReleasePacket(Packet *p)
 {
-    gxio_mpipe_iqueue_t* iqueue = iqueues[p->mpipe_v.rank];
+    /* Use this thread's context to free the packet. */
+    // TODO: Check for dual mPipes.
+    gxio_mpipe_iqueue_t* iqueue = thread_iqueue;
     int bucket = p->mpipe_v.idesc.bucket_id;
     gxio_mpipe_credit(iqueue->context, iqueue->ring, bucket, 1);
 
-    gxio_mpipe_push_buffer(context,
+    gxio_mpipe_push_buffer(iqueue->context,
                            p->mpipe_v.idesc.stack_idx,
                            (void*)(intptr_t)p->mpipe_v.idesc.va);
 }
@@ -208,7 +214,7 @@ void MpipeReleasePacket(Packet *p)
 /* Unconditionally send packet, then release packet buffer. */
 void MpipeReleasePacketCopyTap(Packet *p)
 {
-    gxio_mpipe_iqueue_t* iqueue = iqueues[p->mpipe_v.rank];
+    gxio_mpipe_iqueue_t* iqueue = thread_iqueue;
     int bucket = p->mpipe_v.idesc.bucket_id;
     gxio_mpipe_credit(iqueue->context, iqueue->ring, bucket, 1);
     gxio_mpipe_edesc_t edesc;
@@ -340,7 +346,9 @@ TmEcode ReceiveMpipeLoop(ThreadVars *tv, void *data, void *slot)
         }
     }
 
-    gxio_mpipe_iqueue_t* iqueue = iqueues[rank];
+    /* Open Ingress Queue for this worker thread. */
+    MpipeReceiveOpenIqueue(rank);
+    gxio_mpipe_iqueue_t* iqueue = thread_iqueue;
 
     for (;;) {
         if (suricata_ctl_flags != 0) {
@@ -699,6 +707,45 @@ static int ReceiveMpipeRegisterRules(int bucket, int num_buckets)
     return gxio_mpipe_rules_commit(&rules);
 }
 
+/* \brief Initialize the mPIPE ingress queue (NotifRing) for this
+ *        worker thread.
+ *
+ * \param rank Index of this worker thread.
+ *
+ * \return TM_ECODE_OK on success, TM_ECODE_FAILED on error
+ */
+static int MpipeReceiveOpenIqueue(int rank)
+{
+    /* Init the NotifRings. */
+    const size_t notif_ring_entries = 2048;
+
+    size_t notif_ring_size = notif_ring_entries * sizeof(gxio_mpipe_idesc_t);
+
+    tmc_alloc_t alloc = TMC_ALLOC_INIT;
+    /* Allocate the memory locally on this thread's CPU. */
+    tmc_alloc_set_home(&alloc, TMC_ALLOC_HOME_TASK);
+    /* Allocate all the memory on one page, which is required for the
+       notif ring, not the iqueue. */
+    if (notif_ring_size > (size_t)getpagesize())
+        tmc_alloc_set_huge(&alloc);
+    int needed = notif_ring_size + sizeof(gxio_mpipe_iqueue_t);
+    // TODO - Save the rest of the Huge Page for other allocations.
+    void *iqueue_mem = tmc_alloc_map(&alloc, needed);
+    if (iqueue_mem == NULL) {
+        SCLogError(SC_ERR_FATAL, "Failed to allocate memory for mPIPE iQueue");
+        return TM_ECODE_FAILED;
+    }
+
+    thread_iqueue = iqueue_mem + notif_ring_size;
+    int result = gxio_mpipe_iqueue_init(thread_iqueue, context, first_notif_ring + rank,
+                                        iqueue_mem, notif_ring_size, 0);
+    if (result < 0) {
+        VERIFY(result, "gxio_mpipe_iqueue_init()");
+    }
+
+    return TM_ECODE_OK;
+}
+
 /* \brief Initialize on MPIPE egress port
  *
  * Initialize one mPIPE egress port for use in IPS mode.
@@ -862,42 +909,16 @@ TmEcode ReceiveMpipeThreadInit(ThreadVars *tv, void *initdata, void **data)
         result = gxio_mpipe_link_open(&link, context, link_name, 0);
         VERIFY(result, "gxio_mpipe_link_open()");
     }
-
-    /* Allocate some ingress queues. */
-    iqueues = SCCalloc(num_workers, sizeof(*iqueues));
-    if (unlikely(iqueues == NULL))
-        SCReturnInt(TM_ECODE_FAILED);
-
     /* Allocate some NotifRings. */
     result = gxio_mpipe_alloc_notif_rings(context, num_workers, 0, 0);
     VERIFY(result, "gxio_mpipe_alloc_notif_rings()");
-    int ring = result;
-
-    /* Init the NotifRings. */
-    size_t notif_ring_entries = 2048;
-    size_t notif_ring_size = notif_ring_entries * sizeof(gxio_mpipe_idesc_t);
-    for (int i = 0; i < num_workers; i++) {
-        tmc_alloc_t alloc = TMC_ALLOC_INIT;
-        tmc_alloc_set_home(&alloc, 1 + i); // FIXME: static worker to Core mapping
-        if (notif_ring_size > (size_t)getpagesize())
-            tmc_alloc_set_huge(&alloc);
-        int needed = notif_ring_size + sizeof(gxio_mpipe_iqueue_t);
-        void *iqueue_mem = tmc_alloc_map(&alloc, needed);
-        if (iqueue_mem == NULL)
-            SCReturnInt(TM_ECODE_FAILED);
-
-        gxio_mpipe_iqueue_t *iqueue = iqueue_mem + notif_ring_size;
-        result = gxio_mpipe_iqueue_init(iqueue, context, ring + i,
-                                        iqueue_mem, notif_ring_size, 0);
-        VERIFY(result, "gxio_mpipe_iqueue_init()");
-        iqueues[i] = iqueue;
-    }
+    first_notif_ring = result;
 
     int first_bucket = 0;
     int rc;
-    rc = ReceiveMpipeCreateBuckets(ring, num_workers,
+    rc = ReceiveMpipeCreateBuckets(first_notif_ring, num_workers,
                                    &first_bucket, &num_buckets);
    if (rc != TM_ECODE_OK)
        SCReturnInt(rc);
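
Note (not part of the patch): a minimal sketch of the per-thread allocation
pattern this commit introduces, shown outside the diff for clarity. It
assumes the Tilera MDE header <tmc/alloc.h>; WorkerOpenRing, thread_ring and
ring_entries are hypothetical names used only for illustration, while the
tmc_alloc_* and getpagesize() calls are the same ones used in the patch.

    #include <stddef.h>
    #include <unistd.h>
    #include <tmc/alloc.h>

    /* One ring pointer per worker thread, instead of a global table
     * indexed by worker rank. */
    static __thread void *thread_ring;

    /* Called from inside the worker thread itself, so TMC_ALLOC_HOME_TASK
     * homes the pages on whatever CPU the thread is actually running on. */
    static int WorkerOpenRing(size_t ring_entries, size_t entry_size)
    {
        size_t ring_size = ring_entries * entry_size;

        tmc_alloc_t alloc = TMC_ALLOC_INIT;
        tmc_alloc_set_home(&alloc, TMC_ALLOC_HOME_TASK);
        /* Keep the ring on a single (possibly huge) page. */
        if (ring_size > (size_t)getpagesize())
            tmc_alloc_set_huge(&alloc);

        thread_ring = tmc_alloc_map(&alloc, ring_size);
        return (thread_ring == NULL) ? -1 : 0;
    }

Because each worker performs the allocation after it has started running,
the init code no longer needs to know the worker-to-core mapping ahead of
time, which is what the tmc_alloc_set_home(&alloc, 1 + i) loop removed from
ReceiveMpipeThreadInit had hard-coded.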