From 0f5192cec527139dd0d1e7779e562b3ab4e8dbff Mon Sep 17 00:00:00 2001 From: vijayg10 <33152110+vijayg10@users.noreply.github.com> Date: Fri, 23 Aug 2024 11:52:31 +0530 Subject: [PATCH] fix: fx fulfil header validation (#1084) * fix: fx fulfil * chore(snapshot): 17.8.0-snapshot.10 * chore(snapshot): 17.8.0-snapshot.11 * chore(snapshot): 17.8.0-snapshot.12 * fix: fx fulfil proxy --- .ncurc.yaml | 1 - config/default.json | 1 + package-lock.json | 70 ++++---- package.json | 4 +- src/domain/position/abort.js | 14 +- src/handlers/transfers/FxFulfilService.js | 34 ++-- src/models/fxTransfer/fxTransfer.js | 2 + .../handlers/transfers/handlers.test.js | 156 +++++++++++++++++- .../transfers/FxFulfilService.test.js | 14 +- .../transfers/fxFulfilHandler.test.js | 34 +++- 10 files changed, 267 insertions(+), 63 deletions(-) diff --git a/.ncurc.yaml b/.ncurc.yaml index 0bac0b508..10735f580 100644 --- a/.ncurc.yaml +++ b/.ncurc.yaml @@ -12,5 +12,4 @@ reject: [ "sinon", # glob >= 11 requires node >= 20 "glob", - "@mojaloop/central-services-shared" ] diff --git a/config/default.json b/config/default.json index 2617b2006..ee4291ca3 100644 --- a/config/default.json +++ b/config/default.json @@ -102,6 +102,7 @@ "COMMIT": null, "BULK_COMMIT": null, "RESERVE": null, + "FX_RESERVE": "topic-transfer-position-batch", "TIMEOUT_RESERVED": null, "FX_TIMEOUT_RESERVED": "topic-transfer-position-batch", "ABORT": null, diff --git a/package-lock.json b/package-lock.json index a624f36a5..c5ba5f271 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.10", + "version": "17.8.0-snapshot.12", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.10", + "version": "17.8.0-snapshot.12", "license": "Apache-2.0", "dependencies": { "@hapi/basic": "7.0.2", @@ -20,7 +20,7 @@ "@mojaloop/central-services-health": "15.0.0", "@mojaloop/central-services-logger": "11.5.1", "@mojaloop/central-services-metrics": "12.0.8", - "@mojaloop/central-services-shared": "18.6.3", + "@mojaloop/central-services-shared": "18.7.2", "@mojaloop/central-services-stream": "11.3.1", "@mojaloop/database-lib": "11.0.6", "@mojaloop/event-sdk": "14.1.1", @@ -1623,28 +1623,29 @@ } }, "node_modules/@mojaloop/central-services-shared": { - "version": "18.6.3", - "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-18.6.3.tgz", - "integrity": "sha512-GTMNxBB4lhjrW7V52OmZvuWKKx7IywmyihAfmcmSJ1zCtb+yL1CzF/pM4slOx2d6taE9Pn+q3S2Ucf/ZV2QzuA==", + "version": "18.7.2", + "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-18.7.2.tgz", + "integrity": "sha512-LuvLkww6scSIYdz+cyo8tghpRgJavcOkCs/9sX4F9s6dunfHgnzzWO4dO45K26PBaQZuuax/KtDHmyOH/nrPfg==", "dependencies": { "@hapi/catbox": "12.1.1", "@hapi/catbox-memory": "5.0.1", - "@mojaloop/inter-scheme-proxy-cache-lib": "1.4.0", - "axios": "1.7.2", + "@mojaloop/inter-scheme-proxy-cache-lib": "2.2.0", + "axios": "1.7.4", "clone": "2.1.2", "dotenv": "16.4.5", "env-var": "7.5.0", "event-stream": "4.0.1", - "immutable": "4.3.6", + "fast-safe-stringify": "^2.1.1", + "immutable": "4.3.7", "lodash": "4.17.21", "mustache": "4.2.0", "openapi-backend": "5.10.6", - "raw-body": "2.5.2", + "raw-body": "3.0.0", "rc": "1.2.8", "shins": "2.6.0", "uuid4": "2.0.3", "widdershins": "^4.0.1", - "yaml": "2.4.5" + "yaml": "2.5.0" }, "peerDependencies": { "@mojaloop/central-services-error-handling": ">=13.x.x", @@ -1704,19 +1705,18 @@ "integrity": "sha512-jKtjLLDiH95b002sJVc5c74PE6KKYftuyVdVmsuYId5stTaWcRFqE+5ukZI4gDUKjGn8wv2C3zPn3/nyjEI7gg==", "deprecated": "This version has been deprecated and is no longer supported or maintained" }, - "node_modules/@mojaloop/central-services-shared/node_modules/@mojaloop/inter-scheme-proxy-cache-lib": { - "version": "1.4.0", - "resolved": "https://registry.npmjs.org/@mojaloop/inter-scheme-proxy-cache-lib/-/inter-scheme-proxy-cache-lib-1.4.0.tgz", - "integrity": "sha512-jmAWWdjZxjxlSQ+wt8aUcMYOneVo1GNbIIs7yK/R2K9DBtKb0aYle2mWwdjm9ovk6zSWL2a9lH+n3hq7kb08Wg==", + "node_modules/@mojaloop/central-services-shared/node_modules/raw-body": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/raw-body/-/raw-body-3.0.0.tgz", + "integrity": "sha512-RmkhL8CAyCRPXCE28MMH0z2PNWQBNk2Q09ZdxM9IOOXwxwZbN+qbWaatPkdkWIKL2ZVDImrN/pK5HTRz2PcS4g==", "dependencies": { - "@mojaloop/central-services-logger": "^11.3.1", - "ajv": "^8.16.0", - "convict": "^6.2.4", - "fast-safe-stringify": "^2.1.1", - "ioredis": "^5.4.1" + "bytes": "3.1.2", + "http-errors": "2.0.0", + "iconv-lite": "0.6.3", + "unpipe": "1.0.0" }, "engines": { - "node": ">=18.x" + "node": ">= 0.8" } }, "node_modules/@mojaloop/central-services-stream": { @@ -2654,9 +2654,9 @@ } }, "node_modules/axios": { - "version": "1.7.2", - "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.2.tgz", - "integrity": "sha512-2A8QhOMrbomlDuiLeK9XibIBzuHeRcqqNOHp0Cyp5EoJ1IFDh+XZH3A6BkXtv0K4gFGCI0Y4BM7B1wOEi0Rmgw==", + "version": "1.7.4", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.4.tgz", + "integrity": "sha512-DukmaFRnY6AzAALSH4J2M3k6PkaC+MfaAGdEERRWcC9q3/TWQwLpHR8ZRLKTdQ3aBDL64EdluRDjJqKw+BPZEw==", "dependencies": { "follow-redirects": "^1.15.6", "form-data": "^4.0.0", @@ -7187,8 +7187,6 @@ "version": "0.6.3", "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz", "integrity": "sha512-4fCk79wshMdzMp2rH06qWrJE4iolqLhCUH+OiuIgU++RB0+94NlDL81atO7GX55uUKueo0txHNtvEyI6D7WdMw==", - "optional": true, - "peer": true, "dependencies": { "safer-buffer": ">= 2.1.2 < 3.0.0" }, @@ -7238,9 +7236,9 @@ } }, "node_modules/immutable": { - "version": "4.3.6", - "resolved": "https://registry.npmjs.org/immutable/-/immutable-4.3.6.tgz", - "integrity": "sha512-Ju0+lEMyzMVZarkTn/gqRpdqd5dOPaz1mCZ0SH3JV6iFw81PldE/PEB1hWVEA288HPt4WXW8O7AWxB10M+03QQ==" + "version": "4.3.7", + "resolved": "https://registry.npmjs.org/immutable/-/immutable-4.3.7.tgz", + "integrity": "sha512-1hqclzwYwjRDFLjcFxOM5AYkkG0rpFPpr1RLPMEuGczoS7YA8gLhy8SWXYRAA/XwfEHpfo3cw5JGioS32fnMRw==" }, "node_modules/import-fresh": { "version": "3.3.0", @@ -9102,11 +9100,11 @@ } }, "node_modules/micromatch": { - "version": "4.0.7", - "resolved": "https://registry.npmjs.org/micromatch/-/micromatch-4.0.7.tgz", - "integrity": "sha512-LPP/3KorzCwBxfeUuZmaR6bG2kdeHSbe0P2tY3FLRU4vYrjYz5hI4QZwV0njUx3jeuKe67YukQ1LSPZBKDqO/Q==", + "version": "4.0.5", + "resolved": "https://registry.npmjs.org/micromatch/-/micromatch-4.0.5.tgz", + "integrity": "sha512-DMy+ERcEW2q8Z2Po+WNXuw3c5YaUSFjAO5GsJqfEl7UjvtIuFKO6ZrKvcItdy98dwFI2N1tg3zNIdKaQT+aNdA==", "dependencies": { - "braces": "^3.0.3", + "braces": "^3.0.2", "picomatch": "^2.3.1" }, "engines": { @@ -14852,9 +14850,9 @@ "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==" }, "node_modules/yaml": { - "version": "2.4.5", - "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.4.5.tgz", - "integrity": "sha512-aBx2bnqDzVOyNKfsysjA2ms5ZlnjSAW2eG3/L5G/CSujfjLJTJsEw1bGw8kCf04KodQWk1pxlGnZ56CRxiawmg==", + "version": "2.5.0", + "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.5.0.tgz", + "integrity": "sha512-2wWLbGbYDiSqqIKoPjar3MPgB94ErzCtrNE1FdqGuaO0pi2JGjmE8aW8TDZwzU7vuxcGRdL/4gPQwQ7hD5AMSw==", "bin": { "yaml": "bin.mjs" }, diff --git a/package.json b/package.json index 3aee098db..6feef587c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.10", + "version": "17.8.0-snapshot.12", "description": "Central ledger hosted by a scheme to record and settle transfers", "license": "Apache-2.0", "author": "ModusBox", @@ -92,7 +92,7 @@ "@mojaloop/central-services-health": "15.0.0", "@mojaloop/central-services-logger": "11.5.1", "@mojaloop/central-services-metrics": "12.0.8", - "@mojaloop/central-services-shared": "18.6.3", + "@mojaloop/central-services-shared": "18.7.2", "@mojaloop/central-services-stream": "11.3.1", "@mojaloop/database-lib": "11.0.6", "@mojaloop/event-sdk": "14.1.1", diff --git a/src/domain/position/abort.js b/src/domain/position/abort.js index bb1358485..3fe24f4c4 100644 --- a/src/domain/position/abort.js +++ b/src/domain/position/abort.js @@ -91,11 +91,11 @@ const processPositionAbortBin = async ( for (const positionChange of cyrilResult.positionChanges) { if (positionChange.isFxTransferStateChange) { // Construct notification message for fx transfer state change - const resultMessage = _constructAbortResultMessage(binItem, positionChange.commitRequestId, Config.HUB_NAME, positionChange.notifyTo, Enum.Events.Event.Action.FX_ABORT) + const resultMessage = _constructAbortResultMessage(binItem, positionChange.commitRequestId, Config.HUB_NAME, positionChange.notifyTo) resultMessages.push({ binItem, message: resultMessage }) } else { // Construct notification message for transfer state change - const resultMessage = _constructAbortResultMessage(binItem, positionChange.transferId, Config.HUB_NAME, positionChange.notifyTo, Enum.Events.Event.Action.ABORT) + const resultMessage = _constructAbortResultMessage(binItem, positionChange.transferId, Config.HUB_NAME, positionChange.notifyTo) resultMessages.push({ binItem, message: resultMessage }) } } @@ -125,9 +125,13 @@ const processPositionAbortBin = async ( } } -const _constructAbortResultMessage = (binItem, id, from, notifyTo, action) => { +const _constructAbortResultMessage = (binItem, id, from, notifyTo) => { + let apiErrorCode = ErrorHandler.Enums.FSPIOPErrorCodes.PAYEE_REJECTION + if (binItem.message?.value.metadata.event.action === Enum.Events.Event.Action.FX_ABORT_VALIDATION) { + apiErrorCode = ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR + } const fspiopError = ErrorHandler.Factory.createFSPIOPError( - ErrorHandler.Enums.FSPIOPErrorCodes.PAYEE_REJECTION, // TODO: Need clarification on this + apiErrorCode, null, null, null, @@ -144,7 +148,7 @@ const _constructAbortResultMessage = (binItem, id, from, notifyTo, action) => { const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent( id, Enum.Kafka.Topics.POSITION, - action, + binItem.message?.value.metadata.event.action, // This will be replaced anyway in Kafka.produceGeneralMessage function state ) const resultMessage = Utility.StreamingProtocol.createMessage( diff --git a/src/handlers/transfers/FxFulfilService.js b/src/handlers/transfers/FxFulfilService.js index a43fcad89..fb25c750f 100644 --- a/src/handlers/transfers/FxFulfilService.js +++ b/src/handlers/transfers/FxFulfilService.js @@ -52,7 +52,7 @@ class FxFulfilService { } async getFxTransferDetails(commitRequestId, functionality) { - const transfer = await this.FxTransferModel.fxTransfer.getAllDetailsByCommitRequestId(commitRequestId) + const transfer = await this.FxTransferModel.fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer(commitRequestId) if (!transfer) { const fspiopError = fspiopErrorFactory.fxTransferNotFound() @@ -79,10 +79,10 @@ class FxFulfilService { async validateHeaders({ transfer, headers, payload }) { let fspiopError = null - if (headers[SOURCE]?.toLowerCase() !== transfer.counterPartyFspName.toLowerCase()) { + if (!transfer.counterPartyFspIsProxy && (headers[SOURCE]?.toLowerCase() !== transfer.counterPartyFspName.toLowerCase())) { fspiopError = fspiopErrorFactory.fxHeaderSourceValidationError() } - if (headers[DESTINATION]?.toLowerCase() !== transfer.initiatingFspName.toLowerCase()) { + if (!transfer.initiatingFspIsProxy && (headers[DESTINATION]?.toLowerCase() !== transfer.initiatingFspName.toLowerCase())) { fspiopError = fspiopErrorFactory.fxHeaderDestinationValidationError() } @@ -97,16 +97,31 @@ class FxFulfilService { // Lets handle the abort validation and change the fxTransfer state to reflect this await this.FxTransferModel.fxTransfer.saveFxFulfilResponse(transfer.commitRequestId, payload, eventDetail.action, apiFSPIOPError) - // Publish message to FX Position Handler + await this._handleAbortValidation(transfer, apiFSPIOPError, eventDetail) + throw fspiopError + } + } + + async _handleAbortValidation(transfer, apiFSPIOPError, eventDetail) { + const cyrilResult = await this.cyril.processFxAbortMessage(transfer.commitRequestId) + + this.params.message.value.content.context = { + ...this.params.message.value.content.context, + cyrilResult + } + if (cyrilResult.positionChanges.length > 0) { + const participantCurrencyId = cyrilResult.positionChanges[0].participantCurrencyId await this.kafkaProceed({ consumerCommit, fspiopError: apiFSPIOPError, eventDetail, fromSwitch, toDestination: transfer.initiatingFspName, - // The message key doesn't matter here, as there are no position changes for FX Fulfil - messageKey: transfer.counterPartyFspSourceParticipantCurrencyId.toString() + messageKey: participantCurrencyId.toString(), + topicNameOverride: this.Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.FX_ABORT }) + } else { + const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError('Invalid cyril result') throw fspiopError } } @@ -231,12 +246,7 @@ class FxFulfilService { this.log.warn('callbackErrorInvalidFulfilment', { eventDetail, apiFSPIOPError, transfer, payload }) await this.FxTransferModel.fxTransfer.saveFxFulfilResponse(transfer.commitRequestId, payload, eventDetail.action, apiFSPIOPError) - await this.kafkaProceed({ - consumerCommit, - fspiopError: apiFSPIOPError, - eventDetail, - messageKey: transfer.counterPartyFspTargetParticipantCurrencyId.toString() - }) + await this._handleAbortValidation(transfer, apiFSPIOPError, eventDetail) throw fspiopError } diff --git a/src/models/fxTransfer/fxTransfer.js b/src/models/fxTransfer/fxTransfer.js index 9d5502558..38f2b6cea 100644 --- a/src/models/fxTransfer/fxTransfer.js +++ b/src/models/fxTransfer/fxTransfer.js @@ -92,12 +92,14 @@ const getAllDetailsByCommitRequestId = async (commitRequestId) => { 'fxTransfer.*', 'da.participantId AS initiatingFspParticipantId', 'da.name AS initiatingFspName', + 'da.isProxy AS initiatingFspIsProxy', // 'pc21.participantCurrencyId AS counterPartyFspSourceParticipantCurrencyId', // 'pc22.participantCurrencyId AS counterPartyFspTargetParticipantCurrencyId', 'tp21.participantCurrencyId AS counterPartyFspSourceParticipantCurrencyId', 'tp22.participantCurrencyId AS counterPartyFspTargetParticipantCurrencyId', 'ca.participantId AS counterPartyFspParticipantId', 'ca.name AS counterPartyFspName', + 'ca.isProxy AS counterPartyFspIsProxy', 'tsc.fxTransferStateChangeId', 'tsc.transferStateId AS transferState', 'tsc.reason AS reason', diff --git a/test/integration-override/handlers/transfers/handlers.test.js b/test/integration-override/handlers/transfers/handlers.test.js index 805167ed6..5fa38b4e6 100644 --- a/test/integration-override/handlers/transfers/handlers.test.js +++ b/test/integration-override/handlers/transfers/handlers.test.js @@ -266,7 +266,12 @@ const prepareTestData = async (dataObj) => { const fxPrepareHeaders = { 'fspiop-source': payer.participant.name, 'fspiop-destination': fxp.participant.name, - 'content-type': 'application/vnd.interoperability.fxtransfers+json;version=2.0' + 'content-type': 'application/vnd.interoperability.fxTransfers+json;version=2.0' + } + const fxFulfilAbortRejectHeaders = { + 'fspiop-source': fxp.participant.name, + 'fspiop-destination': payer.participant.name, + 'content-type': 'application/vnd.interoperability.fxTransfers+json;version=2.0' } const fulfilAbortRejectHeaders = { 'fspiop-source': payee.participant.name, @@ -309,6 +314,12 @@ const prepareTestData = async (dataObj) => { expiration: dataObj.expiration } + const fxFulfilPayload = { + fulfilment: 'UNlJ98hZTY_dsw0cAqw4i_UN3v4utt7CZFB4yfLbVFA', + completedTimestamp: dataObj.now, + conversionState: 'RESERVED' + } + const rejectPayload = Object.assign({}, fulfilPayload, { transferState: TransferInternalState.ABORTED_REJECTED }) const errorPayload = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.PAYEE_FSP_REJECTED_TXN).toApiErrorObject() @@ -383,6 +394,17 @@ const prepareTestData = async (dataObj) => { messageProtocolFulfil.metadata.event.type = TransferEventType.FULFIL messageProtocolFulfil.metadata.event.action = TransferEventAction.COMMIT + const messageProtocolFxFulfil = Util.clone(messageProtocolFxPrepare) + messageProtocolFxFulfil.id = randomUUID() + messageProtocolFxFulfil.from = fxTransferPayload.counterPartyFsp + messageProtocolFxFulfil.to = fxTransferPayload.initiatingFsp + messageProtocolFxFulfil.content.headers = fxFulfilAbortRejectHeaders + messageProtocolFxFulfil.content.uriParams = { id: fxTransferPayload.commitRequestId } + messageProtocolFxFulfil.content.payload = fxFulfilPayload + messageProtocolFxFulfil.metadata.event.id = randomUUID() + messageProtocolFxFulfil.metadata.event.type = TransferEventType.FULFIL + messageProtocolFxFulfil.metadata.event.action = TransferEventAction.FX_RESERVE + const messageProtocolReject = Util.clone(messageProtocolFulfil) messageProtocolReject.id = randomUUID() messageProtocolFulfil.content.uriParams = { id: transferPayload.transferId } @@ -402,12 +424,14 @@ const prepareTestData = async (dataObj) => { transferPayload, fxTransferPayload, fulfilPayload, + fxFulfilPayload, rejectPayload, errorPayload, messageProtocolPrepare, messageProtocolPrepareForwarded, messageProtocolFxPrepare, messageProtocolFulfil, + messageProtocolFxFulfil, messageProtocolReject, messageProtocolError, topicConfTransferPrepare, @@ -1239,6 +1263,136 @@ Test('Handlers test', async handlersTest => { test.end() }) + await transferProxyPrepare.test(` + Scheme R: PUT /fxTransfer call I.e. From: FXP → To: Proxy AR + No position changes should happen`, async (test) => { + const debtor = 'jurisdictionalFspPayerFsp' + + const td = await prepareTestData(testData) + await ProxyCache.getCache().addDfspIdToProxyMapping(debtor, td.proxyAR.participant.name) + + const prepareConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.PREPARE.toUpperCase()) + prepareConfig.logger = Logger + + td.messageProtocolFxPrepare.content.from = debtor + td.messageProtocolFxPrepare.content.headers['fspiop-source'] = debtor + td.messageProtocolFxPrepare.content.payload.initiatingFsp = debtor + await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) + + try { + const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'fx-prepare', + // To be keyed with the Proxy AR participantCurrencyId + keyFilter: td.proxyAR.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionPrepare[0], 'Position prepare message with debtor key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + // Fulfil the fxTransfer + const fulfilConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.FULFIL.toUpperCase()) + fulfilConfig.logger = Logger + + td.messageProtocolFxFulfil.content.to = debtor + td.messageProtocolFxFulfil.content.headers['fspiop-destination'] = debtor + + testConsumer.clearEvents() + await Producer.produceMessage(td.messageProtocolFxFulfil, td.topicConfTransferFulfil, fulfilConfig) + + try { + const positionFxFulfil = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-notification-event', + action: 'fx-reserve', + valueToFilter: td.payer.name + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionFxFulfil[0], 'Position fulfil message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + testConsumer.clearEvents() + test.end() + }) + + await transferProxyPrepare.test(` + Scheme R: PUT /fxTransfer call I.e. From: FXP → To: Proxy AR + with wrong headers - ABORT VALIDATION`, async (test) => { + const debtor = 'jurisdictionalFspPayerFsp' + + const td = await prepareTestData(testData) + await ProxyCache.getCache().addDfspIdToProxyMapping(debtor, td.proxyAR.participant.name) + + const prepareConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.PREPARE.toUpperCase()) + prepareConfig.logger = Logger + + td.messageProtocolFxPrepare.content.from = debtor + td.messageProtocolFxPrepare.content.headers['fspiop-source'] = debtor + td.messageProtocolFxPrepare.content.payload.initiatingFsp = debtor + await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) + + try { + const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'fx-prepare', + // To be keyed with the Proxy AR participantCurrencyId + keyFilter: td.proxyAR.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionPrepare[0], 'Position prepare message with debtor key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + // Fulfil the fxTransfer + const fulfilConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.FULFIL.toUpperCase()) + fulfilConfig.logger = Logger + + td.messageProtocolFxFulfil.content.to = debtor + td.messageProtocolFxFulfil.content.headers['fspiop-destination'] = debtor + + // If initiatingFsp is proxy, fx fulfil handler doesn't validate fspiop-destination header. + // But it should validate fspiop-source header, because counterPartyFsp is not a proxy. + td.messageProtocolFxFulfil.content.headers['fspiop-source'] = 'wrongfsp' + + testConsumer.clearEvents() + await Producer.produceMessage(td.messageProtocolFxFulfil, td.topicConfTransferFulfil, fulfilConfig) + + try { + const positionFxFulfil = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'fx-abort-validation', + keyFilter: td.proxyAR.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionFxFulfil[0], 'Position fulfil message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + testConsumer.clearEvents() + test.end() + }) + await transferProxyPrepare.test(` Scheme R: PUT /transfers call I.e. From: Proxy RB → To: Proxy AR If it is a FX transfer with currency conversion diff --git a/test/unit/handlers/transfers/FxFulfilService.test.js b/test/unit/handlers/transfers/FxFulfilService.test.js index 72827f920..c113fc060 100644 --- a/test/unit/handlers/transfers/FxFulfilService.test.js +++ b/test/unit/handlers/transfers/FxFulfilService.test.js @@ -29,6 +29,7 @@ const { Db } = require('@mojaloop/database-lib') const { Enum, Util } = require('@mojaloop/central-services-shared') const { Consumer, Producer } = require('@mojaloop/central-services-stream').Util +const Cyril = require('../../../../src/domain/fx/cyril') const FxFulfilService = require('../../../../src/handlers/transfers/FxFulfilService') const fspiopErrorFactory = require('../../../../src/shared/fspiopErrorFactory') const Validator = require('../../../../src/handlers/transfers/validator') @@ -92,6 +93,12 @@ Test('FxFulfilService Tests -->', fxFulfilTest => { connect: sandbox.stub(), disconnect: sandbox.stub() }) + sandbox.stub(Cyril) + Cyril.processFxAbortMessage.returns({ + positionChanges: [{ + participantCurrencyId: 1 + }] + }) span = mocks.createTracerStub(sandbox).SpanStub test.end() }) @@ -168,6 +175,7 @@ Test('FxFulfilService Tests -->', fxFulfilTest => { const { service } = createFxFulfilServiceWithTestData(fixtures.fxFulfilKafkaMessageDto()) const transfer = { ilpCondition: fixtures.CONDITION, + initiatingFspName: fixtures.DFSP1_ID, counterPartyFspTargetParticipantCurrencyId: 123 } const payload = { fulfilment: 'wrongFulfilment' } @@ -179,9 +187,9 @@ Test('FxFulfilService Tests -->', fxFulfilTest => { t.equal(err.message, ERROR_MESSAGES.fxInvalidFulfilment) t.ok(producer.produceMessage.calledOnce) const [messageProtocol, topicConfig] = producer.produceMessage.lastCall.args - t.equal(topicConfig.topicName, TOPICS.transferPosition) - t.equal(topicConfig.key, String(transfer.counterPartyFspTargetParticipantCurrencyId)) - t.equal(messageProtocol.from, fixtures.FXP_ID) + t.ok(topicConfig.topicName === TOPICS.transferPosition || topicConfig.topicName === TOPICS.transferPositionBatch) + t.equal(topicConfig.key, String(1)) + t.equal(messageProtocol.from, fixtures.SWITCH_ID) t.equal(messageProtocol.to, fixtures.DFSP1_ID) t.equal(messageProtocol.metadata.event.action, Action.FX_ABORT_VALIDATION) checkErrorPayload(t)(messageProtocol.content.payload, fspiopErrorFactory.fxInvalidFulfilment()) diff --git a/test/unit/handlers/transfers/fxFulfilHandler.test.js b/test/unit/handlers/transfers/fxFulfilHandler.test.js index 2e5b3d38d..8dd7669ec 100644 --- a/test/unit/handlers/transfers/fxFulfilHandler.test.js +++ b/test/unit/handlers/transfers/fxFulfilHandler.test.js @@ -40,7 +40,9 @@ const { Util, Enum } = require('@mojaloop/central-services-shared') const { Consumer, Producer } = require('@mojaloop/central-services-stream').Util const FxFulfilService = require('../../../../src/handlers/transfers/FxFulfilService') +const ParticipantPositionChangesModel = require('../../../../src/models/position/participantPositionChanges') const fxTransferModel = require('../../../../src/models/fxTransfer') +const TransferFacade = require('../../../../src/models/transfer/facade') const Validator = require('../../../../src/handlers/transfers/validator') const TransferObjectTransform = require('../../../../src/domain/transfer/transform') const fspiopErrorFactory = require('../../../../src/shared/fspiopErrorFactory') @@ -78,6 +80,8 @@ Test('FX Transfer Fulfil handler -->', fxFulfilTest => { sandbox.stub(Validator) sandbox.stub(fxTransferModel.fxTransfer) sandbox.stub(fxTransferModel.watchList) + sandbox.stub(ParticipantPositionChangesModel) + sandbox.stub(TransferFacade) sandbox.stub(TransferObjectTransform, 'toFulfil') sandbox.stub(Consumer, 'getConsumer').returns({ commitMessageSync: async () => true @@ -161,8 +165,18 @@ Test('FX Transfer Fulfil handler -->', fxFulfilTest => { const counterPartyFsp = fixtures.FXP_ID const fxTransferPayload = fixtures.fxTransferDto({ initiatingFsp, counterPartyFsp }) const fxTransferDetailsFromDb = fixtures.fxtGetAllDetailsByCommitRequestIdDto(fxTransferPayload) + fxTransferModel.fxTransfer.getAllDetailsByCommitRequestId.resolves(fxTransferDetailsFromDb) fxTransferModel.fxTransfer.saveFxFulfilResponse.resolves({}) + fxTransferModel.fxTransfer.getByCommitRequestId.resolves(fxTransferDetailsFromDb) + fxTransferModel.fxTransfer.getByDeterminingTransferId.resolves([]) + fxTransferModel.fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.resolves(fxTransferDetailsFromDb) + const mockPositionChanges = [ + { participantCurrencyId: 1, value: 100 } + ] + ParticipantPositionChangesModel.getReservedPositionChangesByCommitRequestId.resolves([]) + ParticipantPositionChangesModel.getReservedPositionChangesByTransferId.resolves(mockPositionChanges) + TransferFacade.getById.resolves({ payerfsp: 'testpayer' }) const metadata = fixtures.fulfilMetadataDto({ action: Action.FX_RESERVE }) const content = fixtures.fulfilContentDto({ @@ -178,7 +192,7 @@ Test('FX Transfer Fulfil handler -->', fxFulfilTest => { t.equal(messageProtocol.from, fixtures.SWITCH_ID) t.equal(messageProtocol.metadata.event.action, Action.FX_ABORT_VALIDATION) checkErrorPayload(t)(messageProtocol.content.payload, fspiopErrorFactory.fxHeaderSourceValidationError()) - t.equal(topicConfig.topicName, TOPICS.transferPosition) + t.ok(topicConfig.topicName === TOPICS.transferPosition || topicConfig.topicName === TOPICS.transferPositionBatch) t.end() }) @@ -214,6 +228,20 @@ Test('FX Transfer Fulfil handler -->', fxFulfilTest => { sandbox.stub(FxFulfilService.prototype, 'getFxTransferDetails').resolves(fxTransferDetails) sandbox.stub(FxFulfilService.prototype, 'validateHeaders').resolves() sandbox.stub(FxFulfilService.prototype, 'validateEventType').resolves() + const initiatingFsp = fixtures.DFSP1_ID + const counterPartyFsp = fixtures.FXP_ID + const fxTransferPayload = fixtures.fxTransferDto({ initiatingFsp, counterPartyFsp }) + const fxTransferDetailsFromDb = fixtures.fxtGetAllDetailsByCommitRequestIdDto(fxTransferPayload) + fxTransferModel.fxTransfer.getByCommitRequestId.resolves(fxTransferDetailsFromDb) + fxTransferModel.fxTransfer.getByDeterminingTransferId.resolves([]) + fxTransferModel.fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.resolves(fxTransferDetailsFromDb) + const mockPositionChanges = [ + { participantCurrencyId: 1, value: 100 } + ] + ParticipantPositionChangesModel.getReservedPositionChangesByCommitRequestId.resolves([]) + ParticipantPositionChangesModel.getReservedPositionChangesByTransferId.resolves(mockPositionChanges) + TransferFacade.getById.resolves({ payerfsp: 'testpayer' }) + Comparators.duplicateCheckComparator.resolves({ hasDuplicateId: false, hasDuplicateHash: false @@ -229,8 +257,8 @@ Test('FX Transfer Fulfil handler -->', fxFulfilTest => { const [messageProtocol, topicConfig] = producer.produceMessage.lastCall.args t.equal(messageProtocol.metadata.event.action, Action.FX_ABORT_VALIDATION) checkErrorPayload(t)(messageProtocol.content.payload, fspiopErrorFactory.fxInvalidFulfilment()) - t.equal(topicConfig.topicName, TOPICS.transferPosition) - t.equal(topicConfig.key, String(fxTransferDetails.counterPartyFspTargetParticipantCurrencyId)) + t.ok(topicConfig.topicName === TOPICS.transferPosition || topicConfig.topicName === TOPICS.transferPositionBatch) + t.equal(topicConfig.key, String(1)) t.end() })