From 17b086ebc255066c3ae8223efa3f68b68e34a714 Mon Sep 17 00:00:00 2001 From: Kalin Krustev Date: Thu, 16 May 2024 19:51:51 +0000 Subject: [PATCH] fix: optimize for HighLevelProducer --- package-lock.json | 8 ++++---- package.json | 2 +- src/handlers/positions/handlerBatch.js | 12 ++++++------ 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/package-lock.json b/package-lock.json index 873ae6ff1..da33cc772 100644 --- a/package-lock.json +++ b/package-lock.json @@ -20,7 +20,7 @@ "@mojaloop/central-services-logger": "11.3.1", "@mojaloop/central-services-metrics": "12.0.8", "@mojaloop/central-services-shared": "18.3.5", - "@mojaloop/central-services-stream": "11.2.5", + "@mojaloop/central-services-stream": "11.2.6", "@mojaloop/database-lib": "11.0.5", "@mojaloop/event-sdk": "14.0.3-snapshot.3", "@mojaloop/ml-number": "11.2.4", @@ -1713,9 +1713,9 @@ "integrity": "sha512-/c6rf4UJlmHlC9b5BaNvzAcFv7HZ2QHaV0D4/HNlBdvFnvQq8RI4kYdhyPCl7Xj+oWvTWQ8ujhqS53LIgAe6KQ==" }, "node_modules/@mojaloop/central-services-stream": { - "version": "11.2.5", - "resolved": "https://registry.npmjs.org/@mojaloop/central-services-stream/-/central-services-stream-11.2.5.tgz", - "integrity": "sha512-7OfOvXBtBOE2zBLhkIv5gR4BN72sdVEWDyit9uT01pu/1KjNstn3nopErBhjTo2ANgdB4Jx74UMhLlokwl24IQ==", + "version": "11.2.6", + "resolved": "https://registry.npmjs.org/@mojaloop/central-services-stream/-/central-services-stream-11.2.6.tgz", + "integrity": "sha512-U94lMqIIEqIjPACimOGzT9I98e7zP8oM2spbHznbc5kUDePjsookXi0xQ4H89OECEr4MoKwykDSTAuxUVtczjg==", "dependencies": { "async": "3.2.5", "async-exit-hook": "2.0.1", diff --git a/package.json b/package.json index 4973bab26..35bb99073 100644 --- a/package.json +++ b/package.json @@ -102,7 +102,7 @@ "@mojaloop/central-services-logger": "11.3.1", "@mojaloop/central-services-metrics": "12.0.8", "@mojaloop/central-services-shared": "18.3.5", - "@mojaloop/central-services-stream": "11.2.5", + "@mojaloop/central-services-stream": "11.2.6", "@mojaloop/event-sdk": "14.0.3-snapshot.3", "@mojaloop/ml-number": "11.2.4", "@mojaloop/object-store-lib": "12.0.3", diff --git a/src/handlers/positions/handlerBatch.js b/src/handlers/positions/handlerBatch.js index cc706b3ca..04ed30456 100644 --- a/src/handlers/positions/handlerBatch.js +++ b/src/handlers/positions/handlerBatch.js @@ -89,7 +89,7 @@ const positions = async (error, messages) => { // Iterate through consumedMessages const bins = {} const lastPerPartition = {} - for (const message of consumedMessages) { + await Promise.all(consumedMessages.map(message => { const histTimerMsgEnd = Metrics.getHistogram( 'transfer_position', 'Process a prepare transfer message', @@ -126,8 +126,8 @@ const positions = async (error, messages) => { lastPerPartition[message.partition] = message } - await span.audit(message, EventSdk.AuditEventAction.start) - } + return span.audit(message, EventSdk.AuditEventAction.start) + })) // Start DB Transaction const trx = await BatchPositionModel.startDbTransaction() @@ -148,12 +148,12 @@ const positions = async (error, messages) => { await trx.commit() // Loop through results and produce notification messages and audit messages - for (const item of result.notifyMessages) { + await Promise.all(result.notifyMessages.map(item => { // Produce notification message and audit message const action = item.binItem.message?.value.metadata.event.action const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE - await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Events.Event.Type.NOTIFICATION, action, item.message, eventStatus, null, item.binItem.span) - } + return Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Events.Event.Type.NOTIFICATION, action, item.message, eventStatus, null, item.binItem.span) + })) histTimerEnd({ success: true }) } catch (err) { // If Bin Processor returns failure