diff --git a/src/source-mpipe.c b/src/source-mpipe.c index 969d0ea945..26244f3a86 100644 --- a/src/source-mpipe.c +++ b/src/source-mpipe.c @@ -129,6 +129,7 @@ void ReceiveMpipeThreadExitStats(ThreadVars *, void *); TmEcode DecodeMpipeThreadInit(ThreadVars *, void *, void **); TmEcode DecodeMpipe(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); +static int MpipeReceiveOpenIqueue(int rank); #define MAX_CHANNELS 32 /* can probably find this in the MDE */ @@ -140,8 +141,11 @@ TmEcode DecodeMpipe(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue * static gxio_mpipe_context_t context_body; static gxio_mpipe_context_t* context = &context_body; -/* The ingress queues (one per worker) */ -static gxio_mpipe_iqueue_t** iqueues; +/* First allocated Notification ring for iQueues. */ +static int first_notif_ring; + +/* The ingress queue for this worker thread */ +static __thread gxio_mpipe_iqueue_t* thread_iqueue; /* The egress queues (one per port) */ static gxio_mpipe_equeue_t equeue[MAX_CHANNELS]; @@ -196,11 +200,13 @@ void TmModuleDecodeMpipeRegister (void) /* Release Packet without sending. */ void MpipeReleasePacket(Packet *p) { - gxio_mpipe_iqueue_t* iqueue = iqueues[p->mpipe_v.rank]; + /* Use this thread's context to free the packet. */ + // TODO: Check for dual mPipes. + gxio_mpipe_iqueue_t* iqueue = thread_iqueue; int bucket = p->mpipe_v.idesc.bucket_id; gxio_mpipe_credit(iqueue->context, iqueue->ring, bucket, 1); - gxio_mpipe_push_buffer(context, + gxio_mpipe_push_buffer(iqueue->context, p->mpipe_v.idesc.stack_idx, (void*)(intptr_t)p->mpipe_v.idesc.va); } @@ -208,7 +214,7 @@ void MpipeReleasePacket(Packet *p) /* Unconditionally send packet, then release packet buffer. */ void MpipeReleasePacketCopyTap(Packet *p) { - gxio_mpipe_iqueue_t* iqueue = iqueues[p->mpipe_v.rank]; + gxio_mpipe_iqueue_t* iqueue = thread_iqueue; int bucket = p->mpipe_v.idesc.bucket_id; gxio_mpipe_credit(iqueue->context, iqueue->ring, bucket, 1); gxio_mpipe_edesc_t edesc; @@ -340,7 +346,9 @@ TmEcode ReceiveMpipeLoop(ThreadVars *tv, void *data, void *slot) } } - gxio_mpipe_iqueue_t* iqueue = iqueues[rank]; + /* Open Ingress Queue for this worker thread. */ + MpipeReceiveOpenIqueue(rank); + gxio_mpipe_iqueue_t* iqueue = thread_iqueue; for (;;) { if (suricata_ctl_flags != 0) { @@ -699,6 +707,45 @@ static int ReceiveMpipeRegisterRules(int bucket, int num_buckets) return gxio_mpipe_rules_commit(&rules); } +/* \brief Initialize mPIPE ingress ring + * + * \param name of interface to open + * \param Array of port configuations + * + * \return Output port channel number, or -1 on error + */ +static int MpipeReceiveOpenIqueue(int rank) +{ + /* Init the NotifRings. */ + const size_t notif_ring_entries = 2048; + + size_t notif_ring_size = notif_ring_entries * sizeof(gxio_mpipe_idesc_t); + + tmc_alloc_t alloc = TMC_ALLOC_INIT; + /* Allocate the memory locally on this thread's CPU. */ + tmc_alloc_set_home(&alloc, TMC_ALLOC_HOME_TASK); + /* Allocate all the memory on one page. Which is required for the + notif ring, not the iqueue. */ + if (notif_ring_size > (size_t)getpagesize()) + tmc_alloc_set_huge(&alloc); + int needed = notif_ring_size + sizeof(gxio_mpipe_iqueue_t); + // TODO - Save the rest of the Huge Page for other allocations. + void *iqueue_mem = tmc_alloc_map(&alloc, needed); + if (iqueue_mem == NULL) { + SCLogError(SC_ERR_FATAL, "Failed to allocate memory for mPIPE iQueue"); + return TM_ECODE_FAILED; + } + + thread_iqueue = iqueue_mem + notif_ring_size; + int result = gxio_mpipe_iqueue_init(thread_iqueue, context, first_notif_ring + rank, + iqueue_mem, notif_ring_size, 0); + if (result < 0) { + VERIFY(result, "gxio_mpipe_iqueue_init()"); + } + + return TM_ECODE_OK; +} + /* \brief Initialize on MPIPE egress port * * Initialize one mPIPE egress port for use in IPS mode. @@ -862,42 +909,16 @@ TmEcode ReceiveMpipeThreadInit(ThreadVars *tv, void *initdata, void **data) result = gxio_mpipe_link_open(&link, context, link_name, 0); VERIFY(result, "gxio_mpipe_link_open()"); } - - /* Allocate some ingress queues. */ - iqueues = SCCalloc(num_workers, sizeof(*iqueues)); - if (unlikely(iqueues == NULL)) - SCReturnInt(TM_ECODE_FAILED); - /* Allocate some NotifRings. */ result = gxio_mpipe_alloc_notif_rings(context, num_workers, 0, 0); VERIFY(result, "gxio_mpipe_alloc_notif_rings()"); - int ring = result; - - /* Init the NotifRings. */ - size_t notif_ring_entries = 2048; - size_t notif_ring_size = notif_ring_entries * sizeof(gxio_mpipe_idesc_t); - for (int i = 0; i < num_workers; i++) { - tmc_alloc_t alloc = TMC_ALLOC_INIT; - tmc_alloc_set_home(&alloc, 1 + i); // FIXME: static worker to Core mapping - if (notif_ring_size > (size_t)getpagesize()) - tmc_alloc_set_huge(&alloc); - int needed = notif_ring_size + sizeof(gxio_mpipe_iqueue_t); - void *iqueue_mem = tmc_alloc_map(&alloc, needed); - if (iqueue_mem == NULL) - SCReturnInt(TM_ECODE_FAILED); - - gxio_mpipe_iqueue_t *iqueue = iqueue_mem + notif_ring_size; - result = gxio_mpipe_iqueue_init(iqueue, context, ring + i, - iqueue_mem, notif_ring_size, 0); - VERIFY(result, "gxio_mpipe_iqueue_init()"); - iqueues[i] = iqueue; - } + first_notif_ring = result; int first_bucket = 0; int rc; - rc = ReceiveMpipeCreateBuckets(ring, num_workers, + rc = ReceiveMpipeCreateBuckets(first_notif_ring, num_workers, &first_bucket, &num_buckets); if (rc != TM_ECODE_OK) SCReturnInt(rc);